visoft

multiproc_with_numpy_array

Oct 18th, 2017
243
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import numpy as np
  2. import multiprocessing as mp
  3. import multiprocessing.sharedctypes
  4. import ctypes
  5.  
  6. import functools
  7. import time
  8.  
  9. SHAPE = (2000,100000)
  10. STEP = 250
  11.  
  12. def _get_size_from_shape(shape):
  13.     return functools.reduce(lambda x, y: x * y, shape)
  14.  
  15.  
  16. def _eval_dest_array(destination):
  17.     return np.abs(np.sum(destination) - _get_size_from_shape(destination.shape)) < 0.0001
  18.  
  19.  
  20. def create_np_shared_array(shape, dtype, ctype):
  21.     # Feel free to create a map from usual dtypes to ctypes. Or suggest a more elegant way
  22.     size = _get_size_from_shape(shape)
  23.     shared_mem_chunck = mp.sharedctypes.RawArray(ctype, size)
  24.     numpy_array_view = np.frombuffer(shared_mem_chunck, dtype).reshape(shape)
  25.     return numpy_array_view
  26.  
  27.  
  28. def lengthly_operation(source,destination,idx_start,idx_end):
  29.     # Just numpy ops. No .frombuffer or other memory aware operations
  30.     # You can run this with regular numpy arrays as input.
  31.     destination[idx_start:idx_end,:] = np.floor(np.divide(np.power(source[idx_start:idx_end,:],5),1e9)) + 1
  32.  
  33.  
  34. def serial():
  35.     t0 = time.time()
  36.     src = np.random.rand(*SHAPE).astype(np.float32)
  37.     dst = np.zeros(SHAPE,np.float32)
  38.     t1 = time.time() - t0
  39.     t0 = time.time()
  40.     lengthly_operation(src,dst,0,SHAPE[0])
  41.     t2 = time.time() - t0
  42.     tt = t1 + t2
  43.     ok = _eval_dest_array(dst)
  44.     print("Serial version: allocate mem {} exec: {} total: {} Succes: {}".format(t1, t2, tt, ok))
  45.  
  46.  
  47. def parallel_trivial_np():
  48.     t0 = time.time()
  49.     src = np.random.rand(*SHAPE).astype(np.float32)
  50.     # This array will not be updated. The forked child will "point" to different areas for each process.
  51.     dst = np.zeros(SHAPE,np.float32)
  52.     t1 = time.time() - t0
  53.     t0 = time.time()
  54.     processes = []
  55.     for k in range(0, SHAPE[0], STEP):
  56.         p = mp.Process(target=lengthly_operation,args=(src, dst, k, k + STEP))
  57.         processes.append(p)
  58.     t2 = time.time() - t0
  59.     t0 = time.time()
  60.     for p in processes:
  61.         p.start()
  62.     for p in processes:
  63.         p.join()
  64.     t3 = time.time() - t0
  65.     tt = t1 + t2 + t3
  66.     ok = _eval_dest_array(dst)
  67.     print("Parallel with trivial np: allocate mem {} spawn process: {} exec: {} total: {} Succes: {}".format(t1, t2, t3, tt, ok))
  68.  
  69.  
  70. def parallel_shared_np():
  71.     t0 = time.time()
  72.     src = np.random.rand(*SHAPE).astype(np.float32)
  73.  
  74.     src_shared = create_np_shared_array(SHAPE,np.float32,ctypes.c_float)
  75.     dst_shared = create_np_shared_array(SHAPE,np.float32,ctypes.c_float)
  76.     t01 = time.time() - t0
  77.     src_shared[:] = src[:] # Some numpy ops accept an 'out' array where to store the results
  78.     t1 = time.time() - t0
  79.     t0 = time.time()
  80.     processes = []
  81.     for k in range(0, SHAPE[0], STEP):
  82.         p = mp.Process(target=lengthly_operation,args=(src_shared, dst_shared, k, k + STEP))
  83.         processes.append(p)
  84.     t2 = time.time() - t0
  85.     t0 = time.time()
  86.     for p in processes:
  87.         p.start()
  88.     for p in processes:
  89.         p.join()
  90.     t3 = time.time() - t0
  91.     tt = t1 + t2 + t3
  92.     ok = _eval_dest_array(dst_shared)
  93.     print("Parallel with shared mem np: allocate mem {} (pure alloc:{} copy: {}) "
  94.           "spawn process: {} exec: {} total: {} Succes: {}".format(t1, t01, t1 - t01, t2, t3, tt, ok))
  95.  
  96.  
  97. if __name__ == "__main__":
  98.     serial()
  99.     parallel_trivial_np()
  100.     parallel_shared_np()
RAW Paste Data