Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import pandas as pd
- import numpy as np
- from tqdm import tqdm
- import concurrent.futures as cf
- from itertools import zip_longest
# Read from the environment; None when the variable is not set.
GITHUB = os.environ.get("GITHUB")

# Raw edit-history extracts for facilities and accounts, plus the user table.
facilities, accounts, users = (
    pd.read_csv(csv_path)
    for csv_path in (
        '../data/raw/facilities.csv',
        '../data/raw/accounts.csv',
        '../data/raw/users.csv',
    )
)

# Facility history rows carry the facility id in ParentId; call it FacilityId.
facilities = facilities.rename(columns={'ParentId': 'FacilityId'})

# Users are keyed by Id; renaming it to CreatedById lets the history tables be
# merged with users on a single shared column name.
users = users.rename(columns={'Id': 'CreatedById'})
- # merge the user data and create a Boolean column to represent if the field edit was a mass update or not
- # (mass updates are generated by Anthony Zirilli, Steven Han, or Developer, so I identified their user Ids)
- def merge_user_data(df):
- df = df.merge(users, on='CreatedById', how='left')
- df['MassUpdated'] = np.where(df['CreatedById'].isin(['005E0000007XSBHIA4', '005E0000007OrqlIAC', '005E0000007QaBkIAK']), True, False)
- return df
def _prepare_history(history, id_col):
    """Enrich one edit-history frame and find its multiply-edited fields.

    The steps are:
    - join user data onto the frame via ``merge_user_data``;
    - parse ``CreatedDate`` as datetimes;
    - number each (``id_col``, ``Field``) group's edits 1, 2, ... in
      ``CreatedDate`` order as ``EditOrder`` (ties keep row order).

    ``EditOrder`` identifies the first and the last edit of a field. A
    "true change" means the first edit's OldValue differs from the last
    edit's NewValue.

    Returns:
        A pair ``(history, multi_edits)``:
        - ``history`` is the enriched frame;
        - ``multi_edits`` lists the (``id_col``, ``Field``) groups with more
          than one non-null ``CreatedDate``. In it, the ``CreatedDate``
          column holds that per-group count.
    """
    history = merge_user_data(history)
    history['CreatedDate'] = pd.to_datetime(history['CreatedDate'])
    group_keys = [id_col, 'Field']
    history['EditOrder'] = history.groupby(group_keys)['CreatedDate'].rank(method='first')
    edit_counts = history.groupby(group_keys)['CreatedDate'].count().reset_index()
    multi_edits = edit_counts[edit_counts['CreatedDate'] > 1]
    return history, multi_edits


facilities, facilities_uniqs = _prepare_history(facilities, 'FacilityId')
accounts, accounts_uniqs = _prepare_history(accounts, 'AccountId')
def get_truechange(Id, Field, table, df=None):
    """Return whether ``Field`` of record ``Id`` ended up different from its original value.

    The history rows for (Id, Field) are selected from ``df``. When ``df`` is
    None, the module-level ``facilities`` or ``accounts`` frame is used,
    chosen by ``table``. The value before the first edit (``OldValue`` of
    the row with ``EditOrder == 1``) is compared with the value after the
    last edit (``NewValue`` of the row with the largest ``EditOrder``).

    Args:
        Id: facility id or account id to look up.
        Field: name of the edited field.
        table: 'facilities' selects the ``FacilityId`` id column; any other
            value selects ``AccountId``.
        df: optional history frame to search instead of the module-level one.

    Returns:
        True if the original and latest values differ, False otherwise.
        Two missing values (NaN/None) compare equal, i.e. are not a change.

    Raises:
        IndexError: if no row matches (Id, Field).
    """
    if table == 'facilities':
        id_col = 'FacilityId'
        if df is None:
            df = facilities
    else:
        id_col = 'AccountId'
        if df is None:
            df = accounts
    df_sub = df[(df[id_col] == Id) & (df['Field'] == Field)]
    orig_val = df_sub.loc[df_sub['EditOrder'] == 1, 'OldValue'].values[0]
    latest_val = df_sub.loc[df_sub['EditOrder'] == df_sub['EditOrder'].max(), 'NewValue'].values[0]
    # Empty CSV cells load as NaN, and NaN != NaN is True. Without this check,
    # a field that went from empty back to empty would count as a change.
    orig_missing = pd.isna(orig_val)
    latest_missing = pd.isna(latest_val)
    if orig_missing or latest_missing:
        return bool(orig_missing != latest_missing)
    return orig_val != latest_val
# Element-wise version of get_truechange for array arguments, e.g.
# get_true_change(ids_array, fields_array, 'facilities').
# otypes=[bool] fixes the output dtype up front. Numpy then neither makes an
# extra probing call to infer the dtype nor fails on zero-length inputs.
get_true_change = np.vectorize(get_truechange, otypes=[bool])
def tqdm_parallel_map(executor, fn, *iterables, **kwargs):
    """Run ``fn`` once per row of a column dict on ``executor`` with a tqdm bar.

    Unlike ``executor.map``, the work items come from a single mapping passed
    as the first positional argument after ``fn``, e.g.
    ``{'Id': [...], 'Field': [...], 'table': [...]}``.
    - Its columns are zipped into rows with ``zip_longest``, so a shorter
      column is padded with None.
    - Each row is submitted as ``fn(row['Id'], row['Field'], row['table'])``.
    - Any further positional iterables are ignored.

    The progress bar advances as futures complete, and ``**kwargs`` is passed
    to ``tqdm``. Timeout and chunksize are not supported because
    ``executor.submit`` is used internally.

    Returns:
        list: the results of ``fn`` in row (submission) order, as
        ``executor.map`` would return them.

    Raises:
        The exception raised by ``fn`` for the earliest failing row is
        re-raised when results are collected.
    """
    columns = iterables[0]
    rows = [dict(zip(columns.keys(), values)) for values in zip_longest(*columns.values())]
    futures_list = [executor.submit(fn, row['Id'], row['Field'], row['table']) for row in rows]
    for _ in tqdm(cf.as_completed(futures_list), total=len(futures_list), **kwargs):
        pass
    # Unwrap the futures so callers receive plain values (and see worker
    # exceptions) rather than Future objects.
    return [future.result() for future in futures_list]
def _compute_true_changes(executor, uniqs, id_col, table):
    """Return ``uniqs`` with a ``TrueChange`` column computed on ``executor``.

    One ``get_truechange(Id, Field, table)`` call is submitted per row of
    ``uniqs`` through ``tqdm_parallel_map``. The ``CreatedDate`` column of
    ``uniqs`` holds a per-group edit count (it comes from a groupby
    ``count``). It is renamed to ``EditCount`` so it does not collide with
    the real ``CreatedDate`` timestamps when merged back into the history
    frame. Without the rename, the merge would emit ``CreatedDate_x`` and
    ``CreatedDate_y``.
    """
    args = {
        'Id': uniqs[id_col].values,
        'Field': uniqs['Field'].values,
        'table': [table] * len(uniqs),
    }
    true_change = tqdm_parallel_map(executor, get_truechange, args)
    return uniqs.rename(columns={'CreatedDate': 'EditCount'}).assign(TrueChange=true_change)


if __name__ == "__main__":
    # Pools are created only in the main process. Under the 'spawn' start
    # method (the default on Windows and macOS), each worker re-imports this
    # module; an unguarded pool creation there would fail. The re-import also
    # rebuilds the module-level frames that get_truechange reads in workers.
    # `with` shuts each pool down and joins its workers when the work is done.
    print("Running facilities...")
    with cf.ProcessPoolExecutor() as executor:
        facilities_uniqs = _compute_true_changes(executor, facilities_uniqs, 'FacilityId', 'facilities')
    facilities_final = facilities.merge(facilities_uniqs, on=['FacilityId', 'Field'], how='left')
    print(facilities_final.head())
    facilities_final.to_csv('../data/interim/facilities.csv', index=False)

    print("Running accounts...")
    with cf.ProcessPoolExecutor() as executor:
        accounts_uniqs = _compute_true_changes(executor, accounts_uniqs, 'AccountId', 'accounts')
    accounts_final = accounts.merge(accounts_uniqs, on=['AccountId', 'Field'], how='left')
    accounts_final.to_csv('../data/interim/accounts.csv', index=False)
Add Comment
Please, Sign In to add comment