Guest User

Untitled

a guest
Jul 17th, 2018
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.18 KB | None | 0 0
  module EventMachine
    # A simple iterator for concurrent asynchronous work.
    #
    # Unlike ruby's built-in iterators, the end of the current iteration cycle is signaled manually,
    # instead of happening automatically after the yielded block finishes executing. For example:
    #
    #   (0..10).each{ |num| }
    #
    # becomes:
    #
    #   EM::Iterator.new(0..10).each{ |num,iter| iter.next }
    #
    # This is especially useful when doing asynchronous work via reactor libraries and
    # functions. For example, given a sync and async http api:
    #
    #   response = sync_http_get(url); ...
    #   async_http_get(url){ |response| ... }
    #
    # a synchronous iterator such as:
    #
    #   responses = urls.map{ |url| sync_http_get(url) }
    #   ...
    #   puts 'all done!'
    #
    # could be written as:
    #
    #   EM::Iterator.new(urls).map(proc{ |url,iter|
    #     async_http_get(url){ |res|
    #       iter.return(res)
    #     }
    #   }, proc{ |responses|
    #     ...
    #     puts 'all done!'
    #   })
    #
    # Now, you can take advantage of the asynchronous api to issue requests in parallel. For example,
    # to fetch 10 urls at a time, simply pass in a concurrency of 10:
    #
    #   EM::Iterator.new(urls, 10).each do |url,iter|
    #     async_http_get(url){ iter.next }
    #   end
    #
    # All work is scheduled with EM.next_tick, so nothing happens until the
    # reactor runs those ticks.
    class Iterator
      # Create a new parallel async iterator with specified concurrency.
      #
      #   i = EM::Iterator.new(1..100, 10)
      #
      # will create an iterator over the range that processes 10 items at a time. Iteration
      # is started via #each, #map or #inject.
      #
      # @param list [#to_a] items to iterate over; copied up front, so later changes
      #   to +list+ do not affect the iteration
      # @param concurrency [Integer] how many items may be in flight at once
      # @raise [ArgumentError] if +list+ does not respond to +to_a+
      def initialize(list, concurrency = 1)
        raise ArgumentError, 'argument must be enumerable' unless list.respond_to?(:to_a)

        @list = list.to_a.dup
        @concurrency = concurrency

        @started = false
        @ended = false
      end

      # Change the concurrency of this iterator. Workers will automatically be spawned or destroyed
      # to accommodate the new concurrency level: raising it mid-iteration schedules extra workers,
      # while lowering it makes surplus workers retire the next time they ask for an item.
      #
      # @param val [Integer] the new concurrency level
      def concurrency=(val)
        old = @concurrency
        @concurrency = val

        spawn_workers if val > old && @started && !@ended
      end

      # @return [Integer] the current concurrency level
      attr_reader :concurrency

      # Iterate over a set of items using the specified block or proc.
      #
      #   EM::Iterator.new(1..100).each do |num, iter|
      #     puts num
      #     iter.next
      #   end
      #
      # An optional second proc is invoked after the iteration is complete.
      #
      #   EM::Iterator.new(1..100).each(
      #     proc{ |num,iter| iter.next },
      #     proc{ puts 'all done' }
      #   )
      #
      # @param foreach [Proc, nil] called as +foreach.call(item, iter)+; it must call
      #   +iter.next+ exactly once when it is done with +item+ (a second call raises)
      # @param after [Proc, nil] called with no arguments once the list is exhausted and
      #   every handed-out item has called +iter.next+
      # @return [Iterator] self
      # @raise [ArgumentError] if neither +foreach+ nor a block is given
      # @raise [RuntimeError] if this iterator has already been iterated
      def each(foreach = nil, after = nil, &blk)
        foreach ||= blk
        raise ArgumentError, 'proc or block required for iteration' unless foreach
        raise RuntimeError, 'cannot iterate over an iterator more than once' if @started || @ended

        @started = true
        @pending = 0 # items handed out whose iter.next has not been called yet
        @workers = 0 # workers currently alive

        # Fire +after+ only once the list is exhausted AND no item is still in flight.
        all_done = proc {
          after.call if after && @ended && @pending == 0
        }

        # One step of a worker: retire it, finish the iteration, or hand it the next item.
        @process_next = proc {
          if @ended || @workers > @concurrency
            # Nothing left to do, or concurrency was lowered: this worker retires.
            @workers -= 1
          elsif @list.empty?
            @ended = true
            @workers -= 1
            all_done.call
          else
            item = @list.shift
            @pending += 1
            foreach.call(item, completion_handle(all_done))
          end
        }

        spawn_workers

        self
      end

      # Collect the results of an asynchronous iteration into an array.
      #
      #   EM::Iterator.new(%w[ pwd uptime uname date ], 2).map(proc{ |cmd,iter|
      #     EM.system(cmd){ |output,status|
      #       iter.return(output)
      #     }
      #   }, proc{ |results|
      #     p results
      #   })
      #
      # Each result is stored at its item's input position, so the array keeps the
      # order of the list even when items complete out of order.
      #
      # @param foreach [Proc] called as +foreach.call(item, iter)+; it must call
      #   +iter.return(value)+ exactly once (+iter.next+ raises NoMethodError)
      # @param after [Proc] called with the array of results once every item has returned
      # @return [Iterator] self
      def map(foreach, after)
        index = 0

        inject([], proc { |results, item, iter|
          # Claim this item's slot now; completions may arrive in any order.
          slot = index
          index += 1

          is_done = false
          on_done = proc { |res|
            raise RuntimeError, 'already returned a value for this iteration' if is_done

            is_done = true
            results[slot] = res
            iter.return(results)
          }
          class << on_done
            alias_method :return, :call

            def next
              raise NoMethodError, 'must call #return on a map iterator'
            end
          end

          foreach.call(item, on_done)
        }, proc { |results|
          after.call(results)
        })
      end

      # Inject the results of an asynchronous iteration onto a given object.
      #
      #   EM::Iterator.new(%w[ pwd uptime uname date ], 2).inject({}, proc{ |hash,cmd,iter|
      #     EM.system(cmd){ |output,status|
      #       hash[cmd] = status.exitstatus == 0 ? output.strip : nil
      #       iter.return(hash)
      #     }
      #   }, proc{ |results|
      #     p results
      #   })
      #
      # Each +foreach+ call receives the accumulator as it was when that item
      # started. With concurrency > 1, prefer mutating +obj+ in place (as above)
      # over returning a brand-new object.
      #
      # @param obj [Object] initial accumulator
      # @param foreach [Proc] called as +foreach.call(obj, item, iter)+; it must call
      #   +iter.return(new_obj)+ exactly once (+iter.next+ raises NoMethodError)
      # @param after [Proc] called with the final accumulator once every item has returned
      # @return [Iterator] self
      def inject(obj, foreach, after)
        each(proc { |item, iter|
          is_done = false
          on_done = proc { |res|
            raise RuntimeError, 'already returned a value for this iteration' if is_done

            is_done = true
            obj = res
            iter.next
          }
          class << on_done
            alias_method :return, :call

            def next
              raise NoMethodError, 'must call #return on an inject iterator'
            end
          end

          foreach.call(obj, item, on_done)
        }, proc {
          after.call(obj)
        })
      end

      private

      # Schedule a tick that keeps starting workers, one per tick, until
      # +@concurrency+ of them are running or the iteration has ended.
      def spawn_workers
        start_worker = proc {
          # The !@ended check is what lets this loop stop. Once the list is
          # exhausted, a freshly started worker retires immediately (decrementing
          # @workers). Without the check, this proc would reschedule itself on
          # every tick forever.
          if @workers < @concurrency && !@ended
            @workers += 1
            @process_next.call
            EM.next_tick(start_worker)
          end
        }
        EM.next_tick(start_worker)
        nil
      end

      # Build the +iter+ handle passed to an #each callback for one item.
      # Calling it (as +iter.next+) releases the item. It then either schedules
      # the worker's next step or, if the list is already exhausted, runs the
      # +all_done+ completion check. The returned handle raises RuntimeError
      # if it is called a second time.
      #
      # @param all_done [Proc] completion check created by #each
      # @return [Proc] the per-item handle, also callable as +next+
      def completion_handle(all_done)
        is_done = false
        handle = proc {
          raise RuntimeError, 'already completed this iteration' if is_done

          is_done = true
          @pending -= 1

          if @ended
            all_done.call
          else
            EM.next_tick(@process_next)
          end
        }
        class << handle
          alias_method :next, :call
        end
        handle
      end
    end
  end
  224.  
  # Ad-hoc demo, run only when this file is executed directly.
  if $PROGRAM_NAME == __FILE__
    $LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..'))
    require 'eventmachine'

    # TODO: real tests
    # TODO: pass in one object instead of two? .each{ |iter| puts iter.current; iter.next }
    # TODO: support iter.pause/resume/stop/break/continue?
    # TODO: create some exceptions instead of using RuntimeError
    # TODO: support proc instead of enumerable? EM::Iterator.new(proc{ return queue.pop })

    EM.run do
      # Plain sequential iteration (default concurrency of 1).
      EM::Iterator.new(1..50).each do |num, iter|
        p num
        iter.next
      end

      # Start at concurrency 5, drop to 1 after 30ms, then raise to 3 after 40ms.
      print_item = proc do |num, iter|
        p num.to_s
        iter.next
      end
      report_done = proc { p :done }
      throttled = EM::Iterator.new(1..100, 5)
      throttled.each(print_item, report_done)
      EM.add_timer(0.03) { throttled.concurrency = 1 }
      EM.add_timer(0.04) { throttled.concurrency = 3 }

      # map: every value is returned from a timer callback.
      echo_later = proc do |num, iter|
        EM.add_timer(0.01) { iter.return(num) }
      end
      EM::Iterator.new(100..150).map(echo_later, proc { |results| p results })

      # inject: run commands two at a time, collecting stripped output keyed by command.
      run_command = proc do |hash, cmd, iter|
        EM.system(cmd) do |output, status|
          hash[cmd] = status.exitstatus == 0 ? output.strip : nil
          iter.return(hash)
        end
      end
      EM::Iterator.new(%w[pwd uptime uname date], 2).inject({}, run_command, proc { |results| p results })
    end
  end
Add Comment
Please, Sign In to add comment