Advertisement
mironovichandrei

Untitled

Mar 1st, 2020
467
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.76 KB | None | 0 0
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Import libraries
import argparse
import getopt
import sys
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine, text

  11. if __name__ == "__main__":
  12.  
  13. #Задаём входные параметры
  14. unixOptions = "sdt:edt"
  15. gnuOptions = ["start_dt=", "end_dt="]
  16.  
  17. fullCmdArguments = sys.argv
  18. argumentList = fullCmdArguments[1:] #excluding script name
  19.  
  20. try:
  21. arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  22. except getopt.error as err:
  23. print (str(err))
  24. sys.exit(2)
  25.  
  26. start_dt = ''
  27. end_dt = ''
  28. for currentArgument, currentValue in arguments:
  29. if currentArgument in ("-sdt", "--start_dt"):
  30. start_dt = currentValue
  31. elif currentArgument in ("-edt", "--end_dt"):
  32. end_dt = currentValue
  33.  
  34. # Задаём параметры подключения к БД,
  35. # их можно узнать у администратора БД.
  36. db_config = {'user': 'my_user',
  37. 'pwd': '123456',
  38. 'host': 'localhost',
  39. 'port': 5432,
  40. 'db': 'zen'}
  41.  
  42. # Формируем строку соединения с БД.
  43. connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'],
  44. db_config['pwd'],
  45. db_config['host'],
  46. db_config['port'],
  47. db_config['db'])
  48. # Подключаемся к БД.
  49. engine = create_engine(connection_string)
  50.  
  51. # Формируем sql-запрос.
  52. query = ''' SELECT event_id, age_segment, event, item_id, item_topic, item_type, source_id, source_topic, source_type,
  53. TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' as dt, user_id
  54. FROM log_raw
  55. WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  56. '''.format(start_dt, end_dt)
  57.  
  58. # записываем данные в датафрейм
  59. raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')
  60.  
  61. # округляем до минут
  62. raw['dt'] = pd.to_datetime(raw['dt']).dt.round('min')
  63.  
  64. # создаем агрегирующие таблицы
  65. dash_engagement = (raw
  66. .groupby(['dt','item_topic', 'event', 'age_segment'])
  67. .agg({'user_id':'nunique'})
  68. )
  69.  
  70. dash_visits = (raw
  71. .groupby(['item_topic', 'source_topic', 'age_segment', 'dt'])
  72. .agg({'user_id':'count'})
  73. )
  74.  
  75. #переименовка колонок для БД
  76. dash_engagement = dash_engagement.rename(columns = {'user_id': 'unique_users'})
  77. dash_visits = dash_visits.rename(columns = {'user_id': 'visits'})
  78.  
  79. #заполнение пропусков и сброс индекса
  80. dash_engagement = dash_engagement.fillna(0).reset_index()
  81. dash_visits = dash_visits.fillna(0).reset_index()
  82.  
  83. # удаление ранее записанных данных
  84. tables = {'dash_engagement':dash_engagement,
  85. 'dash_visits':dash_visits}
  86.  
  87. for table_name, table_data in tables.items():
  88. query = '''
  89. DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP
  90. AND '{}'::TIMESTAMP
  91. '''.format(table_name, start_dt, end_dt)
  92. engine.execute(query)
  93.  
  94. table_data.to_sql(name=table_name, con=engine, if_exists='append', index=False)
  95. print('Good!')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement