Guest User

Untitled

a guest
Mar 17th, 2018
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.27 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. from io import StringIO
  3. import pandas as pd
  4. import traceback
  5. import psycopg2
  6. import boto3
  7. import sys
  8. import os
  9.  
  10.  
  11. def connect_to_redshift(dbname, host, user, port = 5439, **kwargs):
  12. # connect to redshift
  13. global connect, cursor
  14. connect = psycopg2.connect(dbname = dbname,
  15. host = host,
  16. port = port,
  17. user = user,
  18. **kwargs)
  19.  
  20. cursor = connect.cursor()
  21.  
  22.  
  23. def connect_to_s3(aws_access_key_id, aws_secret_access_key, bucket, subdirectory = None, **kwargs):
  24. global s3, s3_bucket_var, s3_subdirectory_var, aws_1, aws_2, aws_token
  25. s3 = boto3.resource('s3',
  26. aws_access_key_id = aws_access_key_id,
  27. aws_secret_access_key = aws_secret_access_key,
  28. **kwargs)
  29. s3_bucket_var = bucket
  30. if subdirectory is None:
  31. s3_subdirectory_var = ''
  32. else:
  33. s3_subdirectory_var = subdirectory + '/'
  34. aws_1 = aws_access_key_id
  35. aws_2 = aws_secret_access_key
  36. if kwargs.get('aws_session_token'):
  37. aws_token = kwargs.get('aws_session_token')
  38. else:
  39. aws_token = ''
  40.  
  41.  
  42. def redshift_to_pandas(sql_query):
  43. # pass a sql query and return a pandas dataframe
  44. cursor.execute(sql_query)
  45. columns_list = [desc[0] for desc in cursor.description]
  46. data = pd.DataFrame(cursor.fetchall(), columns = columns_list)
  47. return data
  48.  
  49.  
  50. def pandas_to_redshift(data_frame,
  51. redshift_table_name,
  52. column_data_types = None,
  53. index = False,
  54. save_local = False,
  55. delimiter = ',',
  56. quotechar = '"',
  57. dateformat = 'auto',
  58. timeformat = 'auto',
  59. region = '',
  60. append = False):
  61. rrwords = open(os.path.join(os.path.dirname(__file__), \
  62. 'redshift_reserve_words.txt'), 'r').readlines()
  63. rrwords = [r.strip().lower() for r in rrwords]
  64. data_frame.columns = [x.lower() for x in data_frame.columns]
  65. not_valid = [r for r in data_frame.columns if r in rrwords]
  66. if not_valid:
  67. raise ValueError('DataFrame column name {0} is a reserve word in redshift'.format(not_valid[0]))
  68. else:
  69. csv_name = redshift_table_name + '.csv'
  70. if save_local == True:
  71. data_frame.to_csv(csv_name, index = index, sep = delimiter)
  72. print('saved file {0} in {1}'.format(csv_name, os.getcwd()))
  73. # SEND DATA TO S3
  74. csv_buffer = StringIO()
  75. data_frame.to_csv(csv_buffer, index = index, sep = delimiter)
  76. s3.Bucket(s3_bucket_var).put_object(Key= s3_subdirectory_var + csv_name, Body = csv_buffer.getvalue())
  77. print('saved file {0} in bucket {1}'.format(csv_name, s3_subdirectory_var + csv_name))
  78. # CREATE AN EMPTY TABLE IN REDSHIFT
  79. if index == True:
  80. columns = list(data_frame.columns)
  81. if data_frame.index.name:
  82. columns.insert(0, data_frame.index.name)
  83. else:
  84. columns.insert(0, "index")
  85. else:
  86. columns = list(data_frame.columns)
  87. if column_data_types is None:
  88. column_data_types = ['varchar(256)'] * len(columns)
  89. columns_and_data_type = ', '.join(['{0} {1}'.format(x, y) for x,y in zip(columns, column_data_types)])
  90. if append is False:
  91. create_table_query = 'create table {0} ({1})'.format(redshift_table_name, columns_and_data_type)
  92. print(create_table_query)
  93. print('CREATING A TABLE IN REDSHIFT')
  94. cursor.execute('drop table if exists {0}'.format(redshift_table_name))
  95. cursor.execute(create_table_query)
  96. connect.commit()
  97. # CREATE THE COPY STATEMENT TO SEND FROM S3 TO THE TABLE IN REDSHIFT
  98. bucket_name = 's3://{0}/{1}'.format(s3_bucket_var, s3_subdirectory_var + csv_name)
  99. s3_to_sql = """
  100. copy {0}
  101. from '{1}'
  102. delimiter '{2}'
  103. ignoreheader 1
  104. csv quote as '{3}'
  105. dateformat '{4}'
  106. timeformat '{5}'
  107. access_key_id '{6}'
  108. secret_access_key '{7}'
  109. """.format(redshift_table_name, bucket_name, delimiter, quotechar, dateformat, timeformat, aws_1, aws_2)
  110. if region:
  111. s3_to_sql = s3_to_sql + "region '{0}'".format(region)
  112. if aws_token != '':
  113. s3_to_sql = s3_to_sql + "\n\tsession_token '{0}'".format(aws_token)
  114. s3_to_sql = s3_to_sql + ';'
  115. print(s3_to_sql)
  116. # send the file
  117. print('FILLING THE TABLE IN REDSHIFT')
  118. try:
  119. cursor.execute(s3_to_sql)
  120. connect.commit()
  121. except Exception as e:
  122. print(e)
  123. traceback.print_exc(file=sys.stdout)
  124. connect.rollback()
  125. raise
  126.  
  127.  
  128.  
  129. def exec_commit(sql_query):
  130. cursor.execute(sql_query)
  131. connect.commit()
  132.  
  133.  
  134. def close_up_shop():
  135. global connect, cursor, s3, s3_bucket_var, s3_subdirectory_var, aws_1, aws_2, aws_token
  136. cursor.close()
  137. connect.commit()
  138. connect.close()
  139. try:
  140. del connect, cursor
  141. except:
  142. pass
  143. try:
  144. del s3, s3_bucket_var, s3_subdirectory_var, aws_1, aws_2, aws_token
  145. except:
  146. pass
Add Comment
Please, Sign In to add comment