Advertisement
Uno-Dan

multiprocessing

Apr 20th, 2019
223
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.22 KB | None | 0 0
  1. #
  2. from multiprocessing import Process
  3.  
  4. from zmq import Context, Poller
  5. from zmq.backend.cython.constants import REQ, ROUTER, POLLIN
  6.  
  7. NBR_WORKERS = 3
  8. NBR_CLIENTS = 1000
  9.  
  10. FRONTEND_PORT = '7777'
  11. FRONTEND_SERVER = '127.0.0.1'
  12.  
  13. BACKEND_PORT = '7778'
  14. BACKEND_SERVER = '127.0.0.2'
  15.  
  16.  
  17. def client_task(ident):
  18.     """Basic request-reply client using REQ socket."""
  19.     socket = Context().socket(REQ)
  20.  
  21.     socket.identity = f'Client-{ident}'.encode('utf-8')
  22.     socket.connect(f'tcp://{FRONTEND_SERVER}:{FRONTEND_PORT}')
  23.  
  24.     # Send request, get reply
  25.     socket.send_string("Ping Server")
  26.     reply = socket.recv()
  27.     print(f'{socket.identity.decode("ascii")}: {reply.decode("ascii")}')
  28.  
  29.  
  30. def worker_task(ident):
  31.     """Worker task, using a REQ socket to do load-balancing."""
  32.     socket = Context().socket(REQ)
  33.     socket.identity = u"Worker-{}".format(ident).encode("ascii")
  34.     socket.connect(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')
  35.  
  36.     # Tell broker we're ready for work
  37.     socket.send_string("READY")
  38.  
  39.     while True:
  40.         address, empty, request = socket.recv_multipart()
  41.         print(f'{socket.identity.decode("ascii")}: {request.decode("ascii")}')
  42.         socket.send_multipart([address, b"", b"Pong Client"])
  43.  
  44.  
  45. def main():
  46.     context = Context()
  47.  
  48.     frontend = context.socket(ROUTER)
  49.     frontend.bind(f'tcp://{FRONTEND_SERVER}:{FRONTEND_PORT}')
  50.  
  51.     backend = context.socket(ROUTER)
  52.     backend.bind(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')
  53.  
  54.     # Start background tasks
  55.     def start(task, *args):
  56.         process = Process(target=task, args=args)
  57.         process.daemon = True
  58.         process.start()
  59.  
  60.     for i in range(NBR_CLIENTS):
  61.         start(client_task, i)
  62.  
  63.     for i in range(NBR_WORKERS):
  64.         start(worker_task, i)
  65.  
  66.     # Initialize main loop state
  67.     count = NBR_CLIENTS
  68.     workers = []
  69.     poller = Poller()
  70.     # Only poll for requests from backend until workers are available
  71.     poller.register(backend, POLLIN)
  72.  
  73.     while True:
  74.         sockets = dict(poller.poll())
  75.  
  76.         if backend in sockets:
  77.             # Handle worker activity on the backend
  78.             request = backend.recv_multipart()
  79.             worker, empty, client = request[:3]
  80.  
  81.             if not workers:
  82.                 # Poll for clients now that a worker is available
  83.                 poller.register(frontend, POLLIN)
  84.             workers.append(worker)
  85.  
  86.             if client != "READY" and len(request) > 3:
  87.                 # If client reply, send rest back to frontend
  88.                 empty, reply = request[3:]
  89.                 frontend.send_multipart([client, b"", reply])
  90.                 count -= 1
  91.                 if not count:
  92.                     break
  93.  
  94.         if frontend in sockets:
  95.             # Get next client request, route to last-used worker
  96.             client, empty, request = frontend.recv_multipart()
  97.             worker = workers.pop(0)
  98.             backend.send_multipart([worker, b"", client, b"", request])
  99.             if not workers:
  100.                 # Don't poll clients if no workers are available
  101.                 poller.unregister(frontend)
  102.  
  103.     # Clean up
  104.     backend.close()
  105.     frontend.close()
  106.     context.term()
  107.  
  108.  
# Run the broker demo only when executed as a script.  The guard also keeps
# child processes that re-import this module from starting the broker again.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement