Advertisement
Guest User

Untitled

a guest
Feb 25th, 2020
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.55 KB | None | 0 0
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import sys
  5. import getopt
  6. from datetime import datetime
  7. import pandas as pd
  8. from sqlalchemy import create_engine
  9.  
  10. if __name__ == "__main__":
  11.  
  12. #Задаём входные параметры
  13. unixOptions = "s:e"
  14. gnuOptions = ["start_dt=", "end_dt="]
  15.  
  16. fullCmdArguments = sys.argv
  17. argumentList = fullCmdArguments[1:] #excluding script name
  18.  
  19. try:
  20. arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  21. except getopt.error as err:
  22. print (str(err))
  23. sys.exit(2)
  24.  
  25. start_dt = ''
  26. end_dt = ''
  27. for currentArgument, currentValue in arguments:
  28. if currentArgument in ("-s", "--start_dt"):
  29. start_dt = currentValue
  30. elif currentArgument in ("-e", "--end_dt"):
  31. end_dt = currentValue
  32.  
  33. # Задаем параметры подключения к БД,
  34. # их можно узнать у администратора БД.
  35. db_config = {'user': 'my_user',
  36. 'pwd': 'my_user_password',
  37. 'host': 'localhost',
  38. 'port': 5432,
  39. 'db': 'zen'}
  40.  
  41. # Формируем строку соединения с БД.
  42. connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'],
  43. db_config['pwd'],
  44. db_config['host'],
  45. db_config['port'],
  46. db_config['db'])
  47. # Подключаемся к БД.
  48. engine = create_engine(connection_string)
  49.  
  50. # Теперь выберем из таблицы только те строки,
  51. # которые были выпущены между start_dt и end_dt
  52. query = ''' SELECT *, TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' AS dt
  53. FROM log_raw
  54. WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  55. '''.format(start_dt, end_dt)
  56.  
  57. data_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')
  58.  
  59. data_raw['dt'] = pd.to_datetime(data_raw['dt']).dt.round('min')
  60.  
  61. columns_datetime = ['dt']
  62. columns_numeric = ['user_id']
  63.  
  64. for column in columns_datetime: data_raw[column] = pd.to_datetime(data_raw[column])
  65. for column in columns_numeric: data_raw[column] = pd.to_numeric(data_raw[column], errors='coerce')
  66.  
  67. #### агрегирующие таблицы
  68.  
  69. dash_visits = data_raw.groupby(['item_topic', 'source_topic', 'age_segment', 'dt']).agg({'event': 'count'}).reset_index()
  70.  
  71. dash_visits = dash_visits.rename(columns = {'event': 'visits'})
  72.  
  73. dash_engagement = data_raw.groupby(['dt', 'item_topic', 'event', 'age_segment']).agg({'user_id': lambda x: x.nunique()}).reset_index()
  74.  
  75. dash_engagement = dash_engagement.rename(columns = {'user_id': 'unique_users'})
  76.  
  77.  
  78.  
  79. #Удаляем старые записи между start_dt и end_dt
  80. tables = {'dash_visits': dash_visits,
  81. 'dash_engagement': dash_engagement}
  82.  
  83. for table_name, table_data in tables.items():
  84. query = '''DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP'''.format(table_name, start_dt, end_dt)
  85. engine.execute(query)
  86.  
  87. table_data.to_sql(name = table_name, con = engine, if_exists = 'append', index = False)
  88.  
  89. print(table_data.head(5))
  90. print()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement