Advertisement
Higem

Untitled

Mar 20th, 2020
138
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.72 KB | None | 0 0
  #!/usr/bin/python
  # -*- coding: utf-8 -*-
  import getopt
  import sys
  from datetime import datetime

  import pandas as pd
  from sqlalchemy import create_engine, text

  # Connection settings for the PostgreSQL database holding the raw log and
  # the dashboard tables.
  # NOTE(review): credentials are hard-coded; consider loading them from the
  # environment or a secrets store before deploying.
  DB_CONFIG = {
      'user': 'my_user',
      'pwd': 'my_user_password',
      'host': 'localhost',
      'port': 5432,
      'db': 'zen',
  }

  # Columns of log_raw grouped by the dtype they are coerced to.
  COLUMNS_STR = ['age_segment', 'event', 'item_topic', 'item_type',
                 'source_topic', 'source_type', 'user_id']
  COLUMNS_NUMERIC = ['item_id', 'source_id']


  def parse_date_range(argv):
      """Extract the processing window from command-line arguments.

      Args:
          argv: Argument list without the script name. Recognised options
              are ``-s``/``--start_dt`` and ``-e``/``--end_dt``; each takes
              a value.

      Returns:
          A ``(start_dt, end_dt)`` tuple of the raw option strings.

      Raises:
          SystemExit: With code 2 when an option is malformed or when either
              boundary is missing.
      """
      # Both short options take a value, hence the colon after each letter
      # (the previous "s:e" made -e a value-less flag, losing the end date).
      try:
          arguments, _ = getopt.getopt(argv, "s:e:", ["start_dt=", "end_dt="])
      except getopt.error as err:
          # output error, and return with an error code
          print(str(err))
          sys.exit(2)

      start_dt = ''
      end_dt = ''
      for current_argument, current_value in arguments:
          if current_argument in ("-s", "--start_dt"):
              start_dt = current_value
          elif current_argument in ("-e", "--end_dt"):
              end_dt = current_value

      # Fail early instead of sending an empty string to the database as a
      # timestamp and surfacing an obscure SQL error.
      if not start_dt or not end_dt:
          print('Both --start_dt and --end_dt are required.')
          sys.exit(2)
      return start_dt, end_dt


  def build_dashboards(log_raw):
      """Aggregate raw log rows into the two dashboard tables.

      Args:
          log_raw: DataFrame with the columns listed in ``COLUMNS_STR`` and
              ``COLUMNS_NUMERIC`` plus ``ts`` and ``dt``. The caller's frame
              is not modified.

      Returns:
          A dict mapping the target table name to its DataFrame:
          ``dash_visits`` holds the event count (``visits``) per
          item_topic / source_topic / age_segment / dt, and
          ``dash_engagement`` holds the distinct user count
          (``unique_users``) per dt / item_topic / event / age_segment.
      """
      log_raw = log_raw.copy()

      for column in COLUMNS_STR:
          log_raw[column] = log_raw[column].astype(str)
      for column in COLUMNS_NUMERIC:
          log_raw[column] = pd.to_numeric(log_raw[column], errors='coerce')
      # The SQL derives dt as TO_TIMESTAMP(ts / 1000), i.e. ts is an epoch in
      # milliseconds; without unit='ms' pandas would read it as nanoseconds.
      log_raw['ts'] = pd.to_datetime(log_raw['ts'], unit='ms')
      log_raw['dt'] = pd.to_datetime(log_raw['dt'])

      dash_visits = (
          log_raw
          .groupby(['item_topic', 'source_topic', 'age_segment', 'dt'])
          .agg({'event': 'count'})
          .rename(columns={'event': 'visits'})
      )
      dash_engagement = (
          log_raw
          .groupby(['dt', 'item_topic', 'event', 'age_segment'])
          .agg({'user_id': 'nunique'})
          .rename(columns={'user_id': 'unique_users'})
      )

      return {
          'dash_visits': dash_visits.fillna(0).reset_index(),
          'dash_engagement': dash_engagement.fillna(0).reset_index(),
      }


  def main(argv=None):
      """Rebuild the dashboard tables for the requested time window.

      Reads ``log_raw`` rows whose timestamp lies between the start and end
      dates, aggregates them, then replaces the rows of ``dash_visits`` and
      ``dash_engagement`` that fall in the same window.

      Args:
          argv: Argument list without the script name; defaults to
              ``sys.argv[1:]``.
      """
      if argv is None:
          argv = sys.argv[1:]  # excluding script name
      start_dt, end_dt = parse_date_range(argv)
      period = {'start_dt': start_dt, 'end_dt': end_dt}

      connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(
          DB_CONFIG['user'],
          DB_CONFIG['pwd'],
          DB_CONFIG['host'],
          DB_CONFIG['port'],
          DB_CONFIG['db'],
      )
      engine = create_engine(connection_string)

      # Dates come from the command line, so they are passed as bound
      # parameters rather than formatted into the SQL text. CAST(... AS ...)
      # is used because ":name::TIMESTAMP" confuses the bind-parameter parser.
      select_query = text('''
          SELECT *,
                 TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'ETC/UTC' AS dt
          FROM log_raw
          WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'ETC/UTC'
                BETWEEN CAST(:start_dt AS TIMESTAMP)
                    AND CAST(:end_dt AS TIMESTAMP)
      ''')
      log_raw = pd.read_sql(select_query, con=engine, params=period,
                            index_col='event_id')

      tables = build_dashboards(log_raw)
      for table_name, table_data in tables.items():
          print(table_name, table_data.head())

      # One transaction for delete + insert of every table: a failure part
      # way through rolls back and leaves the previous data in place.
      with engine.begin() as connection:
          for table_name, table_data in tables.items():
              # Table names come from the fixed dict above, never from user
              # input, so formatting them into the statement is safe.
              delete_query = text('''
                  DELETE FROM {}
                  WHERE dt BETWEEN CAST(:start_dt AS TIMESTAMP)
                               AND CAST(:end_dt AS TIMESTAMP)
              '''.format(table_name))
              connection.execute(delete_query, period)
              table_data.to_sql(name=table_name, con=connection,
                                if_exists='append', index=False)

      print('All done.')


  if __name__ == "__main__":
      main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement