Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Rebuild the dashboard tables for one time window from ``log_raw``.

Selects the ``log_raw`` events whose timestamp (converted to UTC) lies in
``[start_dt, end_dt]``, rounds their time to the minute, aggregates them into
``dash_engagement`` and ``dash_visits``, and replaces that window's rows in
both tables.

Example:
    script.py --start_dt='2019-09-24 18:00:00' --end_dt='2019-09-24 19:00:00'
"""
import argparse
import sys
from datetime import datetime
from urllib.parse import quote

import pandas as pd

# Database connection parameters -- ask the DB administrator for the real ones.
DB_CONFIG = {'user': 'my_user',
             'pwd': '123456',
             'host': 'localhost',
             'port': 5432,
             'db': 'games'}

# Raw events of the window. ``ts`` is divided by 1000 before TO_TIMESTAMP, so it
# is presumably epoch milliseconds -- confirm against the log_raw schema.
# The window bounds are bound parameters, never formatted into the SQL text.
SELECT_RAW_SQL = """
    SELECT event_id, age_segment, event, item_id, item_topic, item_type,
           source_id, source_topic, source_type,
           TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' AS dt,
           user_id
    FROM log_raw
    WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC'
          BETWEEN CAST(:start_dt AS TIMESTAMP) AND CAST(:end_dt AS TIMESTAMP)
"""

# Removes the rows of the window being rebuilt; {table} is filled only from the
# fixed table names produced by build_aggregates().
DELETE_WINDOW_SQL = """
    DELETE FROM {table}
    WHERE dt BETWEEN CAST(:start_dt AS TIMESTAMP) AND CAST(:end_dt AS TIMESTAMP)
"""


def _parse_timestamp(value):
    """argparse ``type=`` hook: parse an ISO-8601 timestamp into a naive datetime."""
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "invalid timestamp {!r}; expected e.g. '2019-09-24 18:00:00'".format(value))
    if parsed.tzinfo is not None:
        # The SQL compares against UTC wall-clock time (AT TIME ZONE 'Etc/UTC'),
        # so a value with an explicit offset would be ambiguous.
        raise argparse.ArgumentTypeError(
            "timestamp {!r} must not carry a UTC offset; "
            "give UTC wall-clock time".format(value))
    return parsed


def parse_args(argv=None):
    """Parse the required ``--start_dt`` / ``--end_dt`` options.

    ``-sdt`` / ``-edt`` are accepted as aliases. (The former getopt spec
    "sdt:edt" could never match them: getopt short options are single letters.)

    Args:
        argv: argument list without the program name; ``None`` means
            ``sys.argv[1:]``.

    Returns:
        argparse.Namespace whose ``start_dt`` and ``end_dt`` are datetimes.

    Raises:
        SystemExit: with status 2 (via argparse) when an option is missing or
            unparsable, or when start_dt is later than end_dt.
    """
    parser = argparse.ArgumentParser(
        description='Rebuild dash_engagement / dash_visits for a time window.')
    parser.add_argument('-sdt', '--start_dt', required=True, type=_parse_timestamp,
                        help="window start, UTC, e.g. '2019-09-24 18:00:00'")
    parser.add_argument('-edt', '--end_dt', required=True, type=_parse_timestamp,
                        help="window end (inclusive), UTC, e.g. '2019-09-24 19:00:00'")
    args = parser.parse_args(sys.argv[1:] if argv is None else argv)
    if args.start_dt > args.end_dt:
        parser.error('start_dt ({}) is later than end_dt ({})'.format(
            args.start_dt, args.end_dt))
    return args


def connection_string(config):
    """Build a ``postgresql://`` URL from a DB_CONFIG-style dict.

    User and password are percent-encoded so characters such as '@', ':' or
    '/' in them cannot corrupt the URL.
    """
    return 'postgresql://{}:{}@{}:{}/{}'.format(
        quote(str(config['user']), safe=''),
        quote(str(config['pwd']), safe=''),
        config['host'],
        config['port'],
        config['db'],
    )


def build_aggregates(raw):
    """Aggregate raw events into the two dashboard tables.

    Args:
        raw: DataFrame with at least the columns dt, item_topic, event,
            age_segment, source_topic and user_id (as selected by
            SELECT_RAW_SQL). It is not modified.

    Returns:
        dict mapping target table name to its DataFrame, in write order:
        - 'dash_engagement': number of distinct user_id values per
          (dt, item_topic, event, age_segment);
        - 'dash_visits': number of non-null user_id values per
          (item_topic, source_topic, age_segment, dt).
        In both frames the aggregate keeps the column name ``user_id``, and dt
        is rounded to the minute.
    """
    # Round event times to the minute; assign() leaves the caller's frame intact.
    raw = raw.assign(dt=pd.to_datetime(raw['dt']).dt.round('min'))

    dash_engagement = (raw
                       .groupby(['dt', 'item_topic', 'event', 'age_segment'])
                       .agg({'user_id': 'nunique'})
                       .reset_index())
    dash_visits = (raw
                   .groupby(['item_topic', 'source_topic', 'age_segment', 'dt'])
                   .agg({'user_id': 'count'})
                   .reset_index())

    return {'dash_engagement': dash_engagement.fillna(0),
            'dash_visits': dash_visits.fillna(0)}


def main(argv=None):
    """Rebuild both dashboard tables for the window given on the command line."""
    args = parse_args(argv)
    window = {'start_dt': args.start_dt, 'end_dt': args.end_dt}

    # Imported here so the helpers above stay importable (e.g. by tests) in
    # environments without SQLAlchemy / a PostgreSQL driver.
    from sqlalchemy import create_engine, text

    engine = create_engine(connection_string(DB_CONFIG))
    raw = pd.read_sql_query(text(SELECT_RAW_SQL), con=engine,
                            params=window, index_col='event_id')
    tables = build_aggregates(raw)

    # Delete the window's previously written rows and append the fresh ones in
    # ONE transaction: before, a failing to_sql left the window deleted and
    # empty. engine.begin() commits on success and rolls back on an exception.
    with engine.begin() as conn:
        for table_name, table_data in tables.items():
            conn.execute(text(DELETE_WINDOW_SQL.format(table=table_name)), window)
            table_data.to_sql(name=table_name, con=conn,
                              if_exists='append', index=False)
    print('Good!')


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement