Sep 25th, 2018
'''

ItemProcessor

Written by: John Fink

Takes a list of objects, a function, and optional arguments to that
function. Runs the function (with those optional arguments) against
each element of the input list, in parallel. The results of the calls
can be retrieved as a list of (input, result) tuples with .result(),
which by default joins and blocks until the entire list is finished.
Results are appended as workers finish, so their order is not
guaranteed to match the input order; sort them if order matters.

Sample usage:

tasks = [1,2,3,4,5,6]
def test(num,val):
    return num > val

print(sorted(itemprocessor.ItemProcessor(tasks).start(test,3).result()))
[(1, False), (2, False), (3, False), (4, True), (5, True), (6, True)]

'''

import multiprocessing

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2


class ItemProcessor(object):
    def __init__(self, task_list, process_count=10):
        # The manager-backed queue and list are proxies shared across processes.
        self.m = multiprocessing.Manager()
        self.tasks = self.m.Queue()
        self.results = self.m.list()
        for task in task_list:
            self.tasks.put(task)
        self.process_count = process_count

    def join(self):
        # Blocks until task_done() has been called for every queued task.
        return self.tasks.join()

    def _worker_process(self, func, *args, **kwargs):
        # Drain the queue until it is empty, recording (task, result) pairs.
        while True:
            try:
                task = self.tasks.get_nowait()
            except Empty:
                return
            try:
                self.results.append((task, func(task, *args, **kwargs)))
            finally:
                # Mark the task done even if func raised; otherwise join()
                # would wait on this task forever.
                self.tasks.task_done()

    def start(self, func, *args, **kwargs):
        # func travels as the first positional argument, so a caller's own
        # 'func' keyword argument is no longer overwritten.
        for _ in range(self.process_count):
            process = multiprocessing.Process(
                target=self._worker_process, args=(func,) + args, kwargs=kwargs)
            process.start()
        return self

    def result(self, join=True):
        if join:
            self.join()
        return list(self.results)
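
For comparison, here is a minimal sketch of the same "apply a function to every item and return (input, result) tuples" contract built on the standard library's pool API. It is not part of the original module. The names `process_items`, `greater_than`, and `workers` are made up for illustration, and it deliberately uses `multiprocessing.pool.ThreadPool` (threads, not processes) so the snippet runs as-is without pickling or start-method concerns. Because `map` returns results in input order, the output here is ordered, unlike the shared results list above.

```python
from multiprocessing.pool import ThreadPool


def process_items(items, func, args=(), kwargs=None, workers=4):
    """Apply func(item, *args, **kwargs) to every item using a thread pool.

    Hypothetical helper for illustration only. Returns a list of
    (item, result) tuples in the same order as the input.
    """
    kwargs = kwargs or {}
    items = list(items)
    with ThreadPool(processes=workers) as pool:
        # map() blocks until every call has finished and preserves input order.
        outputs = pool.map(lambda item: func(item, *args, **kwargs), items)
    return list(zip(items, outputs))


def greater_than(num, val):
    return num > val


pairs = process_items([1, 2, 3, 4, 5, 6], greater_than, args=(3,))
print(pairs)
# [(1, False), (2, False), (3, False), (4, True), (5, True), (6, True)]
```

For CPU-bound work, `multiprocessing.Pool` exposes the same `map` method, but the mapped callable must then be picklable (a module-level function rather than a lambda), and the pool should be created under an `if __name__ == "__main__":` guard.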