Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #
- from multiprocessing import Process
- from zmq import Context, Poller
- from zmq.backend.cython.constants import REQ, ROUTER, POLLIN
# How many worker and client processes main() spawns.
NBR_WORKERS = 3
NBR_CLIENTS = 1000

# TCP endpoint the broker binds for clients (frontend).
FRONTEND_SERVER = "127.0.0.1"
FRONTEND_PORT = "7777"

# TCP endpoint the broker binds for workers (backend).
# NOTE(review): a second loopback address is presumably deliberate; confirm
# that 127.0.0.2 is bindable on every platform this is expected to run on.
BACKEND_SERVER = "127.0.0.2"
BACKEND_PORT = "7778"
def client_task(ident):
    """Send one request to the broker frontend and print the reply.

    Uses a REQ socket whose identity is ``Client-<ident>``.

    Args:
        ident: Value interpolated into the client's socket identity.
    """
    name = f'Client-{ident}'
    # The context managers close the socket and terminate the context on
    # the way out, rather than leaving both to interpreter shutdown.
    with Context() as context, context.socket(REQ) as socket:
        socket.identity = name.encode('utf-8')
        socket.connect(f'tcp://{FRONTEND_SERVER}:{FRONTEND_PORT}')

        # Send request, get reply
        socket.send_string("Ping Server")
        reply = socket.recv()
        # Print the name directly: the identity was encoded as UTF-8, so
        # decoding it back as ASCII could fail for a non-ASCII ident.
        print(f'{name}: {reply.decode("ascii")}')
def worker_task(ident):
    """Serve requests from the broker backend forever over a REQ socket.

    The worker first announces itself with a "READY" message, then loops:
    it receives a three-frame message (client address, empty delimiter,
    request body), prints the request, and replies to that client address
    with a fixed payload.

    Args:
        ident: Value interpolated into the worker's socket identity
            (``Worker-<ident>``).
    """
    sock = Context().socket(REQ)
    worker_name = "Worker-{}".format(ident)
    sock.identity = worker_name.encode("ascii")
    sock.connect(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')

    # Announce availability; the broker reads this as a worker sign-in.
    sock.send_string("READY")

    while True:
        frames = sock.recv_multipart()
        client_addr, _delimiter, payload = frames
        print(f'{sock.identity.decode("ascii")}: {payload.decode("ascii")}')
        # Echo the client address back so the broker can route the reply.
        sock.send_multipart([client_addr, b"", b"Pong Client"])
def main():
    """Run the load-balancing broker until every client has been answered.

    Binds one ROUTER socket for clients (frontend) and one for workers
    (backend), starts NBR_CLIENTS client processes and NBR_WORKERS worker
    processes as daemons, then relays messages between the two sockets.
    Idle workers are queued in the order they report in, and each client
    request is handed to the worker at the head of that queue. The loop
    stops after NBR_CLIENTS replies have been forwarded to the frontend,
    after which both sockets are closed and the context is terminated.
    """
    from collections import deque

    context = Context()
    frontend = context.socket(ROUTER)
    frontend.bind(f'tcp://{FRONTEND_SERVER}:{FRONTEND_PORT}')
    backend = context.socket(ROUTER)
    backend.bind(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')

    # Start background tasks
    def start(task, *args):
        """Run ``task(*args)`` in a new daemon process."""
        process = Process(target=task, args=args)
        process.daemon = True
        process.start()

    for i in range(NBR_CLIENTS):
        start(client_task, i)
    for i in range(NBR_WORKERS):
        start(worker_task, i)

    # Initialize main loop state
    count = NBR_CLIENTS  # replies still to be delivered to clients
    workers = deque()    # identities of idle workers, longest-idle first
    poller = Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, POLLIN)

    while True:
        sockets = dict(poller.poll())

        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, _, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, POLLIN)
            workers.append(worker)
            # Frames arrive as bytes, so the sentinel must be bytes too;
            # a comparison against the str "READY" is always unequal.
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                _, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
                count -= 1
                if not count:
                    break

        if frontend in sockets:
            # Get next client request, route to the longest-idle worker
            client, _, request = frontend.recv_multipart()
            worker = workers.popleft()
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up
    backend.close()
    frontend.close()
    context.term()
# Start the broker only when this file is executed as a script.
if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement