- if True:
- from multiprocessing import Process as Worker, Queue
- else:
- from threading import Thread as Worker
- from Queue import Queue
- from time import sleep
class Memoize(object):
    """Callable wrapper that caches the results of a one-argument function.

    The wrapped function is kept in ``f`` and the cache in ``dict``, which
    maps each argument to the value ``f`` returned for it.  Arguments are
    used directly as dictionary keys, so they have to be hashable.
    """

    def __init__(self, f):
        self.dict = {}
        self.f = f

    def __call__(self, x):
        """Return ``f(x)``, calling ``f`` only the first time ``x`` is seen."""
        cache = self.dict
        if x in cache:
            return cache[x]
        # First sighting of x: compute once, remember, hand back.
        value = self.f(x)
        cache[x] = value
        return value
def func(x):
    """Return ``x`` squared, printing a trace line on every real invocation.

    The trace makes it visible when the body actually runs, as opposed to
    a result being served from the Memoize cache that wraps this function.
    """
    # A single pre-formatted argument prints "ENTERED: func( <x> )" both
    # under the Python 2 print statement and the Python 3 print function.
    print("ENTERED: func( %s )" % (x,))
    return x ** 2


# Rebind the name so that every later call to func() goes through the cache.
func = Memoize(func)
def _pmap_work(q, f, i, x):
    """Worker body for pmap(): compute ``f(x)`` and post ``(i, result)`` on ``q``."""
    # Kept at module level (not nested in pmap) so that it can be pickled
    # when Worker is a Process started with the spawn/forkserver method.
    q.put((i, f(x)))


def pmap(f, xs):
    """Map variant that evaluates each ``f(x)`` in its own Worker.

    >>> def f(x):
    ...     sleep(1)
    ...     return 10 * x
    >>> xs = range(1, 10)
    >>> pmap(f, xs)
    [10, 20, 30, 40, 50, 60, 70, 80, 90]

    This would sleep 9 times as long:

    # >>> map(f, xs)
    # [10, 20, 30, 40, 50, 60, 70, 80, 90]

    Parameters:
        f:  one-argument callable applied to every element.
        xs: iterable of arguments; it is consumed once up front.

    Returns:
        A list with ``f(x)`` for every ``x`` in ``xs``, in input order.

    One Worker is started per element.  Each worker posts exactly one
    ``(index, result)`` pair on a shared queue.  If ``f`` raises inside a
    worker nothing is posted for that element and this call blocks in
    ``q.get()``.
    """
    # Materialise once: allows any iterable and fixes the number of results.
    xs = list(xs)

    # here the workers will post results:
    q = Queue()

    # create thread/process objects, stay conservative and pack everything
    # explicitly into args:
    workers = [Worker(target=_pmap_work, args=(q, f, i, x))
               for i, x in enumerate(xs)]

    # start all workers:
    for w in workers:
        w.start()

    # Collect one result per worker BEFORE joining.  A process that has put
    # data on a multiprocessing queue may not terminate until that data has
    # been consumed, so joining first can deadlock on large results.
    results = [None] * len(workers)
    for _ in workers:
        i, fxi = q.get()
        results[i] = fxi

    # every result has been received; now reap the workers:
    for w in workers:
        w.join()

    return results
# Demo, executed at import time: exercise the memoised func() first through
# direct calls and the builtin map(), then through pmap().
# Every print below takes exactly one argument so the output is the same
# under the Python 2 print statement and the Python 3 print function.
print("Call func(2) four times:")
print(func(2))
print(func(2))
print(func(2))
print(func(2))

print("Map func() over four 2's:")
# list(...) so the values (not a lazy map object) are printed on Python 3.
print(list(map(func, [2, 2, 2, 2])))

print("Map func() over four 3's:")
print(list(map(func, [3, 3, 3, 3])))

# NOTE(review): when Worker is a Process, each worker presumably operates on
# its own copy of func's cache, so "ENTERED" lines may repeat here -- confirm.
print("ParMap func() over four 2's:")
print(pmap(func, [2, 2, 2, 2]))

print("ParMap func() over four 3's:")
print(pmap(func, [3, 3, 3, 3]))

print("ParMap func() over four 4's:")
print(pmap(func, [4, 4, 4, 4]))
- # python pmap.py [-v]:
- # if __name__ == "__main__":
- # import doctest
- # doctest.testmod()
- # Default options for vim:sw=4:expandtab:smarttab:autoindent:syntax