Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- from multiprocessing import Process, Manager, Value, Array
- import time
- import humanize
- #actual computation
def f(a):
    """Return ``a`` minus 1.0.

    This is the per-element computation applied by the worker processes;
    it stands in for whatever real work should be parallelised.

    Args:
        a: a number (or anything supporting ``a - 1.0``).

    Returns:
        The value ``a - 1.0``.
    """
    return a - 1.0
- # obs: with such a simple computation, there will be no computation time advantage.
- # use this for hardcore stuff that deals with peripherals such as a disk read
- #computation wrapper
def process_target(pidx, Global, res, count, N, P):
    """Worker loop: apply ``f`` to every P-th input element, starting at ``pidx``.

    Args:
        pidx: index of this worker; also the first element index it handles.
        Global: shared namespace exposing ``mat`` (the indexable input) and
            ``run`` (halt flag; the loop stops as soon as it is falsy).
        res: indexable output buffer; ``res[i]`` receives ``f(mat[i])``.
        count: shared counter exposing ``get_lock()`` and ``value``; it is
            incremented once for every element that has been written to
            ``res``.
        N: total number of elements in the job.
        P: stride between the elements handled by this worker (the number
            of workers).
    """
    # Read the input once. When Global is a manager namespace proxy (as
    # created in the __main__ section), each attribute access returns a fresh
    # copy of the whole object, so reading Global.mat per element would ship
    # the entire vector to this process on every iteration.
    mat = Global.mat
    for i in range(pidx, N, P):
        # Check the halt flag before starting new work so a stop request is
        # honoured without processing one more element.
        if not Global.run:
            break
        res[i] = f(mat[i])
        # Count only after the result is stored, so the counter never
        # reports more elements than were actually completed.
        with count.get_lock():
            count.value += 1
if __name__ == '__main__':
    # Script entry point: split a vector computation across P worker
    # processes, print a periodic estimate of the remaining time, and collect
    # the output into a NumPy array.
    manager = Manager()
    Global = manager.Namespace()
    N = 100000  # job size
    P = 12  # number of worker processes
    Global.mat = np.random.rand(N)  # input vector
    res = Array('d', N, lock=False)  # output vector (shared, no lock)
    count = Value('i', 0)  # counter of processed elements
    Global.run = True  # used for graceful halt

    # init processes: worker i handles elements i, i + P, i + 2P, ...
    proc_list = [
        Process(target=process_target, args=(i, Global, res, count, N, P))
        for i in range(P)
    ]
    # start processes
    for p in proc_list:
        p.start()

    ts = time.time()
    # Seed the previous-sample time with the start time; starting from 0
    # would make the first interval "seconds since the epoch" and produce a
    # meaningless first estimate.
    elapsed = ts
    c = 0
    while any(p.is_alive() for p in proc_list):
        try:
            time.sleep(2)
            elapsed_old = elapsed
            elapsed = time.time()
            c_old = c
            c = count.value
            if c == c_old:
                # No progress since the last sample: no rate to extrapolate.
                continue
            # Remaining time = remaining elements / (elements per second
            # observed over the last sampling interval).
            remaining = (elapsed - elapsed_old) * (N - c) / (c - c_old)
            print(humanize.naturaltime(remaining, future=True))
        except KeyboardInterrupt:
            print("KeyboardInterrupt")
            # Ask the workers to stop on their own first, then terminate
            # only those that are still alive after a short grace period.
            # Terminating a worker while it holds the counter lock could
            # leave that lock held and block the final count read below.
            Global.run = False
            deadline = time.time() + 5
            for p in proc_list:
                p.join(timeout=max(0.0, deadline - time.time()))
            for p in proc_list:
                if p.is_alive():
                    p.terminate()
            break

    for p in proc_list:
        p.join()

    print("Final time: " + humanize.naturaldelta(time.time() - ts))
    print("{} out of {} elements were processed".format(count.value, N))

    # get results: view the shared output buffer as a NumPy array of doubles
    res_npy = np.frombuffer(res, dtype='d')
Add Comment
Please, Sign In to add comment