Guest User

Untitled

a guest
Jun 24th, 2018
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.53 KB | None | 0 0
  1. require 'rubygems'
  2. require 'eventmachine'
  3.  
  4. # number of iterations
  5. n = 8
  6. work_array = []
  7. n.times { work_array << rand(n) + 1 }
  8.  
  9. puts "Work array: each operation will sleep the specified number of seconds."
  10. p work_array
  11.  
  12.  
  13. puts "\n\nillustrates normal default behavior."
  14. puts "results are ordered by length of time to complete work."
  15.  
  16. EM.run do
  17. n.times do |index|
  18. operation = proc { sleep work_array[index]; [work_array[index], index] }
  19. cback = proc { |result|
  20. work_time, index = result.first, result.last
  21. puts "work time [#{work_time}] index [#{index}]"
  22. }
  23.  
  24. EM.defer operation, cback
  25. end
  26.  
  27. EM.add_timer(work_array.max + 1) { EM.stop }
  28. end
  29.  
  30.  
  31. module EventMachine
  32. def self.run blk=nil, tail=nil, &block
  33. @deferred_task_id, @deferred_task_unmatched_id = 0, 0
  34. @tails ||= []
  35. tail and @tails.unshift(tail)
  36.  
  37. if reactor_running?
  38. (b = blk || block) and b.call # next_tick(b)
  39. else
  40. @conns = {}
  41. @acceptors = {}
  42. @timers = {}
  43. @wrapped_exception = nil
  44. begin
  45. @reactor_running = true
  46. initialize_event_machine
  47. (b = blk || block) and add_timer(0, b)
  48. if @next_tick_queue && !@next_tick_queue.empty?
  49. add_timer(0) { signal_loopbreak }
  50. end
  51. @reactor_thread = Thread.current
  52. run_machine
  53. ensure
  54. begin
  55. release_machine
  56. ensure
  57. if @threadpool
  58. @threadpool.each { |t| t.exit }
  59. @threadpool.each { |t| t.kill! if t.alive? }
  60. @threadqueue = nil
  61. @resultqueue = nil
  62. end
  63. @threadpool = nil
  64. @next_tick_queue = nil
  65. end
  66. @reactor_running = false
  67. @reactor_thread = nil
  68. end
  69.  
  70. until @tails.empty?
  71. @tails.pop.call
  72. end
  73.  
  74. raise @wrapped_exception if @wrapped_exception
  75. end
  76. end
  77.  
  78. def self.deferred_task_id
  79. @deferred_task_id
  80. end
  81.  
  82. def self.increment_deferred_task_id
  83. @deferred_task_id += 1
  84. end
  85.  
  86. def self.deferred_task_unmatched_id # :nodoc:
  87. @deferred_task_unmatched_id
  88. end
  89.  
  90. def self.increment_deferred_task_unmatched_id # :nodoc:
  91. @deferred_task_unmatched_id += 1
  92. end
  93.  
  94. def self.run_deferred_callbacks # :nodoc:
  95. tasks_completed_early = []
  96. until (@resultqueue ||= []).empty?
  97. result, cback, task_id = @resultqueue.pop
  98.  
  99. unless task_id
  100. cback.call result if cback
  101. else
  102. if task_id == deferred_task_unmatched_id
  103. cback.call result, task_id if cback
  104. increment_deferred_task_unmatched_id
  105.  
  106. tasks_completed_early.each { |task| @resultqueue.push task }
  107. tasks_completed_early.clear
  108. else
  109. tasks_completed_early << [result, cback, task_id]
  110. end
  111. end
  112. end
  113.  
  114. # push unmatched completed tasks back onto the resultqueue for processing during next signal break
  115. tasks_completed_early.each { |task| @resultqueue << task }
  116.  
  117.  
  118. @next_tick_queue ||= []
  119. if (l = @next_tick_queue.length) > 0
  120. l.times {|i| @next_tick_queue[i].call}
  121. @next_tick_queue.slice!( 0...l )
  122. end
  123.  
  124. =begin
  125. (@next_tick_queue ||= []).length.times {
  126. cback=@next_tick_queue.pop and cback.call
  127. }
  128. =end
  129. =begin
  130. if (@next_tick_queue ||= []) and @next_tick_queue.length > 0
  131. ary = @next_tick_queue.dup
  132. @next_tick_queue.clear
  133. until ary.empty?
  134. cback=ary.pop and cback.call
  135. end
  136. end
  137. =end
  138. end
  139.  
  140. def self.defer op = nil, callback = nil, task_id = nil, &blk
  141. unless @threadpool
  142. require 'thread'
  143. @threadpool = []
  144. @threadqueue = ::Queue.new
  145. @resultqueue = ::Queue.new
  146. spawn_threadpool
  147. end
  148.  
  149. @threadqueue << [op || blk, callback, task_id]
  150. end
  151.  
  152.  
  153. def self.spawn_threadpool
  154. until @threadpool.size == 20
  155. thread = Thread.new do
  156. while true
  157. op, cback, task_id = *@threadqueue.pop
  158. result = op.call
  159. @resultqueue << [result, cback, task_id]
  160. EventMachine.signal_loopbreak
  161. end
  162. end
  163. @threadpool << thread
  164. end
  165. end
  166. end
  167.  
  168.  
  169.  
  170. puts "\n\nillustrates new deterministic/ordered #defer behavior."
  171. puts "results ordered by task id."
  172. EM.run do
  173. n.times do
  174. index = EM.deferred_task_id
  175. operation = proc {
  176. sleep work_array[index]
  177. work_array[index]
  178. }
  179. cback = proc { |result, index|
  180. work_time = result
  181. puts "work time [#{work_time}] index [#{index}]"
  182. }
  183.  
  184. EM.defer operation, cback, EM.deferred_task_id
  185. EM.increment_deferred_task_id
  186. end
  187.  
  188. EM.add_timer(work_array.max + 1) { EM.stop }
  189. end
  190.  
  191.  
  192.  
  193. puts "\n\nillustrates a mixture of non-deterministic and deterministic tasks."
  194. puts "result's order is mixed by work time and task id."
  195. EM.run do
  196. n.times do |i|
  197. unless (i % 2) == 0
  198. index = EM.deferred_task_id
  199. operation = proc {
  200. sleep work_array[index]
  201. work_array[index]
  202. }
  203. cback = proc { |result, index|
  204. work_time = result
  205. puts "work time [#{work_time}] index [#{index}]"
  206. }
  207.  
  208. EM.defer operation, cback, EM.deferred_task_id
  209. EM.increment_deferred_task_id
  210. else
  211. operation = proc {
  212. sleep work_array[i]
  213. work_array[i]
  214. }
  215. cback = proc { |result|
  216. work_time = result
  217. puts "work time [#{work_time}] index [none]"
  218. }
  219.  
  220. EM.defer operation, cback
  221. end
  222. end
  223.  
  224. EM.add_timer(work_array.max + 1) { EM.stop }
  225. end
Add Comment
Please, Sign In to add comment