Advertisement
Guest User

Untitled

a guest
Mar 29th, 2020
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.75 KB | None | 0 0
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import getopt
  5. from datetime import datetime
  6. import pandas as pd
  7. from sqlalchemy import create_engine
  8.  
  9. if __name__ == "__main__":
  10.  
  11. #Задаём входные параметры
  12. unixOptions = "s:e"
  13. gnuOptions = ["start_dt=", "end_dt="]
  14.  
  15. fullCmdArguments = sys.argv
  16. argumentList = fullCmdArguments[1:] #excluding script name
  17.  
  18. try:
  19. arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  20. except getopt.error as err:
  21. print (str(err))
  22. sys.exit(2)
  23.  
  24. start_dt = '2019-09-24 18:00:00+03'
  25. end_dt = '2019-09-24 19:00:00+03'
  26. for currentArgument, currentValue in arguments:
  27. if currentArgument in ("-s", "--start_dt"):
  28. start_dt = currentValue
  29. elif currentArgument in ("-e", "--end_dt"):
  30. end_dt = currentValue
  31.  
  32. db_config = {'user': 'my_user',
  33. 'pwd': 'my_user_password',
  34. 'host': 'localhost',
  35. 'port': 5432,
  36. 'db': 'zen'}
  37.  
  38. connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'],
  39. db_config['pwd'],
  40. db_config['host'],
  41. db_config['port'],
  42. db_config['db'])
  43. engine = create_engine(connection_string)
  44.  
  45. # Теперь выберем из таблицы только те строки,
  46. # которые были выпущены между start_dt и end_dt
  47. query = ''' SELECT
  48. event_id
  49. ,age_segment
  50. ,event
  51. ,item_id
  52. ,item_topic
  53. ,item_type
  54. ,source_id
  55. ,source_topic
  56. ,source_type
  57. ,TO_TIMESTAMP(ts/1000) AT TIME ZONE 'Etc/UTC' as dt
  58. ,user_id
  59. FROM log_raw
  60. WHERE TO_TIMESTAMP(ts/1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP;
  61. '''.format(start_dt, end_dt)
  62.  
  63. data_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')
  64. columns_str = ['age_segment', 'item_topic', 'source_topic', 'source_type','event']
  65. columns_numeric = ['item_id', 'source_id', 'user_id']
  66. columns_datetime = ['dt']
  67. for column in columns_str: data_raw[column] = data_raw[column].astype(str)
  68. for column in columns_numeric: data_raw[column] = pd.to_numeric(data_raw[column], errors='coerce')
  69. for column in columns_datetime: data_raw[column] = pd.to_datetime(data_raw[column])
  70.  
  71. dash_visits = data_raw.groupby(['item_topic', 'source_topic', 'age_segment', 'dt']).agg({'user_id': 'count'})
  72. dash_visits = dash_visits.rename(columns = {'user_id':'visits'})
  73. dash_visits = dash_visits.fillna(0).reset_index()
  74.  
  75.  
  76. #dash_engagement
  77. dash_engagement = data_raw.groupby(['dt', 'item_topic', 'event', 'age_segment']).agg({'user_id':'nunique'})
  78. dash_engagement = dash_engagement.rename(columns = {'user_id':'unique_users'})
  79. dash_engagement = dash_engagement.fillna(0).reset_index()
  80.  
  81.  
  82.  
  83. #Удаляем старые записи между start_dt и end_dt
  84. tables = {'dash_visits': dash_visits, 'dash_engagement': dash_engagement}
  85.  
  86. for table_name, table_data in tables.items():
  87. query = '''
  88. DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  89. '''.format(table_name, start_dt, end_dt)
  90. engine.execute(query)
  91.  
  92. table_data.to_sql(name = table_name, con = engine, if_exists = 'append', index = False)
  93.  
  94. print('All done.')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement