oldrev

ZeroMQ 多进程生产者/消费者模式自动负载均衡服务器的例子

Jun 17th, 2011
748
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Ruby 4.38 KB | None | 0 0
  1. #encoding: utf-8
  2. #
  3. require 'rubygems'
  4. require 'ffi-rzmq'
  5. require 'json'
  6.  
  7. # The "ventilator" function generates a list of numbers from 0 to 1000, and
  8. # sends those numbers down a zeromq "PUSH" connection to be processed by
  9. # listening workers, in a round robin load balanced fashion.
  10. #
  11. def send_json(socket, obj)
  12.   msg = JSON::generate(obj)
  13.   socket.send_string(msg)
  14. end
  15.  
  16. def recv_json(socket)
  17.   msg = socket.recv_string()
  18.   return JSON::parse(msg)
  19. end
  20.  
  21. def ventilator()
  22.   puts "任务分配进程:PID=[#{Process.pid}]"
  23.   # Initialize a zeromq context
  24.   context = ZMQ::Context.new
  25.  
  26.   # Set up a channel to send work
  27.   ventilator_send = context.socket(ZMQ::PUSH)
  28.   ventilator_send.bind("tcp://127.0.0.1:5557")
  29.  
  30.   # Give everything a second to spin up and connect
  31.   sleep 1.0
  32.  
  33.   # Send the numbers between 1 and 1 million as work messages
  34.   for num in 0...1000
  35.     work_message = { 'num' => num }
  36.     send_json(ventilator_send, work_message)
  37.   end
  38.  
  39.   sleep 1.0
  40.   puts "任务分配进程正在退出"
  41. end
  42.  
  43. # The "worker" functions listen on a zeromq PULL connection for "work"
  44. # (numbers to be processed) from the ventilator, square those numbers,
  45. # and send the results down another zeromq PUSH connection to the
  46. # results manager.
  47.  
  48. def worker(wrk_num)
  49.   # Initialize a zeromq context
  50.   wrk_num = Process.pid
  51.   puts "正在启动工作者进程 PID=[#{wrk_num}]"
  52.  
  53.   context = ZMQ::Context.new
  54.  
  55.   # Set up a channel to receive work from the ventilator
  56.   work_receiver = context.socket(ZMQ::PULL)
  57.   work_receiver.connect("tcp://127.0.0.1:5557")
  58.  
  59.   # Set up a channel to send result of work to the results reporter
  60.   results_sender = context.socket(ZMQ::PUSH)
  61.   results_sender.connect("tcp://127.0.0.1:5558")
  62.  
  63.   # Set up a channel to receive control messages over
  64.   control_receiver = context.socket(ZMQ::SUB)
  65.   control_receiver.connect("tcp://127.0.0.1:5559")
  66.   control_receiver.setsockopt(ZMQ::SUBSCRIBE, "")
  67.  
  68.   # Set up a poller to multiplex the work receiver and control receiver channels
  69.   poller = ZMQ::Poller.new
  70.   poller.register(work_receiver, ZMQ::POLLIN)
  71.   poller.register(control_receiver, ZMQ::POLLIN)
  72.  
  73.   # Loop and accept messages from both channels, acting accordingly
  74.   keep_alive = true
  75.   while keep_alive
  76.     poller.poll(:blocking)
  77.     poller.readables.each do |socket|
  78.       # If the message came from work_receiver channel, square the number
  79.       # and send the answer to the results reporter
  80.       if socket === work_receiver then
  81.         work_message = recv_json(work_receiver)
  82.         product = work_message['num'] * work_message['num']
  83.         answer_message = { 'worker' => wrk_num, 'result' => product }
  84.         send_json(results_sender, answer_message)
  85.       end
  86.  
  87.       # If the message came over the control channel, shut down the worker.
  88.       if socket === control_receiver then
  89.         control_message = control_receiver.recv_string
  90.         if control_message == "FINISHED" then
  91.           puts "工作者进程 PID=[#{wrk_num}] 接受到 'FINISHED'指令,正在退出 "
  92.           keep_alive = false
  93.           break
  94.         end
  95.       end
  96.     end
  97.   end
  98.  
  99.   # The "results_manager" function receives each result from multiple workers,
  100.   # and prints those results.  When all results have been received, it signals
  101.   # the worker processes to shut down.
  102. end
  103.  
  104. def result_sink()
  105.   puts "我是结果收集进程,PID=[#{Process.pid}]"
  106.   # Initialize a zeromq context
  107.   context = ZMQ::Context.new
  108.  
  109.   # Set up a channel to receive results
  110.   results_receiver = context.socket(ZMQ::PULL)
  111.   results_receiver.bind("tcp://127.0.0.1:5558")
  112.  
  113.   # Set up a channel to send control commands
  114.   control_sender = context.socket(ZMQ::PUB)
  115.   control_sender.bind("tcp://127.0.0.1:5559")
  116.  
  117.   for task_nbr in 0...1000
  118.     result_message = recv_json(results_receiver)
  119.     puts "工作者进程 PID=[#{result_message['worker']}] 返回结果=[#{result_message['result']}]"
  120.   end
  121.  
  122.   # Signal to all workers that we are finsihed
  123.   control_sender.send_string("FINISHED")
  124.   sleep 2.0
  125.   puts "结果收集进程正在退出"
  126. end
  127.  
  128.  
  129. # 启动工作者进程
  130. worker_pool = 10
  131. for wrk_num in 0...worker_pool
  132.   Process.fork do
  133.     worker wrk_num
  134.   end
  135. end
  136.  
  137. # 启动结果收集进程
  138. result_sink_pid = Process.fork do
  139.   result_sink
  140. end
  141.  
  142. # 启动分配器
  143. Process.fork do
  144.   ventilator
  145. end
  146.  
  147. Process.waitpid(result_sink_pid)
  148. puts "主进程退出"
Advertisement
Add Comment
Please, Sign In to add comment