Advertisement
semen_kamagurov

pipeline

Feb 22nd, 2020
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.81 KB | None | 0 0
  1. # !/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. # импорт необходимых библиотек
  5. import sys
  6. import getopt
  7. from datetime import datetime
  8. import pandas as pd
  9. from sqlalchemy import create_engine
  10. # создаем точку входа в программу
  11. if __name__=="__main__":
  12.     # входные параметры пайплайна
  13.     unixOptions = "s:e"
  14.     gnuOptions = ["start_dt=", "end_dt="]
  15.  
  16.     fullCmdArguments = sys.argv
  17.     argumentList = fullCmdArguments[1:]
  18.    
  19.     try:
  20.         arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  21.     except getopt.error as err:
  22.         print(str(err))
  23.         sys.exit(2)  
  24.  
  25.     start_dt = ''
  26.     end_dt = ''
  27.     # считывание входных параметров для start_dt, end_dt в цикле
  28.     for currentArgument, currentValue in arguments:
  29.         if currentArgument in ('-s', '--start_dt'):
  30.             start_dt = currentValue
  31.         elif currentArgument in ('-e', '--end_dt'):
  32.             end_dt = currentValue
  33.     # задаем параметры подключения к базе данных (БД)
  34.     db_config = {'user':'my_user',
  35.                  'pwd':'my_user_password',
  36.                  'host':'localhost',
  37.                  'port': 5432,
  38.                  'db':'zen'}  
  39.     # задаем строку подключения к БД
  40.     connection_string = 'postgres://{}:{}@{}:{}/{}'.format(db_config['user'],
  41.                                                            db_config['pwd'],
  42.                                                            db_config['host'],
  43.                                                            db_config['port'],
  44.                                                            db_config['db'])
  45.     engine = create_engine(connection_string)
  46.     # создаем SQL-запрос для чтения сырых данных
  47.  
  48.  
  49.     query = ''' SELECT event_id, age_segment, event, item_id, item_topic, item_type, source_id, source_topic, source_type,
  50.        TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' as dt, user_id
  51.        FROM log_raw
  52.        WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  53.        '''.format(start_dt, end_dt)
  54.     # записываем данные в датафрейм                                                      
  55.     log_raw = pd.io.sql.read_sql(query, con=engine)
  56.     # округляем до минут
  57.     log_raw['dt'] = pd.to_datetime(log_raw['dt']).dt.round('min')
  58.     # создаем агрегирующие таблицы
  59.     dash_engagement = (log_raw
  60.                             .groupby(['dt','item_topic', 'event', 'age_segment'])
  61.                             .agg({'user_id':'nunique'})
  62.                             .reset_index()
  63.                       )
  64.  
  65.     dash_visits = (log_raw
  66.                         .groupby(['item_topic', 'source_topic', 'age_segment', 'dt'])
  67.                         .agg({'user_id':'count'})
  68.                         .reset_index()
  69.                   )
  70.  
  71.     dash_engagement = dash_engagement.rename(columns = {'user_id': 'unique_users'})
  72.     dash_visits = dash_visits.rename(columns = {'user_id': 'visits'})
  73.    
  74.     # удаление ранее записанных данных
  75.     dash_engagement = dash_engagement.fillna(0)
  76.     dash_visits = dash_visits.fillna(0)
  77.  
  78.     tables = {'dash_engagement':dash_engagement,
  79.               'dash_visits':dash_visits}
  80.  
  81.     for table_name, table_data in tables.items():
  82.         query = '''
  83.                DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP
  84.                AND '{}'::TIMESTAMP
  85.                '''.format(table_name, start_dt, end_dt)
  86.         engine.execute(query)
  87.  
  88.         table_data.to_sql(name=table_name, con=engine, if_exists='append', index=False)
  89.     print('Success!')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement