Guest User

Untitled

a guest
Jul 23rd, 2018
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.37 KB | None | 0 0
  1. require 'eventmachine'
  2.  
  3. class EventMachine::WorkQueue
  4.  
  5. def initialize(concurrency=1)
  6. @concurrency = concurrency
  7. @closed = false
  8. @workers = 0
  9. @endjob_done = false
  10. @wqueue = [] # worker queue
  11. end
  12.  
  13. def push(&blk)
  14. raise "Closed already" if closed?
  15. @wqueue << blk
  16.  
  17. EM.next_tick {
  18. dojob
  19. }
  20. end
  21.  
  22. alias :<< :push
  23. alias :enq :push
  24.  
  25. # close the queue and do not accept new worker.
  26. # the given block will be called after all worker is finished.
  27. def close(&blk)
  28. raise "already closed this work queue" if @closed
  29. @closed = true
  30. @on_close_hook = blk if blk
  31. if @workers == 0 && @wqueue.empty?
  32. endjob
  33. end
  34. end
  35.  
  36. def closed?
  37. @closed
  38. end
  39.  
  40. private
  41. def dojob
  42. return if @workers >= @concurrency || @wqueue.empty?
  43. @workers += 1
  44.  
  45. w = @wqueue.shift
  46.  
  47. on_done = proc { |*args|
  48. #raise RuntimeError, 'already completed this iteration' if is_done
  49. @workers -= 1
  50.  
  51. if @closed && @wqueue.empty?
  52. endjob
  53. else
  54. EM.next_tick {dojob}
  55. end
  56. }
  57. class << on_done
  58. alias :next :call
  59. alias :return :call
  60. end
  61.  
  62. case w.arity
  63. when 0
  64. w.call
  65. on_done.call
  66. when 1
  67.  
  68. w.call(on_done)
  69. end
  70. end
  71.  
  72. def endjob
  73. return if @endjob_done
  74. begin
  75. @on_close_hook.call if @on_close_hook
  76. ensure
  77. @endjob_done = true
  78. end
  79. end
  80.  
  81. end
Add Comment
Please, Sign In to add comment