Guest User

Untitled

a guest
Feb 16th, 2019
287
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.34 KB | None | 0 0
  1. import os
  2. import pandas as pd
  3. import numpy as np
  4. from tqdm import tqdm
  5. import concurrent.futures as cf
  6. from itertools import zip_longest
  7.  
  8. GITHUB = os.environ.get("GITHUB")
  9.  
  10. facilities = pd.read_csv('../data/raw/facilities.csv')
  11. accounts = pd.read_csv('../data/raw/accounts.csv')
  12. users = pd.read_csv('../data/raw/users.csv')
  13.  
  14. # rename ParentId in facitilies to FacilityId
  15. facilities = facilities.rename(columns={'ParentId': 'FacilityId'})
  16. # rename Id in users to CreatedById (to make merging more clean)
  17. users = users.rename(columns={'Id': 'CreatedById'})
  18.  
  19. # merge the user data and create a Boolean column to represent if the field edit was a mass update or not
  20. # (mass updates are generated by Anthony Zirilli, Steven Han, or Developer, so I identified their user Ids)
  21.  
  22.  
  23. def merge_user_data(df):
  24. df = df.merge(users, on='CreatedById', how='left')
  25. df['MassUpdated'] = np.where(df['CreatedById'].isin(['005E0000007XSBHIA4', '005E0000007OrqlIAC', '005E0000007QaBkIAK']), True, False)
  26. return df
  27.  
  28.  
  29. facilities = merge_user_data(facilities)
  30. accounts = merge_user_data(accounts)
  31.  
  32. # Convert date to a timestamp
  33. facilities['CreatedDate'] = pd.to_datetime(facilities['CreatedDate'])
  34. accounts['CreatedDate'] = pd.to_datetime(accounts['CreatedDate'])
  35.  
  36. # create a Boolean column to represent if the field edit was a "true change":
  37. # if OldValue at first timestamp = NewValue at last timestamp then False, else True
  38. # partition by FacilityId, Field order by CreatedDate
  39.  
  40. facilities['EditOrder'] = facilities.groupby(['FacilityId', 'Field'])['CreatedDate'].rank(method='first')
  41. accounts['EditOrder'] = accounts.groupby(['AccountId', 'Field'])['CreatedDate'].rank(method='first')
  42.  
  43. facilities_uniqs = facilities.groupby(['FacilityId', 'Field'])['CreatedDate'].count().reset_index()
  44. facilities_uniqs = facilities_uniqs[facilities_uniqs['CreatedDate'] > 1]
  45. accounts_uniqs = accounts.groupby(['AccountId', 'Field'])['CreatedDate'].count().reset_index()
  46. accounts_uniqs = accounts_uniqs[accounts_uniqs['CreatedDate'] > 1]
  47.  
  48.  
  49. def get_truechange(Id, Field, table):
  50. # returns single boolean value (True or False) representing whether or not the Field for the given Id (facility or account id) has truly changed from the original value
  51. # input strings for the Id, Field, and table ('facilities' or 'accounts')
  52.  
  53. if table == 'facilities':
  54. idCol = 'FacilityId'
  55. df = facilities
  56. else:
  57. idCol = 'AccountId'
  58. df = accounts
  59.  
  60. df_sub = df[(df[idCol] == Id) & (df['Field'] == Field)]
  61.  
  62. orig_val = df_sub[df_sub['EditOrder'] == 1]['OldValue'].values[0]
  63. latest_val = df_sub[df_sub['EditOrder'] == df_sub['EditOrder'].max()]['NewValue'].values[0]
  64.  
  65. return (orig_val != latest_val)
  66.  
  67.  
  68. get_true_change = np.vectorize(get_truechange)
  69.  
  70. #facilities['TrueChange'] = get_true_change(facilities_uniqs['FacilityId'].values, facilities_uniqs['Field'].values, 'facilities')
  71.  
  72.  
  73. def tqdm_parallel_map(executor, fn, *iterables, **kwargs):
  74. """
  75. Equivalent to executor.map(fn, *iterables),
  76. but displays a tqdm-based progress bar.
  77. Does not support timeout or chunksize as executor.submit is used internally
  78. **kwargs is passed to tqdm.
  79. """
  80. futures_list = []
  81. args = iterables[0]
  82.  
  83. final_args = [dict(zip(args.keys(), values)) for values in zip_longest(*args.values())]
  84.  
  85. for arg in final_args:
  86. futures_list += [executor.submit(fn, arg['Id'], arg['Field'], arg['table'])]
  87.  
  88. for f in tqdm(cf.as_completed(futures_list), total=len(futures_list), **kwargs):
  89. pass
  90.  
  91. return futures_list
  92.  
  93.  
  94. print("Running facilities...")
  95. facilities_uniqs['TrueChange'] = tqdm_parallel_map(cf.ProcessPoolExecutor(), get_truechange, {'Id': facilities_uniqs['FacilityId'].values, 'Field': facilities_uniqs['Field'].values, 'table': ['facilities'] * len(facilities_uniqs['Field'].values)})
  96. facilities_final = facilities.merge(facilities_uniqs, on=['FacilityId','Field'], how='left')
  97. print(facilities_final.head())
  98. facilities_final.to_csv('../data/interim/facilities.csv',index=False)
  99. print("Running accounts...")
  100. accounts_uniqs['TrueChange'] = tqdm_parallel_map(cf.ProcessPoolExecutor(), get_truechange, {'Id': accounts_uniqs['AccountId'].values, 'Field': accounts_uniqs['Field'].values, 'table': ['accounts'] * len(accounts_uniqs['Field'].values)})
  101. accounts_final = accounts.merge(accounts_uniqs, on=['AccountId','Field'], how='left')
  102. accounts_final.to_csv('../data/interim/accounts.csv',index=False)
Add Comment
Please, Sign In to add comment