Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import print_function
- import uuid
- import time
- import logging
- import json
- from six.moves import xrange
- import zmq
- from main.worker import Worker
- from multiprocessing import Process, Event
- from meru.model import DBSession
- from meru.model import Number
- from meru.model import engine
- import transaction
- import logging
- FORMAT = "%(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s"
- logging.basicConfig(format=FORMAT)
- LOG = logging.getLogger(__name__)
- LOG.setLevel(logging.INFO)
- class Job(object):
- def __init__(self, work):
- self.id = uuid.uuid4().hex
- self.work = work
- class Router(object):
- """
- Manage distribution of jobs to workers and collation of results.
- """
- CONTROL_PORT = 5755
- def __init__(self, port=CONTROL_PORT):
- LOG.info('CLASS __init__ router')
- self.logger = logging.getLogger(__name__)
- # self.stop_event = stop_event
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.ROUTER)
- self.socket.bind('tcp://*:{0}'.format(port))
- self.workers = {}
- # We won't assign more than 50 jobs to a worker at a time; this ensures
- # reasonable memory usage, and less shuffling when a worker dies.
- self.max_jobs_per_worker = 50
- # When/if a client disconnects we'll put any unfinished work in here,
- # work_iterator() will return work from here as well.
- self._work_to_requeue = []
- self._work = [] #[Job({'number': 99})]
- def submit_job(self, job):
- self._work.extend(job)
- LOG.info('submit_job self._work: %s', repr(self._work))
- def run(self):
- LOG.info('CLASS run router %s _work: %s', repr(self.context), repr(self._work))
- while True:
- next_worker_id = None
- LOG.info('submit_job self._work: %s', repr(self._work))
- # LOG.info('CLASS router running')
- job = None
- if self._work:
- job = self._work.pop()
- if job:
- # LOG.info('job: %s', repr(job.number))
- while next_worker_id is None:
- # First check if there are any worker messages to process. We
- # do this while checking for the next available worker so that
- # if it takes a while to find one we're still processing
- # incoming messages.
- while self.socket.poll(0):
- # Note that we're using recv_multipart() here, this is a
- # special method on the ROUTER socket that includes the
- # id of the sender. It doesn't handle the json decoding
- # automatically though so we have to do that ourselves.
- worker_id, message = self.socket.recv_multipart()
- message = json.loads(message.decode('utf8'))
- self._handle_worker_message(worker_id, message)
- # If there are no available workers (they all have 50 or
- # more jobs already) sleep for half a second.
- next_worker_id = self._get_next_worker_id()
- if next_worker_id is None:
- time.sleep(0.5)
- # We've got a Job and an available worker_id, all we need to do
- # is send it. Note that we're now using send_multipart(), the
- # counterpart to recv_multipart(), to tell the ROUTER where our
- # message goes.
- self.logger.info('sending job %s to worker %s', job.id,
- next_worker_id)
- self.workers[next_worker_id][job.id] = job
- self.socket.send_multipart(
- [next_worker_id, json.dumps((job.id, job.work)).encode('utf8')])
- def _get_next_worker_id(self):
- """Return the id of the next worker available to process work. Note
- that this will return None if no clients are available.
- """
- # It isn't strictly necessary since we're limiting the amount of work
- # we assign, but just to demonstrate that we're doing our own load
- # balancing we'll find the worker with the least work
- if self.workers:
- worker_id, work = sorted(self.workers.items(),
- key=lambda x: len(x[1]))[0]
- if len(work) < self.max_jobs_per_worker:
- return worker_id
- # No worker is available. Our caller will have to handle this.
- return None
- def _handle_worker_message(self, worker_id, message):
- """Handle a message from the worker identified by worker_id.
- {'message': 'connect'}
- {'message': 'disconnect'}
- {'message': 'job_done', 'job_id': 'xxx', 'result': 'yyy'}
- """
- if message['message'] == 'connect':
- assert worker_id not in self.workers
- self.workers[worker_id] = {}
- self.logger.info('[%s]: connect', worker_id)
- elif message['message'] == 'disconnect':
- # Remove the worker so no more work gets added, and put any
- # remaining work into _work_to_requeue
- remaining_work = self.workers.pop(worker_id)
- self._work_to_requeue.extend(remaining_work.values())
- self.logger.info('[%s]: disconnect, %s jobs requeued', worker_id,
- len(remaining_work))
- elif message['message'] == 'job_done':
- result = message['result']
- job = self.workers[worker_id].pop(message['job_id'])
- self._process_results(worker_id, job, result)
- else:
- raise Exception('unknown message: %s' % message['message'])
- def _process_results(self, worker_id, job, result):
- self.logger.info('[%s]: finished %s, result: %s',
- worker_id, job.id, result)
- trans = transaction.begin()
- n = Number(result)
- DBSession.add(n)
- trans.commit()
- def run_worker(event):
- logging.basicConfig(level=logging.INFO)
- worker = Worker(event)
- worker.run()
- def run_router(event):
- logging.basicConfig(level=logging.INFO)
- Router(event).run()
- def run():
- stop_event = Event()
- processes = []
- processes.append(Process(target=run_router, args=(stop_event,)))
- # Start a few worker processes
- for i in range(10):
- processes.append(Process(target=run_worker, args=(stop_event,)))
- # To test out our disconnect messaging we'll also start one more worker
- # process with a different event that we'll stop shortly after starting.
- another_stop_event = Event()
- processes.append(Process(target=run_worker, args=(another_stop_event,)))
- for p in processes:
- p.start()
- try:
- time.sleep(5)
- another_stop_event.set()
- # The router will set the stop event when it's finished, just
- # idle until then
- while not stop_event.is_set():
- time.sleep(1)
- except KeyboardInterrupt:
- stop_event.set()
- another_stop_event.set()
- print('waiting for processes to die...')
- for p in processes:
- p.join()
- print('all done')
- if __name__ == '__main__':
- run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement