Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #
- from time import sleep
- from random import randint
- from multiprocessing import Process
- from zmq import Context, Poller
- from zmq.backend.cython.constants import REQ, ROUTER, POLLIN
- NBR_WORKERS = 10  # worker count that main() passes to Server.start()
- FRONTEND_PORT = '7770'  # TCP port the frontend (client-facing) ROUTER socket binds to
- FRONTEND_SERVER = '127.0.0.1'  # address the frontend ROUTER socket binds to
- BACKEND_PORT = '7771'  # TCP port the backend (worker-facing) ROUTER socket binds to
- BACKEND_SERVER = '127.0.0.2'  # address the backend binds to and the workers connect to
class Server:
    """Load-balancing ZeroMQ broker that hands client requests to workers.

    Clients talk to a ROUTER socket (the frontend).  Workers are separate
    daemon processes that connect REQ sockets to a second ROUTER socket
    (the backend).  Idle workers are queued, and every client request is
    routed to the worker that has been idle the longest.
    """

    def __init__(self):
        # Everything stays unset until start() creates the context/sockets.
        self.ctx = None
        self.backend = None
        self.frontend = None
        self.processes = []

    def start(self, nbr_workers):
        """Bind both sockets, spawn the workers and run the broker loop.

        Blocks until the broker loop ends (a worker sent ``STOP``) or an
        exception escapes.  In either case the sockets, the context and
        the worker processes are released before this method returns.

        Args:
            nbr_workers: number of worker processes to spawn.
        """
        self.ctx = Context()
        try:
            # Clients connect to the frontend
            self.frontend = self.ctx.socket(ROUTER)
            self.frontend.bind(f'tcp://{FRONTEND_SERVER}:{FRONTEND_PORT}')
            # Workers connect to the backend
            self.backend = self.ctx.socket(ROUTER)
            self.backend.bind(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')
            for idx in range(nbr_workers):
                self.start_worker(self.worker_task, idx)
            self.broker()
        finally:
            # destroy() is idempotent, so this is safe even when the broker
            # loop already tore everything down on a STOP message.
            self.destroy()

    @staticmethod
    def worker_task(index):
        """Worker task, using a REQ socket to do load-balancing.

        Runs forever: announces itself with ``READY``, then answers each
        request it receives after a random 1-3 second pause.

        Args:
            index: number used to build the worker's socket identity.
        """
        socket = Context().socket(REQ)
        identity = f'Server-Worker-{index}'
        socket.identity = identity.encode('utf-8')
        socket.connect(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')
        # Tell broker we're ready for work
        socket.send_string('READY')
        while True:
            print(f'{identity} waiting for messages')
            address, empty, request = socket.recv_multipart()
            sleep(randint(1, 3))
            # A client that sets no identity gets a binary routing id from
            # the ROUTER socket, so a strict UTF-8 decode could raise here
            # and kill the worker; escape undecodable bytes instead.
            message = request.decode('utf-8', errors='backslashreplace')
            sender = address.decode('utf-8', errors='backslashreplace')
            print(f'{identity}: got message={message} from {sender}')
            socket.send_multipart(
                [address, b'', b'got message=Pong from Server-' + socket.identity]
            )

    # Start daemon worker tasks
    def start_worker(self, task, *args):
        """Run ``task(*args)`` in a daemon process and remember the process.

        Args:
            task: callable executed in the child process.
            *args: positional arguments forwarded to ``task``.
        """
        process = Process(target=task, args=args)
        process.daemon = True
        process.start()
        # Track the process only once it is running, so destroy() never
        # tries to terminate a process that failed to start.
        self.processes.append(process)

    def broker(self):
        """Route messages between frontend and backend until told to stop.

        Backend messages are told apart by their frame count:

        * 3 frames ``[worker, b'', word]`` - a control word from a worker:
          ``READY`` just registers it as idle, ``STOP`` shuts the broker
          down via destroy().
        * more than 3 frames ``[worker, b'', client, b'', reply]`` - a reply
          that is forwarded to ``client`` on the frontend.

        In every case the sending worker is queued as idle again.
        """
        # Initialize main loop
        poller = Poller()
        workers = []
        backend = self.backend
        frontend = self.frontend
        # Only poll for requests from backend until workers are available
        poller.register(backend, POLLIN)
        while True:
            sockets = dict(poller.poll())
            if backend in sockets:
                # Handle worker activity on the backend
                request = backend.recv_multipart()
                worker, empty, client = request[:3]
                if not workers:
                    # Poll for clients now that a worker is available
                    poller.register(frontend, POLLIN)
                workers.append(worker)
                if len(request) > 3:
                    # Client reply: the third frame is the client address,
                    # the rest goes back to the frontend.
                    empty, reply = request[3:]
                    frontend.send_multipart([client, b'', reply])
                elif client == b'STOP':
                    # Frames are bytes, so compare against bytes; no decode
                    # is needed (and none can fail on odd payloads).
                    self.destroy()
                    break
                # Any other 3-frame message (e.g. b'READY') only registers
                # the worker, which already happened above.
            if frontend in sockets:
                # Get next client request, route to the longest-idle worker
                client, empty, request = frontend.recv_multipart()
                worker = workers.pop(0)
                backend.send_multipart([worker, b'', client, b'', request])
                if not workers:
                    # Don't poll clients if no workers are available
                    poller.unregister(frontend)

    def destroy(self):
        """Stop the workers and release the sockets and the context.

        Safe to call before start() and safe to call more than once:
        anything that was never created or is already released is skipped.
        """
        for process in self.processes:
            process.terminate()
            # Reap the child so it does not linger as a zombie; bounded so a
            # stuck child cannot hang shutdown.
            process.join(timeout=1)
        self.processes = []
        # linger=0 drops unsent messages; with the default (infinite) linger
        # ctx.term() below could block forever on undeliverable messages.
        if self.backend is not None:
            self.backend.close(linger=0)
            self.backend = None
        if self.frontend is not None:
            self.frontend.close(linger=0)
            self.frontend = None
        if self.ctx is not None:
            self.ctx.term()
            self.ctx = None
def main():
    """Create a Server and run it with NBR_WORKERS workers."""
    Server().start(NBR_WORKERS)


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement