from __future__ import division, print_function
import time
import os
import configargparse
import bz2
from os import listdir
from os.path import isfile, join
import multiprocessing as mp


def process_day(writing_queue, zip_files, path):
    """Decompress each .bz2 file in zip_files into an uncompressed copy in path."""
    for zip_file in zip_files:
        file_name = zip_file[:-4]  # strip the trailing '.bz2' extension
        with bz2.BZ2File(join(path, zip_file)) as compressed:  # open the compressed file
            data = compressed.read()  # get the decompressed data
        with open(join(path, file_name), 'wb') as out_file:
            out_file.write(data)  # write the uncompressed file
    return True


def write_stats_to_csv(writing_queue, save_path):
    """Write entries from writing_queue to per-month csv files."""
    while True:
        # wait for a result to appear in the queue
        month = writing_queue.get()
        # if we got the 'kill' signal, exit the loop
        if month == 'kill':
            break
        if month is not None:
            csv_path = os.path.join(save_path, month[0] + '.csv')
            with open(csv_path, 'a') as csv_file:
                csv_file.write(month[1])


def chunks(l, n):
    """Yield successive chunks of size n from list l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


#####################################
#           main program            #
#####################################


parser = configargparse.ArgumentParser(
    description='Generate statistics for OPAL raw dataset.')
parser.add_argument('--num_threads', type=int, required=True,
                    help='Number of worker processes used to create the data.')
parser.add_argument('--data_path', required=True,
                    help='Data path where the generated csv files are saved.')
args = parser.parse_args()

fileDirectory = args.data_path


if __name__ == "__main__":

    # Prevent an attempt to start a new process before the current process
    # has finished its bootstrapping phase on Windows.
    if os.name == 'nt':
        mp.freeze_support()

    print("Starting...")
    start_time = time.time()

    # set up parallel processing
    manager = mp.Manager()
    writing_queue = manager.Queue()
    jobs = []
    # one additional process is for the csv writer, which shouldn't take up much CPU power
    pool = mp.Pool(processes=args.num_threads + 1)
    pool.apply_async(write_stats_to_csv, (writing_queue, args.data_path))
    if os.path.exists(fileDirectory):
        filesName = [f for f in listdir(fileDirectory)
                     if isfile(join(fileDirectory, f))]
        chunksList = list(chunks(filesName, args.num_threads - 1))
        for chunk in chunksList:
            jobs.append(pool.apply_async(
                process_day, (writing_queue, chunk, fileDirectory)))

    # clean up parallel processing (close pool, wait for processes to
    # finish, kill writing_queue, wait for queue to be killed)
    pool.close()
    for job in jobs:
        job.get()
    writing_queue.put('kill')
    pool.join()
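
Usage note: the script is driven entirely by the two command-line options defined above. A hypothetical invocation (the script name and path below are examples, not part of the original paste) might look like:

    python decompress_opal.py --num_threads 8 --data_path /path/to/bz2/files

Here --data_path should point at a directory containing the .bz2 files; the file list is split into chunks, each chunk is decompressed in place by a worker in the pool, and one extra pool process blocks on writing_queue until it receives the 'kill' sentinel.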