DeaD_EyE

namespace/queues with MP and zmq as example

Aug 3rd, 2018
84
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import time
  2. from multiprocessing import Manager, Process
  3.  
  4.  
  5. def worker1(namespace, iterations):
  6.     for i in range(iterations):
  7.         namespace.worker1_var = i
  8.  
  9. def worker2(namespace, iterations):
  10.     for i in range(iterations):
  11.         result = namespace.worker1_var * i
  12.         print('Worker2:', result)
  13.  
  14. print('Multiprocessing with namespace:')
  15.  
  16.  
  17. # Create a manager, which starts a new Python
  18. # Process to act as a manager between the other
  19. # processes
  20. manager = Manager()
  21. # Namespace is like a class, where you can assign or get
  22. # your variables
  23. ns = manager.Namespace()
  24. # Creating a new variable in the namespace
  25. ns.worker1_var = 0
  26.  
  27.  
  28. # create new processes and give the namesapce as argument to the function
  29. proc1 = Process(target=worker1, args=(ns, 10))
  30. proc2 = Process(target=worker2, args=(ns, 10))
  31. # start processes
  32. proc1.start()
  33. proc2.start()
  34. # wait until the processes are finished
  35. proc1.join()
  36. proc2.join()
  37.  
  38. print('\nMultiprocessing with queues:')
  39. # queues are often the better for jobs
  40. # for configuration or something like global state
  41. # I prefer the namespace
  42.  
  43. def generator(queue):
  44.     for i in range(10):
  45.         queue.put(i)
  46.  
  47. def worker(queue):
  48.     while True:
  49.         print('Worker:' ,queue.get())
  50.  
  51. # Now make a queue object, which is handled by manager
  52. queue = manager.Queue()
  53. # now the manager controls the Queue between the processes
  54. # everything important is handled like locking
  55. proc_gen = Process(target=generator, args=(queue,))
  56. proc_work = Process(target=worker, args=(queue,))
  57. proc_gen.start()
  58. proc_work.start()
  59.  
  60. proc_gen.join()
  61. proc_work.terminate() # killing this endless process
  62.  
  63.  
  64. # --------------------------------------
  65.  
  66. import threading
  67. import zmq
  68.  
  69.  
  70. print('\nZMQ Example in one process,\ncheating with threading to do this in one process.')
  71.  
  72. # this is for example in first.py
  73. def zmq_publish(pub):
  74.     for i in range(10):
  75.         data = str(i).encode()
  76.         #print('Data has been sent.')
  77.         pub.send_multipart([b'velocity', data])
  78.  
  79. # in second.py for example
  80. def printer(sub):
  81.     while True:
  82.         topic, data = sub.recv_multipart()
  83.         value = int(data)
  84.         print(topic.decode(), value)
  85.         if value == 9:
  86.             break
  87.  
  88. context = zmq.Context()
  89. publisher = context.socket(zmq.PUB)
  90. subscriber = context.socket(zmq.SUB)
  91. publisher.bind('tcp://127.0.0.1:5090')
  92. subscriber.connect('tcp://127.0.0.1:5090')
  93. subscriber.subscribe(b'velocity')
  94.  
  95. # give the subscriber a little bit time to connect
  96. # otherwise the data is already sent, before the
  97. # subscriber has been connected
  98. # and then this data is lost
  99.  
  100. time.sleep(1)
  101.  
  102. # this one the big benefits of zmq, because the order of connection
  103. # doesn't matter.
  104. # the publisher can already send, where there is no subsriber
  105. # and reversed
  106. # the connection is established magically in the background
  107. #
  108. # but you choose also the other concepts of zmq:
  109. # Publish -> Subscribe
  110. # Request <-> Reply
  111. # Push -> Pull
  112.  
  113.  
  114.  
  115. thread = threading.Thread(target=zmq_publish, args=(publisher,))
  116. thread.start()
  117. # obviosly multiprocess.Process has the
  118. # same interface like threading.Thread
  119. # I'm using this only for demonstration
  120. # in one single process
  121. # This won't work with multiprocessing, because the context is
  122. # creates not inside the generator/worker process
  123. # with a thread it works because of shared memory
  124.  
  125. printer(subscriber)
  126.  
  127. # destroying the context
  128. # closes everything
  129. context.destroy()
RAW Paste Data