from __future__ import division, print_function
import time
import os
import configargparse
import bz2
from os import listdir
from os.path import isfile, join
import multiprocessing as mp
def process_day(writing_queue, zip_files, path):
    """Decompress every .bz2 archive in the zip_files chunk into path."""
    for zip_file in zip_files:
        file_name = zip_file[:-4]  # assuming the file name ends with .bz2
        with bz2.BZ2File(join(path, zip_file)) as archive:  # open the compressed file
            data = archive.read()  # read the decompressed bytes
        with open(join(path, file_name), 'wb') as out:
            out.write(data)  # write the uncompressed file
    return True
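# A hedged sketch of how a worker would be invoked (the file and directory
# names are made up for illustration): each worker receives one chunk of
# archive names plus the directory, e.g.
#   process_day(writing_queue, ['2016-01-01.csv.bz2'], '/data/opal/')
# and writes the decompressed '2016-01-01.csv' next to the archive.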
def write_stats_to_csv(writing_queue, save_path):
    """Write every result that appears in writing_queue to a per-month csv."""
    while True:
        # wait for a result to appear in the queue
        month = writing_queue.get()
        # the 'kill' sentinel signals that all workers are done
        if month == 'kill':
            break
        if month is not None:
            csv_path = os.path.join(save_path, month[0] + '.csv')
            with open(csv_path, 'a') as csv_file:  # closed automatically by the with block
                csv_file.write(month[1])
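# Judging from the indexing above, queue items are assumed to be
# (month_name, csv_text) tuples; e.g. writing_queue.put(('2016-01', 'user,count\n'))
# would append a row to 2016-01.csv. (Illustrative values only; as pasted,
# nothing in this script actually enqueues data, so the writer only ever
# sees the final 'kill' sentinel.)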
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i+n]
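# Quick illustration of chunks() (comment-only, so the script's behaviour
# is unchanged): the last chunk may be shorter than n, e.g.
#   list(chunks(['a', 'b', 'c', 'd', 'e'], 2)) -> [['a', 'b'], ['c', 'd'], ['e']]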
#####################################
#           main program            #
#####################################
parser = configargparse.ArgumentParser(
    description='Generate statistics for the OPAL raw dataset.')
parser.add_argument('--num_threads', type=int, required=True,
                    help='Number of worker processes used to create the data.')
parser.add_argument('--data_path', required=True,
                    help='Directory containing the .bz2 files; generated csv files are saved here as well.')
args = parser.parse_args()
fileDirectory = args.data_path
if __name__ == "__main__":
    # On Windows, prevent child processes from re-executing the main module
    # before the parent has finished its bootstrapping phase.
    if os.name == 'nt':
        mp.freeze_support()
    print("Starting...")
    start_time = time.time()

    # set up parallel processing
    manager = mp.Manager()
    writing_queue = manager.Queue()
    jobs = []
    # one additional process runs the csv writer, which shouldn't take much CPU
    pool = mp.Pool(processes=args.num_threads + 1)
    pool.apply_async(write_stats_to_csv, (writing_queue, args.data_path))
    if os.path.exists(fileDirectory):
        filesName = [f for f in listdir(fileDirectory)
                     if isfile(join(fileDirectory, f))]
        # don't shadow the chunks() generator with its own result
        chunksList = list(chunks(filesName, args.num_threads - 1))
        for chunk in chunksList:
            jobs.append(pool.apply_async(
                process_day, (writing_queue, chunk, fileDirectory)))

    # clean up parallel processing (close pool, wait for workers to finish,
    # send the kill sentinel to the writer, then wait for the pool to exit)
    pool.close()
    for job in jobs:
        job.get()
    writing_queue.put('kill')
    pool.join()
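# Example invocation (the script file name is hypothetical):
#   python decompress_opal.py --num_threads 4 --data_path /path/to/bz2/files
# With --num_threads 4 the pool holds 5 processes: the csv writer plus
# workers that each decompress one chunk of archive files.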