Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
'''
ItemProcessor
Written by: John Fink

Takes a list of objects, a function, and optional extra arguments for that
function, and runs the function (with those arguments) on each element of
the input list in parallel worker processes. The result of each call can be
retrieved with .result() as a list of (input, return value) tuples; by
default .result() also joins, blocking until the entire list is finished.
Tuples are collected as workers finish them, so their order may differ from
the input order.

Sample usage:

    tasks = [1, 2, 3, 4, 5, 6]
    def test(num, val):
        return num > val
    print(itemprocessor.ItemProcessor(tasks).start(test, 3).result())
    # [(1, False), (2, False), (3, False), (4, True), (5, True), (6, True)]
    # (pairs may come back in a different order)
'''
- import multiprocessing
class ItemProcessor(object):
    """Apply one function to every item of a list using worker processes.

    Items are placed on a Manager-backed queue; ``start()`` launches worker
    processes that drain it, and ``result()`` returns the collected
    ``(item, value)`` pairs. Pairs are appended as workers finish them, so
    their order is not guaranteed to match the input order.

    With the ``spawn``/``forkserver`` start methods, ``func`` and its extra
    arguments are pickled to reach the workers, so they must be picklable
    (for example a module-level function).
    """

    def __init__(self, task_list, process_count=10):
        """Queue every element of ``task_list`` for processing.

        :param task_list: iterable of items; each element becomes one task.
        :param process_count: maximum number of worker processes to start.
        :raises ValueError: if ``process_count`` is less than 1.
        """
        if process_count < 1:
            # With no workers the queue is never drained and join() would
            # block forever, so reject this up front.
            raise ValueError('process_count must be at least 1, got %r'
                             % (process_count,))
        self.m = multiprocessing.Manager()
        self.tasks = self.m.Queue()
        self.results = self.m.list()
        # Counted while enqueuing so task_list may be any iterable.
        self._task_count = 0
        for task in task_list:
            self.tasks.put(task)
            self._task_count += 1
        self.process_count = process_count
        # Handles of started workers, so join() can reap them.
        self.processes = []

    def __getstate__(self):
        """Return the state shipped to workers started by spawn/forkserver.

        Under those start methods the bound ``_worker_process`` target is
        pickled together with ``self``. The Manager object is not picklable
        and the workers need neither it nor the Process handles -- only the
        ``tasks`` and ``results`` proxies -- so both are left out.
        """
        state = self.__dict__.copy()
        state.pop('m', None)
        state.pop('processes', None)
        return state

    def join(self):
        """Block until every queued task is done, then reap the workers."""
        self.tasks.join()
        # Once the queue is fully processed each worker's next get_nowait()
        # finds it empty and the worker returns, so these joins terminate.
        for process in self.processes:
            process.join()

    def _worker_process(self, *args, **kwargs):
        """Worker loop executed inside each child process.

        ``args[0]`` is the user function; the remaining positional arguments
        and all keyword arguments are forwarded to it after the task. Keeping
        the function out of ``kwargs`` lets callers pass their own keyword
        argument named ``func``.

        Drains ``self.tasks`` until it is empty, appending ``(task, value)``
        to ``self.results`` for each task. If the function raises an
        ``Exception``, the exception instance is stored as the value, so one
        bad item neither kills the worker nor leaves ``join()`` hanging.
        """
        try:
            from queue import Empty  # Python 3
        except ImportError:
            from Queue import Empty  # Python 2
        func, args = args[0], args[1:]
        while True:
            try:
                task = self.tasks.get_nowait()
            except Empty:
                return
            try:
                try:
                    value = func(task, *args, **kwargs)
                except Exception as exc:
                    value = exc
                self.results.append((task, value))
            finally:
                # Always balance the get(); otherwise tasks.join() waits
                # forever for a task_done() that never comes.
                self.tasks.task_done()

    def start(self, func, *args, **kwargs):
        """Launch worker processes that apply ``func`` to every queued task.

        Each task is called as ``func(task, *args, **kwargs)``. At most
        ``process_count`` workers are started, and never more than there are
        queued tasks. Returns ``self`` so ``result()`` can be chained.
        """
        worker_count = min(self.process_count, self._task_count)
        for _ in range(worker_count):
            process = multiprocessing.Process(
                target=self._worker_process,
                args=(func,) + args,
                kwargs=kwargs,
            )
            process.start()
            self.processes.append(process)
        return self

    def result(self, join=True):
        """Return the collected ``(task, value)`` pairs as a plain list.

        :param join: when true (the default), first block via ``join()``
            until every task has been processed.
        """
        if join:
            self.join()
        return list(self.results)
Add Comment
Please, Sign In to add comment