from __future__ import print_function

import json
import logging
import time
import uuid
from multiprocessing import Event, Process

import transaction
import zmq

from main.worker import Worker
from meru.model import DBSession, Number

FORMAT = "%(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s"
logging.basicConfig(format=FORMAT)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)

class Job(object):
    """A unit of work, tagged with a unique id so results can be matched up."""

    def __init__(self, work):
        self.id = uuid.uuid4().hex
        self.work = work


class Router(object):
    """Manage distribution of jobs to workers and collation of results."""

    CONTROL_PORT = 5755

    def __init__(self, stop_event, port=CONTROL_PORT):
        LOG.info('Router.__init__')
        self.logger = logging.getLogger(__name__)
        self.stop_event = stop_event
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind('tcp://*:{0}'.format(port))
        self.workers = {}
        # We won't assign more than 50 jobs to a worker at a time; this ensures
        # reasonable memory usage, and less shuffling when a worker dies.
        self.max_jobs_per_worker = 50
        # When/if a worker disconnects we'll put any unfinished work in here;
        # run() hands out work from here first.
        self._work_to_requeue = []
        # Jobs waiting to be assigned, fed by submit_job().
        self._work = []
    def submit_job(self, jobs):
        """Queue an iterable of Job objects for distribution to workers."""
        self._work.extend(jobs)
        LOG.info('submit_job, queue is now: %s', repr(self._work))

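    # Example (hypothetical caller, not part of this paste): another piece of
    # code could drive the router like this:
    #
    #     router = Router(stop_event)
    #     router.submit_job([Job({'number': 42}), Job({'number': 99})])
    #     router.run()
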
    def run(self):
        LOG.info('Router.run, context: %s', repr(self.context))
        while not self.stop_event.is_set():
            # Process any pending worker messages even when there is no work
            # to hand out, so workers can register and deregister at any time.
            self._pump_messages()
            # Requeued work (from workers that disconnected) takes priority
            # over newly submitted work.
            if self._work_to_requeue:
                job = self._work_to_requeue.pop()
            elif self._work:
                job = self._work.pop()
            else:
                # Nothing to hand out; don't spin the CPU.
                time.sleep(0.5)
                continue
            next_worker_id = self._get_next_worker_id()
            while next_worker_id is None:
                # If there are no available workers (they all have 50 or more
                # jobs already), sleep for half a second and keep processing
                # incoming messages while we wait for one to free up.
                time.sleep(0.5)
                self._pump_messages()
                next_worker_id = self._get_next_worker_id()
            # We've got a Job and an available worker_id; all we need to do is
            # send it. Note that we're using send_multipart(), the counterpart
            # to recv_multipart(), to tell the ROUTER which peer the message
            # goes to.
            self.logger.info('sending job %s to worker %s', job.id,
                             next_worker_id)
            self.workers[next_worker_id][job.id] = job
            self.socket.send_multipart(
                [next_worker_id, json.dumps((job.id, job.work)).encode('utf8')])

    def _pump_messages(self):
        """Handle all worker messages that are ready, without blocking."""
        while self.socket.poll(0):
            # recv_multipart() is a special method on the ROUTER socket: the
            # first frame is the id of the sender. It doesn't handle the json
            # decoding automatically though, so we do that ourselves.
            worker_id, message = self.socket.recv_multipart()
            message = json.loads(message.decode('utf8'))
            self._handle_worker_message(worker_id, message)

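    # For reference, each assignment goes out as a two-frame multipart
    # message; the identity bytes below are illustrative (zmq assigns them):
    #
    #     [b'\x00k\x8bEg', b'["3f2b...", {"number": 99}]']
    #
    # i.e. the worker's identity frame followed by the json-encoded
    # (job_id, work) pair.
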
    def _get_next_worker_id(self):
        """Return the id of the next worker available to process work.

        Returns None if no workers are available.
        """
        # It isn't strictly necessary since we're limiting the amount of work
        # we assign, but just to demonstrate that we're doing our own load
        # balancing we'll pick the worker with the least work in flight.
        if self.workers:
            worker_id, work = min(self.workers.items(),
                                  key=lambda item: len(item[1]))
            if len(work) < self.max_jobs_per_worker:
                return worker_id
        # No worker is available. Our caller will have to handle this.
        return None

    def _handle_worker_message(self, worker_id, message):
        """Handle a message from the worker identified by worker_id.

        Messages look like one of:

            {'message': 'connect'}
            {'message': 'disconnect'}
            {'message': 'job_done', 'job_id': 'xxx', 'result': 'yyy'}
        """
        if message['message'] == 'connect':
            assert worker_id not in self.workers
            self.workers[worker_id] = {}
            self.logger.info('[%s]: connect', worker_id)
        elif message['message'] == 'disconnect':
            # Remove the worker so no more work gets added, and put any
            # remaining work into _work_to_requeue.
            remaining_work = self.workers.pop(worker_id)
            self._work_to_requeue.extend(remaining_work.values())
            self.logger.info('[%s]: disconnect, %s jobs requeued', worker_id,
                             len(remaining_work))
        elif message['message'] == 'job_done':
            result = message['result']
            job = self.workers[worker_id].pop(message['job_id'])
            self._process_results(worker_id, job, result)
        else:
            raise Exception('unknown message: %s' % message['message'])

    def _process_results(self, worker_id, job, result):
        self.logger.info('[%s]: finished %s, result: %s',
                         worker_id, job.id, result)
        # Persist the result as a Number row in its own transaction.
        trans = transaction.begin()
        DBSession.add(Number(result))
        trans.commit()
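
    # Note (an assumption about the surrounding app, not shown in this
    # paste): with the `transaction` package, the context-manager form is
    # equivalent and aborts cleanly on error:
    #
    #     with transaction.manager:
    #         DBSession.add(Number(result))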


def run_worker(event):
    logging.basicConfig(level=logging.INFO)
    worker = Worker(event)
    worker.run()


def run_router(event):
    logging.basicConfig(level=logging.INFO)
    Router(event).run()


def run():
    stop_event = Event()
    processes = []
    processes.append(Process(target=run_router, args=(stop_event,)))
    # Start a few worker processes.
    for _ in range(10):
        processes.append(Process(target=run_worker, args=(stop_event,)))
    # To test out our disconnect messaging we'll also start one more worker
    # process with a different event that we'll stop shortly after starting.
    another_stop_event = Event()
    processes.append(Process(target=run_worker, args=(another_stop_event,)))
    for p in processes:
        p.start()
    try:
        time.sleep(5)
        another_stop_event.set()
        # Idle until the stop event is set elsewhere or we're interrupted;
        # the router exits its loop once the event is set.
        while not stop_event.is_set():
            time.sleep(1)
    except KeyboardInterrupt:
        stop_event.set()
        another_stop_event.set()
    print('waiting for processes to die...')
    for p in processes:
        p.join()
    print('all done')


if __name__ == '__main__':
    run()
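

# `main.worker.Worker` isn't included in this paste. Purely as a hypothetical
# sketch (the class name and the trivial "computation" are assumptions, not
# the real implementation), a worker matching the Router's message protocol
# could look like the DEALER-side counterpart below. It's named ExampleWorker
# so it doesn't shadow the imported Worker, and nothing above uses it.
class ExampleWorker(object):
    CONTROL_PORT = 5755

    def __init__(self, stop_event, port=CONTROL_PORT):
        self.stop_event = stop_event
        self.context = zmq.Context()
        # DEALER is the natural peer for the router's ROUTER socket; zmq
        # assigns us an identity, which the router sees as worker_id.
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.connect('tcp://localhost:{0}'.format(port))

    def run(self):
        self.socket.send_json({'message': 'connect'})
        while not self.stop_event.is_set():
            # Poll briefly so we keep checking the stop event.
            if self.socket.poll(100):
                job_id, work = self.socket.recv_json()
                self.socket.send_json({
                    'message': 'job_done',
                    'job_id': job_id,
                    'result': self._do_work(work),
                })
        self.socket.send_json({'message': 'disconnect'})

    def _do_work(self, work):
        # Placeholder computation; the real Worker presumably does something
        # useful with work['number'].
        return work['number'] * 2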