Advertisement
Guest User

bino/0MQ_MonitoredDevice

a guest
Aug 28th, 2014
290
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.62 KB | None | 0 0
  1. #!/usr/bin/python
  2. import time
  3. import zmq
  4. from zmq.devices.basedevice import ProcessDevice
  5. from zmq.devices.monitoredqueuedevice import MonitoredQueue
  6. from zmq.utils.strtypes import asbytes
  7. from multiprocessing import Process
  8.  
  9. #Gate
  10. f_url='ipc:///tmp/fsock'
  11. b_url='ipc:///tmp/bsock'
  12. m_url='ipc:///tmp/msock'
  13.  
  14. def gate():
  15.     in_prefix=asbytes('in')
  16.     out_prefix=asbytes('out')
  17.     monitoringdevice = MonitoredQueue(zmq.PULL, zmq.PUSH, zmq.PUB, in_prefix, out_prefix)
  18.  
  19.     monitoringdevice.bind_in(f_url)
  20.     monitoringdevice.bind_out(b_url)
  21.     monitoringdevice.bind_mon(m_url)
  22.  
  23.     monitoringdevice.start()  
  24.     print "Program: GATE has started"
  25.  
  26. def puller():
  27.     print 'PULLER STARTED'
  28.     ctx=zmq.Context()
  29.     sock=ctx.socket(zmq.PULL)
  30.     sock.connect(b_url)
  31.     while True :
  32.         data=sock.recv()
  33.         print 'PULLER Got :', data
  34.  
  35. def monitor(url) :
  36.     print 'MONITOR STARTED !'
  37.     ctx=zmq.Context()
  38.     sock=ctx.socket(zmq.SUB)
  39.     sock.connect(url)
  40.     sock.setsockopt(zmq.SUBSCRIBE, '')
  41.     while True :
  42.         data= sock.recv()
  43.         print 'Monitor :',url,'GOT :', data
  44.  
  45. def sender() :
  46.     ctx=zmq.Context()
  47.     sock=ctx.socket(zmq.PUSH)
  48.     sock.connect(f_url)
  49.     for counter in range(10) :
  50.         print 'Sender said :', counter
  51.         sock.send(str(counter), zmq.NOBLOCK)
  52.         time.sleep(2)
  53.  
  54.  
  55. monitor_p=Process(target=monitor, args=(m_url,))
  56. monitor_p.start()
  57. time.sleep(1)
  58.  
  59. puller_p = Process(target=puller,)
  60. puller_p.start()
  61. time.sleep(1)
  62.  
  63. gate_p = Process(target=gate)
  64. gate_p.start()
  65. time.sleep(1)
  66.  
  67. sender_p = Process(target=sender)
  68. sender_p.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement