SHARE
TWEET

namespace/queues with MP and zmq as example

DeaD_EyE Aug 3rd, 2018 78 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import time
  2. from multiprocessing import Manager, Process
  3.  
  4.  
  5. def worker1(namespace, iterations):
  6.     for i in range(iterations):
  7.         namespace.worker1_var = i
  8.  
  9. def worker2(namespace, iterations):
  10.     for i in range(iterations):
  11.         result = namespace.worker1_var * i
  12.         print('Worker2:', result)
  13.  
  14. print('Multiprocessing with namespace:')
  15.  
  16.  
  17. # Create a manager, which starts a new Python
  18. # Process to act as a manager between the other
  19. # processes
  20. manager = Manager()
  21. # Namespace is like a class, where you can assign or get
  22. # your variables
  23. ns = manager.Namespace()
  24. # Creating a new variable in the namespace
  25. ns.worker1_var = 0
  26.  
  27.  
  28. # create new processes and give the namesapce as argument to the function
  29. proc1 = Process(target=worker1, args=(ns, 10))
  30. proc2 = Process(target=worker2, args=(ns, 10))
  31. # start processes
  32. proc1.start()
  33. proc2.start()
  34. # wait until the processes are finished
  35. proc1.join()
  36. proc2.join()
  37.  
  38. print('\nMultiprocessing with queues:')
  39. # queues are often the better for jobs
  40. # for configuration or something like global state
  41. # I prefer the namespace
  42.  
  43. def generator(queue):
  44.     for i in range(10):
  45.         queue.put(i)
  46.  
  47. def worker(queue):
  48.     while True:
  49.         print('Worker:' ,queue.get())
  50.  
  51. # Now make a queue object, which is handled by manager
  52. queue = manager.Queue()
  53. # now the manager controls the Queue between the processes
  54. # everything important is handled like locking
  55. proc_gen = Process(target=generator, args=(queue,))
  56. proc_work = Process(target=worker, args=(queue,))
  57. proc_gen.start()
  58. proc_work.start()
  59.  
  60. proc_gen.join()
  61. proc_work.terminate() # killing this endless process
  62.  
  63.  
  64. # --------------------------------------
  65.  
  66. import threading
  67. import zmq
  68.  
  69.  
  70. print('\nZMQ Example in one process,\ncheating with threading to do this in one process.')
  71.  
  72. # this is for example in first.py
  73. def zmq_publish(pub):
  74.     for i in range(10):
  75.         data = str(i).encode()
  76.         #print('Data has been sent.')
  77.         pub.send_multipart([b'velocity', data])
  78.  
  79. # in second.py for example
  80. def printer(sub):
  81.     while True:
  82.         topic, data = sub.recv_multipart()
  83.         value = int(data)
  84.         print(topic.decode(), value)
  85.         if value == 9:
  86.             break
  87.  
  88. context = zmq.Context()
  89. publisher = context.socket(zmq.PUB)
  90. subscriber = context.socket(zmq.SUB)
  91. publisher.bind('tcp://127.0.0.1:5090')
  92. subscriber.connect('tcp://127.0.0.1:5090')
  93. subscriber.subscribe(b'velocity')
  94.  
  95. # give the subscriber a little bit time to connect
  96. # otherwise the data is already sent, before the
  97. # subscriber has been connected
  98. # and then this data is lost
  99.  
  100. time.sleep(1)
  101.  
  102. # this one the big benefits of zmq, because the order of connection
  103. # doesn't matter.
  104. # the publisher can already send, where there is no subsriber
  105. # and reversed
  106. # the connection is established magically in the background
  107. #
  108. # but you choose also the other concepts of zmq:
  109. # Publish -> Subscribe
  110. # Request <-> Reply
  111. # Push -> Pull
  112.  
  113.  
  114.  
  115. thread = threading.Thread(target=zmq_publish, args=(publisher,))
  116. thread.start()
  117. # obviosly multiprocess.Process has the
  118. # same interface like threading.Thread
  119. # I'm using this only for demonstration
  120. # in one single process
  121. # This won't work with multiprocessing, because the context is
  122. # creates not inside the generator/worker process
  123. # with a thread it works because of shared memory
  124.  
  125. printer(subscriber)
  126.  
  127. # destroying the context
  128. # closes everything
  129. context.destroy()
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top