SHARE
TWEET

Untitled

a guest Feb 24th, 2020 67 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import sys
  5. import getopt
  6. from datetime import datetime
  7. import pandas as pd
  8. from sqlalchemy import create_engine
  9.  
  10. if __name__ == "__main__":
  11.  
  12.     #Задаем входные параметры
  13.     unixOptions = "s:e"
  14.     gnuOptions = ["start_dt=", "end_dt="]
  15.    
  16.     fullCmdArguments = sys.argv
  17.     argumentList = fullCmdArguments[1:]
  18.    
  19.     try:
  20.         arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  21.     except getopt.error as err:
  22.         print (str(err))
  23.         sys.exit(2)
  24.    
  25.     start_dt = ''
  26.     end_dt = ''
  27.     for currentArgument, currentValue in arguments:
  28.         if currentArgument in ("-s", "--start_dt"):
  29.             start_dt = currentValue
  30.         elif currentArgument in ("-e", "--end_dt"):
  31.             end_dt = currentValue
  32.    
  33.     db_config = {'user': 'my_user',        
  34.                  'pwd': 'my_user_password',
  35.                  'host': 'localhost',      
  36.                  'port': 5432,              
  37.                  'db': 'zen'}            
  38.    
  39.     connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'],
  40.                                     db_config['pwd'],
  41.                                     db_config['host'],
  42.                                     db_config['port'],
  43.                                     db_config['db'])
  44.     engine = create_engine(connection_string)
  45.    
  46.     #Теперь выберем из таблицы только те строки,
  47.     #которые были выпущены между start_dt и end_dt
  48.     query = ''' SELECT event_id, age_segment, event, item_id, item_topic, item_type, source_id, source_topic, source_type, user_id, TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' as dt
  49.            FROM log_raw
  50.            WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  51.            '''.format(start_dt, end_dt)
  52.    
  53.     data_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')
  54.  
  55.     # преобразуем типы данных      
  56.     columns_str = ['age_segment', 'event', 'item_topic', 'item_type', 'source_topic', 'source_type']
  57.     columns_numeric = ['item_id', 'source_id', 'user_id ']
  58.     columns_datetime = ['dt']
  59.    
  60.     for column in columns_str: data_raw[column] = data_raw[column].astype(str)  
  61.     for column in columns_numeric: data_raw[column] = pd.to_numeric(data_raw[column], errors='coerce')
  62.     for column in columns_datetime: data_raw[column] = pd.to_datetime(data_raw[column]).dt.round('min')
  63.    
  64.     # создадим агрегирующие таблицы  
  65.     dash_engagement = (data_raw
  66.                        .groupby(['dt', 'item_topic', 'event', 'age_segment'])
  67.                        .agg({'user_id': lambda x: x.nunique()}))
  68.     dash_engagement = dash_engagement.rename(columns = {'user_id': 'unique_users'})
  69.    
  70.     dash_visits = (data_raw
  71.                    .groupby(['item_topic', 'source_topic', 'age_segment', 'dt'])
  72.                    .agg({'event': 'count'}))
  73.     dash_visits = dash_visits.rename(columns = {'event': 'visits'})
  74.    
  75.     dash_visits = dash_visits.fillna(0).reset_index()
  76.     dash_engagement = dash_engagement.fillna(0).reset_index()  
  77.    
  78.     tables = {'dash_visits': dash_visits,
  79.               'dash_engagement': dash_engagement}
  80.  
  81.     for table_name, table_data in tables.items():  
  82.  
  83.         query = '''
  84.                   DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  85.                 '''.format(table_name, start_dt, end_dt)
  86.         engine.execute(query)
  87.  
  88.         table_data.to_sql(name = table_name, con = engine, if_exists = 'append', index = False)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Top