Advertisement
mironovichandrei

pipeline_script

Feb 17th, 2020
213
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.18 KB | None | 0 0
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import getopt
  5. from datetime import datetime
  6. import pandas as pd
  7. from sqlalchemy import create_engine
  8.  
  9. if __name__ == "__main__":
  10.  
  11. #Задаём входные параметры
  12. unixOptions = "sdt:edt"
  13. gnuOptions = ["start_dt=", "end_dt="]
  14.  
  15. fullCmdArguments = sys.argv
  16. argumentList = fullCmdArguments[1:] #excluding script name
  17.  
  18. try:
  19. arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
  20. except getopt.error as err:
  21. print (str(err))
  22. sys.exit(2)
  23.  
  24. start_dt = ''
  25. end_dt = ''
  26. for currentArgument, currentValue in arguments:
  27. if currentArgument in ("-sdt", "--start_dt"):
  28. start_dt = currentValue
  29. elif currentArgument in ("-edt", "--end_dt"):
  30. end_dt = currentValue
  31.  
  32. db_config = {'user': 'my_user',
  33. 'pwd': '123456',
  34. 'host': 'localhost',
  35. 'port': 5432,
  36. 'db': 'games'}
  37.  
  38. connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'],
  39. db_config['pwd'],
  40. db_config['host'],
  41. db_config['port'],
  42. db_config['db'])
  43. engine = create_engine(connection_string)
  44.  
  45. # Теперь выберем из таблицы только те строки,
  46. # которые были выпущены между start_dt и end_dt
  47. query = ''' SELECT *
  48. FROM data_raw
  49. WHERE year_of_release::TIMESTAMP BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  50. '''.format(start_dt, end_dt)
  51.  
  52. data_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'game_id')
  53.  
  54. columns_numeric = ['na_players', 'eu_players', 'jp_players', 'other_players',
  55. 'critic_score', 'user_score']
  56. columns_datetime = ['year_of_release']
  57. for column in columns_numeric: data_raw[column] = pd.to_numeric(data_raw[column], errors='coerce')
  58. for column in columns_datetime: data_raw[column] = pd.to_datetime(data_raw[column])
  59.  
  60. data_raw['total_copies_sold'] = data_raw[['na_players',
  61. 'eu_players',
  62. 'jp_players',
  63. 'other_players']].sum(axis = 1)
  64.  
  65. agg_games_year = data_raw.groupby('year_of_release').agg({'critic_score': 'mean',
  66. 'user_score': 'mean',
  67. 'total_copies_sold': 'sum'})
  68.  
  69. agg_games_year = agg_games_year.rename(columns = {'critic_score': 'avg_critic_score',
  70. 'user_score': 'avg_user_score'})
  71.  
  72. #Удаляем старые записи между start_dt и end_dt
  73. query = '''DELETE FROM agg_games_year
  74. WHERE year_of_release BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
  75. '''.format(start_dt, end_dt)
  76. engine.execute(query)
  77.  
  78. agg_games_year.to_sql(name = 'agg_games_year', con = engine, if_exists = 'append', index = False)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement