Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from multiprocessing import Process
- import sys
- import random
- import zmq
- import zmq.asyncio
- import asyncio
- ctx = zmq.asyncio.Context()
async def run_worker(protA=5556, portB=5557):
    """Pull numeric tasks, simulate work, and push a parity marker to the sink.

    Args:
        protA: TCP port on localhost to pull tasks from. (The name is a
            historical misspelling of ``portA``; it is kept so keyword
            callers keep working.)
        portB: TCP port on localhost to push results to.

    The coroutine loops forever; it only ends when the task is cancelled,
    an exception propagates, or the hosting process is killed.
    """
    sock_pull = ctx.socket(zmq.PULL)
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_pull.connect(f'tcp://localhost:{protA}')
        sock_push.connect(f'tcp://localhost:{portB}')
        while True:
            # NOTE: recv_pyobj unpickles whatever arrives on the socket;
            # only connect this worker to trusted peers.
            data = await sock_pull.recv_pyobj()
            if not isinstance(data, (int, float)):
                # Non-numeric payloads are ignored, as before.
                continue
            # The payload doubles as the simulated workload: 10 ms per unit.
            await asyncio.sleep(data * 0.01)
            x = '+' if int(data) % 2 == 0 else '-'
            await sock_push.send_string(x)
    finally:
        # Release both sockets on cancellation or error.
        sock_pull.close()
        sock_push.close()
def proc_worker(portA=5556, portB=5557):
    """Synchronous entry point that runs ``run_worker`` on a new event loop.

    Args:
        portA: Port forwarded as the worker's task (pull) port.
        portB: Port forwarded as the worker's result (push) port.
    """
    worker_coro = run_worker(portA, portB)
    asyncio.run(worker_coro)
async def run_sink(port=5557):
    """Collect results and echo each one to stdout as it arrives.

    The first message received is decoded as a big-endian unsigned integer
    giving the number of results to expect; that many string messages are
    then read and written to stdout, flushing after each.

    Args:
        port: TCP port to bind the PULL socket on (all interfaces).
    """
    sock = ctx.socket(zmq.PULL)
    try:
        sock.bind(f'tcp://*:{port}')
        # NOTE(review): this assumes the count message arrives before any
        # worker result on the same socket - confirm against the sender.
        amount = int.from_bytes(await sock.recv(), 'big')
        print(f"GOT AMOUNT: {amount}")
        for _ in range(amount):
            result = await sock.recv_string()
            # Flush per result so progress is visible immediately.
            sys.stdout.write(result)
            sys.stdout.flush()
    finally:
        # Close the socket even if receiving fails or is cancelled.
        sock.close()
def proc_sink(port=5557):
    """Synchronous entry point that runs ``run_sink`` on a new event loop.

    Args:
        port: Port forwarded to ``run_sink`` as its bind port.
    """
    sink_coro = run_sink(port)
    asyncio.run(sink_coro)
async def run_vent(port=5556, sink_port=5557, amount=1_000):
    """Announce the task count to the sink, then push random tasks to workers.

    Args:
        port: TCP port to bind the task PUSH socket on (all interfaces).
        sink_port: TCP port on localhost where the sink listens.
        amount: Number of tasks to generate. It is sent to the sink as a
            4-byte big-endian integer.

    Raises:
        OverflowError: If ``amount`` is negative or does not fit in 4 bytes.
    """
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_push.bind(f'tcp://*:{port}')

        # Tell the sink how many results to wait for before sending tasks.
        sock_cmd = ctx.socket(zmq.PUSH)
        try:
            sock_cmd.connect(f'tcp://localhost:{sink_port}')
            await sock_cmd.send(amount.to_bytes(4, 'big'))
        finally:
            sock_cmd.close()

        # NOTE(review): an earlier revision paused here on
        # input('PRESS ENTER TO START'); without such a gate, workers that
        # connect late may receive fewer tasks - confirm whether that matters.
        for _ in range(amount):
            # Each task is an integer in [10, 200) that the worker uses as
            # its simulated workload.
            v = random.randrange(10, 200)
            await sock_push.send_pyobj(v)
    finally:
        sock_push.close()
def proc_vent():
    """Synchronous entry point that runs ``run_vent`` on a new event loop."""
    vent_coro = run_vent()
    asyncio.run(vent_coro)
if __name__ == '__main__':
    # Topology: one sink process, ten worker processes, and the ventilator
    # running in this (parent) process.
    workers = [Process(target=proc_worker) for _ in range(10)]
    sink = Process(target=proc_sink)

    # Start the sink first so it is binding its port before results flow.
    sink.start()
    for w in workers:
        w.start()

    try:
        proc_vent()
        # The sink exits on its own once it has received every result.
        sink.join()
    finally:
        # Workers loop forever, so they must be stopped explicitly; join
        # afterwards so no child is left unreaped.
        for w in workers:
            w.terminate()
        for w in workers:
            w.join()
        # If the ventilator failed, the sink would wait forever: stop it too.
        if sink.is_alive():
            sink.terminate()
            sink.join()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement