Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require 'eventmachine'
module EventMachine
  # A small job queue that runs queued callables on the reactor via
  # +EM.next_tick+, keeping at most +concurrency+ jobs in flight at a time.
  #
  # A job is either synchronous or asynchronous, decided by its arity:
  #
  # * arity 0 - the job is considered finished as soon as it returns;
  # * any other arity - the job receives a completion callback and is
  #   considered finished once that callback is invoked through +call+,
  #   +next+ or +return+.
  #
  # @example
  #   queue = EventMachine::WorkQueue.new(2)
  #   queue.push { puts 'synchronous job' }
  #   queue.push { |done| EM.add_timer(1) { done.call } }
  #   queue.close { puts 'every job has finished' }
  class WorkQueue
    # @param concurrency [Integer] maximum number of jobs in flight at once
    def initialize(concurrency = 1)
      @concurrency = concurrency
      @closed = false
      @workers = 0          # number of jobs currently in flight
      @endjob_done = false  # true once the close hook has been handled
      @wqueue = []          # jobs waiting for a free worker slot (FIFO)
    end

    # Enqueues a job and schedules a dispatch attempt on the next tick.
    #
    # The job may be given as a block or as a positional callable; the
    # positional form is what makes <tt>queue << some_proc</tt> usable. When
    # both are given the positional callable is used.
    #
    # @param job [#call, nil] callable to run
    # @yieldparam done [Proc] completion callback (jobs with non-zero arity)
    # @raise [RuntimeError] if the queue has already been closed
    # @raise [ArgumentError] if no callable is supplied
    def push(job = nil, &blk)
      raise "Closed already" if closed?

      job ||= blk
      raise ArgumentError, 'a block or callable job is required' unless job.respond_to?(:call)

      @wqueue << job
      EM.next_tick { dojob }
    end
    alias << push
    alias enq push

    # Closes the queue so that no further jobs are accepted.
    #
    # The given block is called once every queued job has finished and no job
    # is in flight any more; when the queue is already idle it is called
    # immediately.
    #
    # @raise [RuntimeError] if the queue has already been closed
    def close(&blk)
      raise "already closed this work queue" if @closed

      @closed = true
      @on_close_hook = blk if blk
      endjob if @workers.zero? && @wqueue.empty?
    end

    # @return [Boolean] whether #close has been called
    def closed?
      @closed
    end

    private

    # Starts the next waiting job when a worker slot is free.
    def dojob
      return if @workers >= @concurrency || @wqueue.empty?

      @workers += 1
      job = @wqueue.shift
      on_done = completion_callback
      synchronous = job.respond_to?(:arity) && job.arity.zero?
      completed = false
      begin
        synchronous ? job.call : job.call(on_done)
        completed = true
      ensure
        # Synchronous jobs finish when they return. A job that raised also
        # gives its slot back (the exception still propagates); this is safe
        # for asynchronous jobs because the callback ignores repeated calls.
        on_done.call if synchronous || !completed
      end
    end

    # Builds the completion callback for the job that is about to run. The
    # callback releases the worker slot exactly once, however often it is
    # invoked, so a job signalling completion twice cannot push the in-flight
    # count below its real value.
    def completion_callback
      finished = false
      callback = proc do |*_args|
        next if finished

        finished = true
        @workers -= 1
        # The close hook may only run once the LAST in-flight job is done,
        # not merely when the waiting list has drained.
        if @closed && @workers.zero? && @wqueue.empty?
          endjob
        else
          EM.next_tick { dojob }
        end
      end
      # Let jobs signal completion with done.next / done.return as well.
      class << callback
        alias_method :next, :call
        alias_method :return, :call
      end
      callback
    end

    # Runs the close hook at most once; the hook is marked as handled even if
    # it raises.
    def endjob
      return if @endjob_done

      begin
        @on_close_hook.call if @on_close_hook
      ensure
        @endjob_done = true
      end
    end
  end
end
Add Comment
Please, Sign In to add comment