amkiller

Untitled

Dec 7th, 2017
import datetime
import multiprocessing
import os
import time
from random import randint

import numpy
import pandas as pd


def printProgressBar(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='X'):
    """
    Call in a loop to create a terminal progress bar.
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
    https://gist.github.com/snakers4/91fa21b9dda9d055a02ecd23f24fbc3d
    """
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '-' * (length - filledLength)
    print('\r%s: %s |%s| %s%% %s' % (datetime.datetime.now(), prefix, bar, percent, suffix))
    # Print a newline once the bar reaches 100%
    if iteration == total:
        print()

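# Minimal usage sketch for printProgressBar: call it once per iteration of
# whatever loop you are tracking, e.g.
#
#   for i in range(1, 101):
#       time.sleep(0.05)  # stand-in for real work
#       printProgressBar(i, 100, prefix='Progress:', suffix='Complete', length=50)
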
def process(df):
    """Process one partition of the data and generate its results."""

    # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    # Simulate work, obviously remove this
    # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    time.sleep(randint(0, 2))

    result = []

    for item in df['IP']:
        result.append("Done processing %s" % item)

    return result


def handleOutput(result):
    """Handle results from process().

    apply_async() callbacks all run in a single result-handler thread in the
    parent process, so appending to the results file here is safe.
    """
    with open('results.txt', 'a') as f:
        for item in result:
            f.write(item + '\n')

if __name__ == '__main__':

    # Clean up the output file from a previous run, if there is one
    if os.path.exists("results.txt"):
        os.remove("results.txt")

    # Create a fake workload; you'll probably want to supply your own list
    with open('ips.csv', 'w') as f:
        f.write('IP\n')
        for i in range(100000):
            f.write(str(i) + '\n')

    df = pd.read_csv("ips.csv", header=0)

    # Number of worker processes in the pool
    threads = 16
    # Partition the work so each task covers roughly this many rows before it
    # finishes and handleOutput() is called with its results
    partition_size_target = 1000

    partitions = int(len(df) / partition_size_target)
    df_split = numpy.array_split(df, partitions)
    task_count = len(df_split)
    pool = multiprocessing.Pool(threads)
    result_objs = []
    for d in df_split:
        result = pool.apply_async(process, (d,), callback=handleOutput)
        result_objs.append(result)

    # Poll the outstanding tasks and print a progress bar whenever the count changes
    previous_incomplete_count = -1
    while True:
        incomplete_count = sum(1 for x in result_objs if not x.ready())
        # This saves memory by dropping completed jobs from result_objs.
        # I was processing images, so the results held bitmap data; you may not need this.
        result_objs = [x for x in result_objs if not x.ready()]

        if incomplete_count == 0:
            print('Complete')
            break

        if incomplete_count != previous_incomplete_count:
            printProgressBar(task_count - incomplete_count, task_count, prefix='Progress:', decimals=3, suffix='Complete', length=50)

        previous_incomplete_count = incomplete_count
        time.sleep(.5)

    pool.close()
    pool.join()
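
For comparison, here is a minimal sketch of the same partition/submit/collect pattern using the standard library's concurrent.futures instead of multiprocessing.Pool. It is not part of the paste above: run_with_futures is a hypothetical helper, and it assumes process(), handleOutput(), and printProgressBar() are defined as in the script.

import concurrent.futures

def run_with_futures(df_split, workers=16):
    """Run the same chunked workload with ProcessPoolExecutor instead of Pool."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(process, d) for d in df_split]
        done = 0
        # as_completed yields each future as soon as its worker finishes
        for future in concurrent.futures.as_completed(futures):
            handleOutput(future.result())  # runs in the parent process
            done += 1
            printProgressBar(done, len(futures), prefix='Progress:',
                             decimals=3, suffix='Complete', length=50)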