Advertisement
Guest User

Untitled

a guest
Mar 30th, 2020
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.68 KB | None | 0 0
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import getopt
  5. from datetime import datetime
  6. import pandas as pd
  7. from sqlalchemy import create_engine
  8.  
  9. if __name__ == "__main__":
  10.  
  11. #Задаем входные параметры
  12. unixOptions = "sdt:edt"
  13. gnuOptions = ["start_dt=", "end_dt="]
  14.  
  15. fullCmdArguments = sys.argv
  16. argumentList = fullCmdArguments[1:] #excluding script name
  17.  
  18. try:
  19. arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  20. except getopt.error as err:
  21. # output error, and return with an error code
  22. print (str(err))
  23. sys.exit(2)
  24.  
  25. start_dt = '2019-09-24 18:00:00'
  26. end_dt = '2019-09-24 19:00:00'
  27. for currentArgument, currentValue in arguments:
  28. if currentArgument in ("-sdt", "--start_dt"):
  29. start_dt = currentValue
  30. elif currentArgument in ("-edt", "--end_dt"):
  31. end_dt = currentValue
  32.  
  33. db_config = {'user': 'my_user',
  34. 'pwd': 'my_user_password',
  35. 'host': 'localhost',
  36. 'port': 5432,
  37. 'db': 'zen'}
  38.  
  39. connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'],
  40. db_config['pwd'],
  41. db_config['host'],
  42. db_config['port'],
  43. db_config['db'])
  44. engine = create_engine(connection_string)
  45.  
  46. #Теперь выберем из таблицы только те строки,
  47. #которые были выпущены между start_dt и end_dt
  48. query = ''' SELECT * FROM log_raw
  49. WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC'
  50. BETWEEN
  51. '{}' AND '{}'
  52. '''.format(datetime.strptime(start_dt,'%Y-%m-%d %H:%M:%S'),datetime.strptime(end_dt,'%Y-%m-%d %H:%M:%S'))
  53.  
  54. log_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'game_id')
  55.  
  56. columns_numeric = ['event_id', 'item_id', 'source_id', 'user_id']
  57. columns_datetime = ['ts']
  58. columns_str = ['age_segment', 'event', 'item_topic', 'item_type', 'source_topic', 'source_type']
  59.  
  60. for column in columns_str: log_raw[column] = log_raw[column].astype(str)
  61. for column in columns_numeric: log_raw[column] = pd.to_numeric(log_raw[column], errors='coerce')
  62. for column in columns_datetime: log_raw[column] = pd.to_datetime(log_raw[column]).dt.round('min')
  63.  
  64.  
  65. dash_visits = log_raw.groupby(['item_topic', 'source_topic', 'age_segment', 'timestamp']).agg({'user_id': 'count'})
  66. dash_visits = dash_visits.rename(columns = {'timestamp': 'dt',
  67. 'user_id': 'visits'})
  68. dash_engagement = log_raw.groupby(['timestamp', 'item_topic', 'event', 'age_segment']).agg({'user_id': 'nunique'})
  69. dash_engagement = dash_engagement.rename(columns = {'timestamp': 'dt',
  70. 'user_id': 'unique_users'})
  71.  
  72. #Удаляем старые записи между start_dt и end_dt
  73. tables = {'dash_visits': dash_visits,
  74. 'dash_engagement': dash_engagement}
  75.  
  76. for table_name, table_data in tables.items():
  77.  
  78. query = '''
  79. DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  80. '''.format(table_name, start_dt, end_dt)
  81. engine.execute(query)
  82.  
  83. table_data.to_sql(name = table_name, con = engine, if_exists = 'append', index = False)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement