Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import csv
- import itertools
- import os
- import operator
- import shutil
- import bz2
- import numpy as np
- import pandas as pd
- import multiprocessing as mp
- import argparse
# Build the CLI: input/output paths, target matrix format, temp dir, core count.
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", help="specify input filename",
                    type=str, required=True)
parser.add_argument("-o", "--output", help="specify output filename",
                    type=str, required=True)
# NOTE: this flag was previously required=True together with default="wide",
# which made the default dead code. Dropping required lets the documented
# default take effect; every existing invocation still works unchanged.
parser.add_argument("-t", "--type",
                    help="specify the format to convert to", type=str,
                    default="wide", choices=["wide", "long"])
parser.add_argument("-d", "--dir", type=str, default="matrixconv_temp",
                    help="temp directory for intermediate files")
parser.add_argument("-c", "--cores", type=int, default=4,
                    help="number of cores to pivot with")
# Unpack parsed args into module-level names used by the conversion code below.
args = parser.parse_args()
input_fname = args.input
output_fname = args.output
mat_type = args.type
# Joining with "" guarantees a trailing path separator on the temp dir,
# so later string concatenation (temp_dir + name) forms valid paths.
temp_dir = os.path.join(args.dir, "")
n_cores = args.cores
# long to wide conversion: split the bz2 long-format input into one csv per
# origin, pivot each in parallel, then concatenate into a single wide matrix.
if mat_type == "wide":
    # np.int was deprecated in NumPy 1.20 and removed in 1.24; use the
    # explicit fixed-width np.int64 instead. Keys are column names so the
    # dtype mapping works whether columns come from `names=` or a header.
    dtypes = {"origin": np.int64, "destination": np.int64,
              "agg_cost": np.float64}
    col_names = ["origin", "destination", "agg_cost"]

    def convert_row(file):
        """Pivot one single-origin long-format csv to wide format, in place.

        Columns are reindexed against the global `destinations` array so
        every per-origin file ends up with an identical column set, which
        makes the final concatenation line up.
        """
        pd.read_csv(file, dtype=dtypes, names=col_names) \
            .pivot(index="origin", columns="destination", values="agg_cost") \
            .reindex(columns=destinations) \
            .to_csv(file)

    # Create the temporary directory for intermediate files (idempotent).
    os.makedirs(temp_dir, exist_ok=True)

    # Extract a csv for each unique origin id. itertools.groupby only groups
    # consecutive runs, so this assumes the input is sorted by origin —
    # TODO confirm that guarantee holds for all producers of the input file.
    print("Parsing individual origin files...")
    with bz2.open(input_fname, "rt") as src:
        for key, rows in itertools.groupby(csv.reader(src),
                                           operator.itemgetter(0)):
            with open(temp_dir + "%s.csv" % key, "w") as output:
                for row in rows:
                    output.write(",".join(row[0:3]) + "\n")

    # Drop the file produced by the header row (key == "origin"), then list
    # all remaining per-origin files.
    if os.path.isfile(temp_dir + "origin.csv"):
        os.remove(temp_dir + "origin.csv")
    files = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if
             os.path.isfile(os.path.join(temp_dir, f)) and f != "origin.csv"]

    # Load the list of unique destinations from the input file. read_csv's
    # squeeze= keyword was removed in pandas 2.0; Series.squeeze("columns")
    # is the supported replacement.
    print("Getting list of unique destinations...")
    destinations = pd.read_csv(
        input_fname,
        usecols=["destination"],
        dtype={"destination": np.int64}).squeeze("columns").unique()

    # Pivot all origin files in parallel. The context manager closes and
    # joins the pool instead of leaking worker processes.
    print("Converting origin files from long to wide...")
    with mp.Pool(n_cores) as pool:
        pool.map(convert_row, files)

    # Concatenate the pivoted files into one output, keeping the header line
    # only from the first file.
    print("Concatenating pivoted origin files...")
    with open(output_fname, 'wb') as outfile:
        for i, file in enumerate(files):
            with open(file, 'rb') as infile:
                if i != 0:
                    infile.readline()
                shutil.copyfileobj(infile, outfile)

    # Best-effort cleanup of the temp directory; ignore_errors replaces the
    # previous bare `except: pass`, which silently swallowed everything.
    shutil.rmtree(temp_dir, ignore_errors=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement