Advertisement
andrey_vetrov

zen_pipeline

Nov 15th, 2019
230
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python | 3.40 KB | None | 0 0
  #!/usr/bin/python
# -*- coding: utf-8 -*-
"""Zen pipeline: aggregate raw events from ``log_raw`` into dashboard tables.

For the time window ``[start_dt, end_dt]`` given on the command line this script

1. reads the rows of ``log_raw`` whose event time falls inside the window
   (``ts`` is divided by 1000 before ``TO_TIMESTAMP``, i.e. it is treated as
   epoch milliseconds and converted to UTC);
2. builds two per-minute aggregates, ``dash_visits`` and ``dash_engagement``;
3. replaces whatever rows those two tables already hold for the window.

Usage::

    python zen_pipeline.py --start_dt '2019-09-24 18:00:00' --end_dt '2019-09-24 19:00:00'
    python zen_pipeline.py -s '2019-09-24 18:00:00' -e '2019-09-24 19:00:00'

Connection parameters default to ``DEFAULT_DB_CONFIG`` (ask the database
administrator for the real values) and can be overridden with the
environment variables listed in ``DB_ENV_VARS``.
"""
import os
import sys
import getopt
from datetime import datetime
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text

# Both -s and -e take a value, hence the colon after each letter.
UNIX_OPTIONS = "s:e:"
GNU_OPTIONS = ["start_dt=", "end_dt="]

# Fallback connection parameters, used when no environment override is set.
DEFAULT_DB_CONFIG = {
    'user': 'my_user',
    'pwd': 'my_user_password',
    'host': 'localhost',
    'port': 5432,
    'db': 'zen',
}

# Environment variable that overrides each key of DEFAULT_DB_CONFIG.
DB_ENV_VARS = {
    'user': 'ZEN_DB_USER',
    'pwd': 'ZEN_DB_PASSWORD',
    'host': 'ZEN_DB_HOST',
    'port': 'ZEN_DB_PORT',
    'db': 'ZEN_DB_NAME',
}

# Rows of log_raw whose UTC event time lies in [start_dt, end_dt] (inclusive),
# plus that event time as the extra column ``dt``. The bounds are bound
# parameters, never string-formatted into the SQL.
QUERY_LOG_RAW = '''
    SELECT *,
           TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' AS dt
    FROM log_raw
    WHERE CAST((TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC') AS TIMESTAMP)
              >= CAST(:start_dt AS TIMESTAMP)
      AND CAST((TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC') AS TIMESTAMP)
              <= CAST(:end_dt AS TIMESTAMP)
'''


def parse_args(argv):
    """Return ``(start_dt, end_dt)`` parsed from *argv* (script name excluded).

    Exits with status 2 (after printing a message to stderr) on an unknown
    option, an option missing its value, stray positional arguments, or when
    either bound is not supplied.
    """
    try:
        options, leftovers = getopt.getopt(argv, UNIX_OPTIONS, GNU_OPTIONS)
    except getopt.GetoptError as err:
        print(str(err), file=sys.stderr)
        sys.exit(2)
    if leftovers:
        print('unexpected arguments: {}'.format(' '.join(leftovers)), file=sys.stderr)
        sys.exit(2)

    start_dt = ''
    end_dt = ''
    for option, value in options:
        if option in ("-s", "--start_dt"):
            start_dt = value
        elif option in ("-e", "--end_dt"):
            end_dt = value

    if not start_dt or not end_dt:
        # An empty bound would otherwise only fail later, inside PostgreSQL's
        # timestamp cast, after the connection had already been opened.
        print('both --start_dt and --end_dt are required', file=sys.stderr)
        sys.exit(2)
    return start_dt, end_dt


def build_engine():
    """Create the SQLAlchemy engine for the PostgreSQL database.

    Each parameter comes from its ``DB_ENV_VARS`` environment variable when
    set, else from ``DEFAULT_DB_CONFIG``. User and password are percent-encoded
    so characters such as ``@``, ``:`` or ``/`` cannot corrupt the URL.
    """
    config = {
        key: os.environ.get(env_name, DEFAULT_DB_CONFIG[key])
        for key, env_name in DB_ENV_VARS.items()
    }
    connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(
        quote(str(config['user']), safe=''),
        quote(str(config['pwd']), safe=''),
        config['host'],
        config['port'],
        config['db'],
    )
    return create_engine(connection_string)


def aggregate(log_raw):
    """Return ``(dash_visits, dash_engagement)`` built from *log_raw* rows.

    ``dash_visits`` counts rows (column ``visits``) per item_topic,
    source_topic, age_segment and minute; ``dash_engagement`` counts distinct
    user_id values (column ``unique_users``) per minute, item_topic, event and
    age_segment. *log_raw* itself is not modified.
    """
    # NOTE(review): round('min') can move an event from the last 30 s before
    # end_dt to the minute after end_dt; such a bucket is not covered by the
    # DELETE when the same window is re-run. floor('min') may be intended.
    events = log_raw.assign(dt=pd.to_datetime(log_raw['dt']).dt.round('min'))

    dash_visits = (
        events.groupby(['item_topic', 'source_topic', 'age_segment', 'dt'])
        .agg({'user_id': 'count'})
        .reset_index()
        .rename(columns={'user_id': 'visits'})
    )
    dash_engagement = (
        events.groupby(['dt', 'item_topic', 'event', 'age_segment'])
        .agg({'user_id': 'nunique'})
        .reset_index()
        .rename(columns={'user_id': 'unique_users'})
    )
    return dash_visits, dash_engagement


def store(engine, tables, start_dt, end_dt):
    """Replace the ``[start_dt, end_dt]`` slice of every table in *tables*.

    *tables* maps table name -> DataFrame to append. All deletes and inserts
    run in a single transaction, so a failed insert rolls back its delete
    instead of leaving the window empty.
    """
    bounds = {'start_dt': start_dt, 'end_dt': end_dt}
    with engine.begin() as conn:
        for table_name, table_data in tables.items():
            # Table names are the fixed keys chosen in main(); only the dates
            # come from the command line, and those are bound parameters.
            delete_sql = text(
                'DELETE FROM {} WHERE dt BETWEEN CAST(:start_dt AS TIMESTAMP) '
                'AND CAST(:end_dt AS TIMESTAMP)'.format(table_name)
            )
            conn.execute(delete_sql, bounds)
            table_data.to_sql(name=table_name, con=conn, if_exists='append', index=False)


def main(argv=None):
    """Run the pipeline for the window given in *argv* (default: sys.argv[1:])."""
    if argv is None:
        argv = sys.argv[1:]
    start_dt, end_dt = parse_args(argv)

    engine = build_engine()
    log_raw = pd.read_sql(
        text(QUERY_LOG_RAW),
        con=engine,
        params={'start_dt': start_dt, 'end_dt': end_dt},
    )

    dash_visits, dash_engagement = aggregate(log_raw)
    store(
        engine,
        {'dash_visits': dash_visits, 'dash_engagement': dash_engagement},
        start_dt,
        end_dt,
    )

    print(dash_visits.head())
    print()
    print(dash_engagement.head())


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement