Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- # импорт необходимых библиотек
- import sys
- import getopt
- from datetime import datetime
- import pandas as pd
- from sqlalchemy import create_engine
- # основная часть исполняемого кода
- if __name__ == "__main__":
- # задание входных параметров
- unixOptions = "s:e"
- gnuOptions = ["start_dt=", "end_dt="]
- # считывание входных параметров в переменную fullCmdArguments
- fullCmdArguments = sys.argv
- # сохранение в argumentList всех параметров, начиная с первого (параметр с индексом 0 - имя самого скрипта)
- argumentList = fullCmdArguments[1:]
- # проверка набора входных параметров на пустоту
- try:
- arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
- except getopt.error as err:
- print (str(err))
- sys.exit(2)
- # перебор входных параметров и занесение их значений во внутренние переменные
- # (при настройке автоматизации запуска скрипта заменить значения start_dt и end_dt на пустые значения '')
- start_dt = '2019-09-24 18:00:00'
- end_dt = '2019-09-24 19:00:00'
- for currentArgument, currentValue in arguments:
- if currentArgument in ("-s", "--start_dt"):
- start_dt = datetime.strptime(currentValue, '%Y-%m-%d %H:%M:%S')
- elif currentArgument in ("-e", "--end_dt"):
- end_dt = datetime.strptime(currentValue, '%Y-%m-%d %H:%M:%S')
- # подключение к базе данных zen
- db_config = {'user': 'my_user',
- 'pwd': 'my_user_password',
- 'host': 'localhost',
- 'port': 5432,
- 'db': 'zen'}
- connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(
- db_config['user'],
- db_config['pwd'],
- db_config['host'],
- db_config['port'],
- db_config['db']
- )
- engine = create_engine(connection_string)
- # формирование SQL-запроса (выборка из таблицы log_raw строк между start_dt и end_dt)
- query = ''' SELECT event_id, age_segment, event, item_id, item_topic, item_type, source_id, source_topic, source_type, TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' as dt, user_id
- FROM log_raw
- WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
- '''.format(start_dt, end_dt)
- # выполнение запроса и сохранение результата выполнения в DataFrame log_raw
- log_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')
- columns_datetime = ['dt']
- for column in columns_datetime: log_raw[column] = pd.to_datetime(log_raw[column]).dt.round('min')
- # формирование агрегирующих датафреймов
- # датафрейм событий (dash_visits)
- dash_visits = log_raw.groupby(['item_topic', 'source_topic', 'age_segment', 'dt']).agg({'user_id': 'count'})
- dash_visits = dash_visits.rename(columns = {'user_id': 'visits'})
- # датафрейм воронки для пользователей, у которых было значение event, равное 'show'(dash_engagement)
- # создание уникального списка пользователей, у которых был event, равный 'show'
- unique_users_list = log_raw.query('event == "show"')
- unique_users_list = unique_users_list['user_id'].unique().tolist()
- dash_engagement = log_raw.query('user_id in @unique_users_list').groupby(['dt', 'item_topic', 'event', 'age_segment']).agg({'user_id': 'nunique'})
- dash_engagement = dash_engagement.rename(columns = {'user_id': 'unique_users'})
- # заполнение пустых значений
- dash_visits = dash_visits.fillna(0).reset_index()
- dash_engagement = dash_engagement.fillna(0).reset_index()
- # удаление старых записей между start_dt и end_dt
- tables = {'dash_visits': dash_visits,
- 'dash_engagement': dash_engagement}
- for table_name, table_data in tables.items():
- query = ''' DELETE
- FROM {}
- WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
- '''.format(table_name, start_dt, end_dt)
- engine.execute(query)
- table_data.to_sql(name = table_name, con = engine, if_exists = 'append', index = False)
- print('All done!')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement