Guest User

Untitled

a guest
Feb 21st, 2018
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.18 KB | None | 0 0
  1. require 'beanstalk-client.rb'
  2. require 'ruby-debug'
  3.  
  4. DEFAULT_PORT = 11300
  5. SERVER_IP = '127.0.0.1'
  6. #beanstalk will order the queues based on priority, with the same priority
  7. #it acts FIFO, in a later example we will use the priority
  8. #(higher numbers are higher priority)
  9. DEFAULT_PRIORITY = 65536
  10. #TTR is time for the job to reappear on the queue.
  11. #Assuming a worker died before completing work and never called job.delete
  12. #the same job would return back on the queue (in TTR seconds)
  13. TTR = 3
  14.  
  15. class BeanBase
  16.  
  17. #To work with multiple queues you must tell beanstalk which queues
  18. #you plan on writing to (use), and which queues you will reserve jobs from
  19. #(watch). In this case we also want to ignore the default queue
  20. #you need a different queue object for each tube you plan on using or
  21. #you can switch what the tub is watching and using a bunch, we just keep a few
  22. #queues open on the tubes we want.
  23. def get_queue(queue_name)
  24. @queue_cache ||= {}
  25. if @queue_cache.has_key?(queue_name)
  26. return @queue_cache[queue_name]
  27. else
  28. queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
  29. queue.watch(queue_name)
  30. queue.use(queue_name)
  31. queue.ignore('default')
  32. @queue_cache[queue_name] = queue
  33. return queue
  34. end
  35. end
  36.  
  37. #this will take a message off the queue, and process it with the block
  38. def take_msg(queue)
  39. msg = queue.reserve
  40. #by calling ybody we get the content of the message and convert it from yml
  41. body = msg.ybody
  42. if block_given?
  43. yield(body)
  44. end
  45. msg.delete
  46. end
  47.  
  48. def results_ready?(queue)
  49. queue.peek_ready!=nil
  50. end
  51.  
  52. end
  53.  
  54. class BeanDistributor < BeanBase
  55.  
  56. def initialize(chunks,points_per_chunk)
  57. @chunks = chunks
  58. @points_per_chunk = points_per_chunk
  59. @messages_out = 0
  60. @circle_count = 0
  61. end
  62.  
  63. def get_incoming_results(queue)
  64. if(results_ready?(queue))
  65. result = nil
  66. take_msg(queue) do |body|
  67. result = body.count
  68. end
  69. @messages_out -= 1
  70. print "." #display that we received another result
  71. @circle_count += result
  72. else
  73. #do nothing
  74. end
  75. end
  76.  
  77. def start_distributor
  78. request_queue = get_queue('requests')
  79. results_queue = get_queue('results')
  80. #put all the work on the request queue
  81. puts "distributor sending out #{@messages} jobs"
  82. @chunks.times do |num|
  83. msg = BeanRequest.new(1,@points_per_chunk)
  84. #Take our ruby object and convert it to yml and put it on the queue
  85. request_queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
  86. @messages_out += 1
  87. #if there are results get them if not continue sending out work
  88. get_incoming_results(results_queue)
  89. end
  90.  
  91. while @messages_out > 0
  92. get_incoming_results(results_queue)
  93. end
  94. npoints = @chunks * @points_per_chunk
  95. pi = 4.0*@circle_count/(npoints)
  96. puts "\nreceived all the results our estimate for pi is: #{pi}"
  97. end
  98.  
  99. end
  100.  
  101. class BeanWorker < BeanBase
  102.  
  103. def initialize()
  104. end
  105.  
  106. def write_result(queue, result)
  107. msg = BeanResult.new(1,result)
  108. queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
  109. end
  110.  
  111. def in_circle
  112. #generate 2 random numbers see if they are in the circle
  113. range = 1000000.0
  114. radius = range / 2
  115. xcord = rand(range) - radius
  116. ycord = rand(range) - radius
  117. if( (xcord**2) + (ycord**2) <= (radius**2) )
  118. return 1
  119. else
  120. return 0
  121. end
  122. end
  123.  
  124. def start_worker
  125. request_queue = get_queue('requests')
  126. results_queue = get_queue('results')
  127. #get requests and do the work until the worker is killed
  128. while(true)
  129. result = 0
  130. take_msg(request_queue) do |body|
  131. chunks = body.count
  132. chunks.times { result += in_circle}
  133. end
  134. write_result(results_queue,result)
  135. end
  136.  
  137. end
  138.  
  139. end
  140.  
  141. ############
  142. # These are just simple message classes that we pass using beanstalks
  143. # to yml and from yml functions.
  144. ############
  145. class BeanRequest
  146. attr_accessor :project_id, :count
  147. def initialize(project_id, count=0)
  148. @project_id = project_id
  149. @count = count
  150. end
  151. end
  152.  
  153. class BeanResult
  154. attr_accessor :project_id, :count
  155. def initialize(project_id, count=0)
  156. @project_id = project_id
  157. @count = count
  158. end
  159. end
  160.  
  161. #how many different jobs we should do
  162. chunks = 100
  163. #how many points to calculate per chunk
  164. points_per_chunk = 10000
  165. #how many workers should we have
  166. #(normally different machines, in our example fork them off)
  167. workers = 5
  168.  
  169. # Most of the time you will have two entirely separate classes
  170. # but to make it easy to run this example we will just fork and start our server
  171. # and client separately. We will wait for them to complete and check
  172. # if we received all the messages we expected.
  173. puts "starting distributor"
  174. server_pid = fork {
  175. BeanDistributor.new(chunks,points_per_chunk).start_distributor
  176. }
  177.  
  178. puts "starting client(s)"
  179. client_pids = []
  180. workers.times do |num|
  181. client_pid = fork {
  182. BeanWorker.new.start_worker
  183. }
  184. client_pids << client_pid
  185. end
  186.  
  187. Process.wait(server_pid)
  188. #take down the clients
  189. client_pids.each do |pid|
  190. Process.kill("HUP",pid)
  191. end
Add Comment
Please, Sign In to add comment