Advertisement
rfmonk

multiprocessing_producing_consumer.py

Feb 8th, 2014
108
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.92 KB | None | 0 0
  1. #!/usr/bin/env python
  2.  
  3. # this is from The Python
  4. # Standard Library by example
  5. # ISBN13: 9780321767349
  6.  
  7. import multiprocessing
  8. import time
  9.  
  10.  
  11. class Consumer(multiprocessing.Process):
  12.  
  13.     def __init__(self, task_queue, result_queue):
  14.         multiprocessing.Process.__init__(self)
  15.         self.task_queue = task_queue
  16.         self.result_queue = result_queue
  17.  
  18.     def run(self):
  19.         proc_name = self.name
  20.         while True:
  21.             next_task = self.task_queue.get()
  22.             if next_task is None:
  23.                 # Poison pill means shutdown
  24.                 print '%s: Exiting' % proc_name
  25.                 self.task_queue.task_done()
  26.                 break
  27.             print '%s: %s' % (proc_name, next_task)
  28.             answer = next_task()
  29.             self.task_queue.task_done()
  30.             self.result_queue.put(answer)
  31.         return
  32.  
  33.  
  34. class Task(object):
  35.     def __init__(self, a, b):
  36.         self.a = a
  37.         self.b = b
  38.  
  39.     def __call__(self):
  40.         time.sleep(0.1)  # pretend to take some time to do the work
  41.         return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
  42.  
  43.     def __str__(self):
  44.         return '%s * %s' % (self.a, self.b)
  45.  
  46.  
  47. if __name__ == '__main__':
  48.     # Establish communication queues
  49.     tasks = multiprocessing.JoinableQueue()
  50.     results = multiprocessing.Queue()
  51.  
  52.     # Start consumers
  53.     num_consumers = multiprocessing.cpu_count() * 2
  54.     print 'Creating %d consumers' % num_consumers
  55.     consumers = [Consumer(tasks, results)
  56.                  for i in xrange(num_consumers)]
  57.     for w in consumers:
  58.         w.start()
  59.  
  60.     # Enqueue jobs
  61.     num_jobs = 10
  62.     for i in xrange(num_jobs):
  63.         tasks.put(Task(i, i))
  64.  
  65.     # Add a poison pill for each consumer
  66.     for i in xrange(num_consumers):
  67.         tasks.put(None)
  68.  
  69.     # Wait for all the tasks to finish
  70.     tasks.join()
  71.  
  72.     # Start printing results
  73.     while num_jobs:
  74.         result = results.get()
  75.         print 'Result:', result
  76.         num_jobs -= 1
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement