Advertisement
Guest User

Untitled

a guest
Jan 26th, 2020
134
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.87 KB | None | 0 0
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. # импорт необходимых библиотек
  5. import sys
  6. import getopt
  7. from datetime import datetime
  8. import pandas as pd
  9. from sqlalchemy import create_engine
  10.  
  11. # основная часть исполняемого кода
  12. if __name__ == "__main__":
  13.  
  14. # задание входных параметров
  15. unixOptions = "s:e"
  16. gnuOptions = ["start_dt=", "end_dt="]
  17.  
  18. # считывание входных параметров в переменную fullCmdArguments
  19. fullCmdArguments = sys.argv
  20. # сохранение в argumentList всех параметров, начиная с первого (параметр с индексом 0 - имя самого скрипта)
  21. argumentList = fullCmdArguments[1:]
  22.  
  23. # проверка набора входных параметров на пустоту
  24. try:
  25. arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  26. except getopt.error as err:
  27. print (str(err))
  28. sys.exit(2)
  29.  
  30. # перебор входных параметров и занесение их значений во внутренние переменные
  31. # (при настройке автоматизации запуска скрипта заменить значения start_dt и end_dt на пустые значения '')
  32. start_dt = '2019-09-24 18:00:00'
  33. end_dt = '2019-09-24 19:00:00'
  34. for currentArgument, currentValue in arguments:
  35. if currentArgument in ("-s", "--start_dt"):
  36. start_dt = datetime.strptime(currentValue, '%Y-%m-%d %H:%M:%S')
  37. elif currentArgument in ("-e", "--end_dt"):
  38. end_dt = datetime.strptime(currentValue, '%Y-%m-%d %H:%M:%S')
  39.  
  40. # подключение к базе данных zen
  41. db_config = {'user': 'my_user',
  42. 'pwd': 'my_user_password',
  43. 'host': 'localhost',
  44. 'port': 5432,
  45. 'db': 'zen'}
  46.  
  47. connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(
  48. db_config['user'],
  49. db_config['pwd'],
  50. db_config['host'],
  51. db_config['port'],
  52. db_config['db']
  53. )
  54. engine = create_engine(connection_string)
  55.  
  56. # формирование SQL-запроса (выборка из таблицы log_raw строк между start_dt и end_dt)
  57. query = ''' SELECT event_id, age_segment, event, item_id, item_topic, item_type, source_id, source_topic, source_type, TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' as dt, user_id
  58. FROM log_raw
  59. WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  60. '''.format(start_dt, end_dt)
  61.  
  62. # выполнение запроса и сохранение результата выполнения в DataFrame log_raw
  63. log_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')
  64.  
  65. columns_datetime = ['dt']
  66. for column in columns_datetime: log_raw[column] = pd.to_datetime(log_raw[column]).dt.round('min')
  67.  
  68. # формирование агрегирующих датафреймов
  69. # датафрейм событий (dash_visits)
  70. dash_visits = log_raw.groupby(['item_topic', 'source_topic', 'age_segment', 'dt']).agg({'user_id': 'count'})
  71. dash_visits = dash_visits.rename(columns = {'user_id': 'visits'})
  72.  
  73. # датафрейм воронки для пользователей, у которых было значение event, равное 'show'(dash_engagement)
  74. # создание уникального списка пользователей, у которых был event, равный 'show'
  75. unique_users_list = log_raw.query('event == "show"')
  76. unique_users_list = unique_users_list['user_id'].unique().tolist()
  77.  
  78. dash_engagement = log_raw.query('user_id in @unique_users_list').groupby(['dt', 'item_topic', 'event', 'age_segment']).agg({'user_id': 'nunique'})
  79. dash_engagement = dash_engagement.rename(columns = {'user_id': 'unique_users'})
  80.  
  81. # заполнение пустых значений
  82. dash_visits = dash_visits.fillna(0).reset_index()
  83. dash_engagement = dash_engagement.fillna(0).reset_index()
  84.  
  85. # удаление старых записей между start_dt и end_dt
  86. tables = {'dash_visits': dash_visits,
  87. 'dash_engagement': dash_engagement}
  88.  
  89. for table_name, table_data in tables.items():
  90. query = ''' DELETE
  91. FROM {}
  92. WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  93. '''.format(table_name, start_dt, end_dt)
  94. engine.execute(query)
  95.  
  96. table_data.to_sql(name = table_name, con = engine, if_exists = 'append', index = False)
  97.  
  98. print('All done!')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement