Advertisement
Guest User

Untitled

a guest
Feb 25th, 2020
146
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.93 KB | None | 0 0
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import getopt
  5. from datetime import datetime
  6. import pandas as pd
  7. from sqlalchemy import create_engine
  8.  
  9. if __name__ == "__main__":
  10.  
  11. #Задаем входные параметры
  12. unixOptions = "s:e"
  13. gnuOptions = ["start_dt=", "end_dt="]
  14.  
  15. fullCmdArguments = sys.argv
  16. argumentList = fullCmdArguments[1:]
  17.  
  18. try:
  19. arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  20. except getopt.error as err:
  21. print (str(err))
  22. sys.exit(2)
  23.  
  24. start_dt = '2019-09-24 18:00:00'
  25. end_dt = '2019-09-25 19:00:00'
  26. for currentArgument, currentValue in arguments:
  27. if currentArgument in ("-s", "--start_dt"):
  28. start_dt = currentValue
  29. elif currentArgument in ("-e", "--end_dt"):
  30. end_dt = currentValue
  31.  
  32.  
  33. db_config = {'user': 'my_user',
  34. 'pwd': 'my_user_password',
  35. 'host': 'localhost',
  36. 'port': 5432,
  37. 'db': 'zen'}
  38.  
  39. # Формируем строку соединения с БД.
  40. connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'],
  41. db_config['pwd'],
  42. db_config['host'],
  43. db_config['port'],
  44. db_config['db'])
  45. # Подключаемся к БД.
  46. engine = create_engine(connection_string)
  47.  
  48. # Формируем sql-запрос.
  49. query = ''' SELECT event_id, age_segment, event, item_id, item_topic, item_type,source_id, source_topic, source_type, TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' as dt, user_id
  50. FROM log_raw
  51. WHERE TO_TIMESTAMP(ts / 1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  52. '''.format(start_dt, end_dt)
  53.  
  54.  
  55. log_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')
  56. log_raw['dt'] = pd.to_datetime(log_raw['dt']).dt.round('min')
  57. dash_visits = log_raw.groupby(['item_topic', 'source_topic', 'age_segment', 'dt']).agg({'user_id':'count'})
  58. dash_visits = dash_visits.rename(columns = {'user_id':'visits'})
  59. dash_visits = dash_visits.fillna(0).reset_index()
  60. dash_engagement = log_raw.groupby(['dt', 'item_topic', 'event', 'age_segment']).agg({'user_id':'nunique'})
  61. dash_engagement = dash_engagement.rename(columns = {'user_id':'unique_users'})
  62. dash_engagement = dash_engagement.fillna(0).reset_index()
  63. tables = {'dash_visits': dash_visits,'dash_engagement': dash_engagement}
  64. for sql_table, df in tables.items():
  65. query = '''DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP'''.format(sql_table, start_dt, end_dt)
  66. engine.execute(query)
  67. df.to_sql(name = sql_table, con = engine, if_exists = 'append', index = False)
  68. print('awesome!')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement