Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require 'rubygems'
- require 'eventmachine'
- # number of iterations
- n = 8
- work_array = []
- n.times { work_array << rand(n) + 1 }
- puts "Work array: each operation will sleep the specified number of seconds."
- p work_array
- puts "\n\nillustrates normal default behavior."
- puts "results are ordered by length of time to complete work."
- EM.run do
- n.times do |index|
- operation = proc { sleep work_array[index]; [work_array[index], index] }
- cback = proc { |result|
- work_time, index = result.first, result.last
- puts "work time [#{work_time}] index [#{index}]"
- }
- EM.defer operation, cback
- end
- EM.add_timer(work_array.max + 1) { EM.stop }
- end
- module EventMachine
- def self.run blk=nil, tail=nil, &block
- @deferred_task_id, @deferred_task_unmatched_id = 0, 0
- @tails ||= []
- tail and @tails.unshift(tail)
- if reactor_running?
- (b = blk || block) and b.call # next_tick(b)
- else
- @conns = {}
- @acceptors = {}
- @timers = {}
- @wrapped_exception = nil
- begin
- @reactor_running = true
- initialize_event_machine
- (b = blk || block) and add_timer(0, b)
- if @next_tick_queue && !@next_tick_queue.empty?
- add_timer(0) { signal_loopbreak }
- end
- @reactor_thread = Thread.current
- run_machine
- ensure
- begin
- release_machine
- ensure
- if @threadpool
- @threadpool.each { |t| t.exit }
- @threadpool.each { |t| t.kill! if t.alive? }
- @threadqueue = nil
- @resultqueue = nil
- end
- @threadpool = nil
- @next_tick_queue = nil
- end
- @reactor_running = false
- @reactor_thread = nil
- end
- until @tails.empty?
- @tails.pop.call
- end
- raise @wrapped_exception if @wrapped_exception
- end
- end
- def self.deferred_task_id
- @deferred_task_id
- end
- def self.increment_deferred_task_id
- @deferred_task_id += 1
- end
- def self.deferred_task_unmatched_id # :nodoc:
- @deferred_task_unmatched_id
- end
- def self.increment_deferred_task_unmatched_id # :nodoc:
- @deferred_task_unmatched_id += 1
- end
- def self.run_deferred_callbacks # :nodoc:
- tasks_completed_early = []
- until (@resultqueue ||= []).empty?
- result, cback, task_id = @resultqueue.pop
- unless task_id
- cback.call result if cback
- else
- if task_id == deferred_task_unmatched_id
- cback.call result, task_id if cback
- increment_deferred_task_unmatched_id
- tasks_completed_early.each { |task| @resultqueue.push task }
- tasks_completed_early.clear
- else
- tasks_completed_early << [result, cback, task_id]
- end
- end
- end
- # push unmatched completed tasks back onto the resultqueue for processing during next signal break
- tasks_completed_early.each { |task| @resultqueue << task }
- @next_tick_queue ||= []
- if (l = @next_tick_queue.length) > 0
- l.times {|i| @next_tick_queue[i].call}
- @next_tick_queue.slice!( 0...l )
- end
- =begin
- (@next_tick_queue ||= []).length.times {
- cback=@next_tick_queue.pop and cback.call
- }
- =end
- =begin
- if (@next_tick_queue ||= []) and @next_tick_queue.length > 0
- ary = @next_tick_queue.dup
- @next_tick_queue.clear
- until ary.empty?
- cback=ary.pop and cback.call
- end
- end
- =end
- end
- def self.defer op = nil, callback = nil, task_id = nil, &blk
- unless @threadpool
- require 'thread'
- @threadpool = []
- @threadqueue = ::Queue.new
- @resultqueue = ::Queue.new
- spawn_threadpool
- end
- @threadqueue << [op || blk, callback, task_id]
- end
- def self.spawn_threadpool
- until @threadpool.size == 20
- thread = Thread.new do
- while true
- op, cback, task_id = *@threadqueue.pop
- result = op.call
- @resultqueue << [result, cback, task_id]
- EventMachine.signal_loopbreak
- end
- end
- @threadpool << thread
- end
- end
- end
- puts "\n\nillustrates new deterministic/ordered #defer behavior."
- puts "results ordered by task id."
- EM.run do
- n.times do
- index = EM.deferred_task_id
- operation = proc {
- sleep work_array[index]
- work_array[index]
- }
- cback = proc { |result, index|
- work_time = result
- puts "work time [#{work_time}] index [#{index}]"
- }
- EM.defer operation, cback, EM.deferred_task_id
- EM.increment_deferred_task_id
- end
- EM.add_timer(work_array.max + 1) { EM.stop }
- end
- puts "\n\nillustrates a mixture of non-deterministic and deterministic tasks."
- puts "result's order is mixed by work time and task id."
- EM.run do
- n.times do |i|
- unless (i % 2) == 0
- index = EM.deferred_task_id
- operation = proc {
- sleep work_array[index]
- work_array[index]
- }
- cback = proc { |result, index|
- work_time = result
- puts "work time [#{work_time}] index [#{index}]"
- }
- EM.defer operation, cback, EM.deferred_task_id
- EM.increment_deferred_task_id
- else
- operation = proc {
- sleep work_array[i]
- work_array[i]
- }
- cback = proc { |result|
- work_time = result
- puts "work time [#{work_time}] index [none]"
- }
- EM.defer operation, cback
- end
- end
- EM.add_timer(work_array.max + 1) { EM.stop }
- end
Add Comment
Please, Sign In to add comment