Advertisement
Uno-Dan

server

Apr 22nd, 2019
208
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.80 KB | None | 0 0
  1. #
  2. from time import sleep
  3. from random import randint
  4.  
  5. from multiprocessing import Process
  6.  
  7. from zmq import Context, Poller
  8. from zmq.backend.cython.constants import REQ, ROUTER, POLLIN
  9.  
  10.  
  11. NBR_WORKERS = 10
  12.  
  13. FRONTEND_PORT = '7770'
  14. FRONTEND_SERVER = '127.0.0.1'
  15.  
  16. BACKEND_PORT = '7771'
  17. BACKEND_SERVER = '127.0.0.2'
  18.  
  19.  
  20. class Server:
  21. def __init__(self):
  22. self.ctx = \
  23. self.backend = \
  24. self.frontend = None
  25.  
  26. self.processes = []
  27.  
  28. def start(self, nbr_workers):
  29. self.ctx = Context()
  30.  
  31. # Clients connect to the frontend
  32. self.frontend = self.ctx.socket(ROUTER)
  33. self.frontend.bind(f'tcp://{FRONTEND_SERVER}:{FRONTEND_PORT}')
  34.  
  35. # Workers connect to the backend
  36. self.backend = self.ctx.socket(ROUTER)
  37. self.backend.bind(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')
  38.  
  39. for idx in range(nbr_workers):
  40. self.start_worker(self.worker_task, idx)
  41.  
  42. self.broker()
  43.  
  44. @staticmethod
  45. def worker_task(index):
  46. """Worker task, using a REQ socket to do load-balancing."""
  47. socket = Context().socket(REQ)
  48.  
  49. identity = f'Server-Worker-{index}'
  50.  
  51. socket.identity = identity.encode('utf-8')
  52. socket.connect(f'tcp://{BACKEND_SERVER}:{BACKEND_PORT}')
  53.  
  54. # Tell broker we're ready for work
  55. socket.send_string('READY')
  56.  
  57. while True:
  58. print(f'{identity} waiting for messages')
  59. address, empty, request = socket.recv_multipart()
  60.  
  61. sleep(randint(1, 3))
  62.  
  63. print(f'{identity}: got message={request.decode("utf-8")} from {address.decode("utf-8")}')
  64. socket.send_multipart([address, b'', b'got message=Pong from Server-' + socket.identity])
  65.  
  66. # Start daemon worker tasks
  67. def start_worker(self, task, *args):
  68. process = Process(target=task, args=args)
  69. self.processes.append(process)
  70. process.daemon = True
  71. process.start()
  72.  
  73. def broker(self):
  74. # Initialize main loop
  75. poller = Poller()
  76. workers = []
  77. backend = self.backend
  78. frontend = self.frontend
  79.  
  80. # Only poll for requests from backend until workers are available
  81. poller.register(backend, POLLIN)
  82.  
  83. while True:
  84. sockets = dict(poller.poll())
  85.  
  86. if backend in sockets:
  87. # Handle worker activity on the backend
  88. request = backend.recv_multipart()
  89. worker, empty, client = request[:3]
  90.  
  91. if not workers:
  92. # Poll for clients now that a worker is available
  93. poller.register(frontend, POLLIN)
  94. workers.append(worker)
  95.  
  96. if client != 'READY' and len(request) > 3:
  97. # If client reply, send rest back to frontend
  98. empty, reply = request[3:]
  99. frontend.send_multipart([client, b'', reply])
  100. elif client.decode('utf-8') == 'STOP':
  101. self.destroy()
  102. break
  103.  
  104. if frontend in sockets:
  105. # Get next client request, route to last-used worker
  106. client, empty, request = frontend.recv_multipart()
  107.  
  108. worker = workers.pop(0)
  109. backend.send_multipart([worker, b'', client, b'', request])
  110.  
  111. if not workers:
  112. # Don't poll clients if no workers are available
  113. poller.unregister(frontend)
  114.  
  115. def destroy(self):
  116. self.backend.close()
  117. self.frontend.close()
  118. self.ctx.term()
  119.  
  120. for process in self.processes:
  121. process.terminate()
  122.  
  123.  
  124. def main():
  125. srv = Server()
  126. srv.start(NBR_WORKERS)
  127.  
  128.  
  129. if __name__ == '__main__':
  130. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement