Advertisement
Guest User

Untitled

a guest
Jun 27th, 2019
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.90 KB | None | 0 0
  1. from multiprocessing import Process
  2. import sys
  3. import random
  4. import zmq
  5. import zmq.asyncio
  6. import asyncio
  7.  
  8.  
# Module-level asyncio-aware ZeroMQ context. Every socket in this file
# (worker, sink and ventilator) is created from it via ``ctx.socket(...)``.
# NOTE(review): this is instantiated at import time, so with a fork-based
# multiprocessing start method the child processes inherit this object from
# the parent -- confirm that is intended.
ctx = zmq.asyncio.Context()
  10.  
  11. async def run_worker(protA=5556, portB=5557):
  12. sock_pull = ctx.socket(zmq.PULL)
  13. sock_pull.connect(f'tcp://localhost:{protA}')
  14. sock_push = ctx.socket(zmq.PUSH)
  15. sock_push.connect(f'tcp://localhost:{portB}'
  16.  
  17. while True:
  18. data = await sock_pull.recv_pyobj()
  19. if isinstance(data, int) or isinstance(data, float):
  20. await asyncio.sleep(data * 0.01)
  21. x = '+' if int(data) % 2 == 0 else '-'
  22. await sock_push.send_string(x)
  23.  
  24.  
  25. def proc_worker(portA=5556, portB=5557):
  26. asyncio.run(run_worker(portA, portB))
  27.  
  28.  
  29. async def run_sink(port=5557):
  30. sock = ctx.socket(zmq.PULL)
  31. sock.bind(f'tcp://*:{port}')
  32.  
  33. amount = int.from_bytes((await sock.recv()), 'big')
  34. print(f"GOT AMOUNT: {amount}")
  35.  
  36. for _ in range(amount):
  37. result = await sock.recv_string()
  38. sys.stdout.write(result)
  39. sys.stdout.flush()
  40.  
  41.  
  42. def proc_sink(port=5557):
  43. asyncio.run(run_sink(port))
  44.  
  45.  
  46. async def run_vent():
  47. sock_push = ctx.socket(zmq.PUSH)
  48. sock_push.bind(f'tcp://*:5556')
  49. sock_cmd = ctx.socket(zmq.PUSH)
  50. sock_cmd.connect('tcp://localhost:5557')
  51.  
  52. #input('PRESS ENTER TO START')
  53. amount = 1_000
  54. await sock_cmd.send(amount.to_bytes(4, 'big'))
  55. sock_cmd.close()
  56.  
  57. for _ in range(amount):
  58. v = random.randrange(10, 200)
  59. await sock_push.send_pyobj(v)
  60. sock_push.close()
  61.  
  62.  
  63. def proc_vent():
  64. asyncio.run(run_vent())
  65.  
  66.  
  67. if __name__ == '__main__':
  68. workers = [Process(target=proc_worker) for _ in range(10)]
  69. sink = Process(target=proc_sink)
  70. sink.start()
  71. for w in workers:
  72. w.start()
  73. proc_vent()
  74. sink.join()
  75. for w in workers:
  76. w.terminate()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement