Guest User

Untitled

a guest
Sep 1st, 2018
104
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.64 KB | None | 0 0
  1. import urllib
  2. import pg8000
  3. import boto3
  4. import os
  5. import logging
  6.  
# Deployment configuration, read from the process environment at import time.
# Indexing os.environ directly raises KeyError if any variable is unset.
IAM_ROLE = os.environ['IAM_ROLE']  # interpolated into the COPY statement's IAM_ROLE clause
DB_NAME = os.environ['DB_NAME']  # database name passed to pg8000.connect
DB_USER = os.environ['DB_USER']  # user name passed to pg8000.connect
DB_PORT = os.environ['DB_PORT']  # kept as a string here; converted with int() when connecting
DB_HOST = os.environ['DB_HOST']  # host passed to pg8000.connect
DB_TABLE = os.environ['DB_TABLE']  # NOTE(review): not referenced anywhere else in the visible code
DB_PW_PARAM = os.environ['DB_PW_PARAM']  # name of the SSM parameter that stores the DB password


# Root logger (getLogger() with no name), emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
  18.  
  19.  
  20. def _get_db_password(with_decryption: bool) -> str:
  21. """ Get password from AWS SSM parameter Store """
  22. ssm = boto3.client('ssm')
  23. res = ssm.get_parameter(Name=DB_PW_PARAM, WithDecryption=with_decryption)
  24. if res and res.get('Parameter', {}).get('Value'):
  25. return res['Parameter']['Value']
  26. raise Exception("Failed to retrieve DB password: {}".format(res))
  27.  
  28.  
  29. def _get_pg_client(auto_commit=True, ssl=True):
  30. db_password = _get_db_password(with_decryption=True)
  31. client = pg8000.connect(user=DB_USER, host=DB_HOST, port=int(DB_PORT),
  32. database=DB_NAME, password=db_password, ssl=ssl)
  33. client.autocommit = auto_commit
  34. cur = client.cursor()
  35. return client, cur
  36.  
  37.  
  38. def _get_base_copy_cmd(table: str, bucket: str, key: str) -> str:
  39. """ Base of any Redshift Copy Command """
  40. return """
  41. COPY {table}
  42. FROM 's3://{bucket}/{key}'
  43. IAM_ROLE '{iam_role}'
  44. """.format(table=table, bucket=bucket,
  45. key=key, iam_role=IAM_ROLE)
  46.  
  47.  
  48. def _get_csv_copy_cmd(table: str, bucket: str, key: str) -> str:
  49. """ Example for import CSV files """
  50. base_qry = _get_base_copy_cmd(table=table, bucket=bucket, key=key)
  51. return "{base_query} delimiter '{delimiter}' IGNOREHEADER 1".format(
  52. base_query=base_qry, delimiter=',')
  53.  
  54.  
  55. def build_copy_command(bucket: str, key: str) -> str:
  56. """ Extracts copy command params based on key """
  57. key_split = key.split("/")
  58. table = _get_table(key_split)
  59. qry = _get_csv_copy_cmd(table=table, bucket=bucket, key=key)
  60. logger.info(qry)
  61. return qry
  62.  
  63.  
  64. def handler(event, context):
  65. records = event.get('Records')
  66. s3_data = record.get('s3', {})
  67. bucket = s3_data.get('bucket', {}).get('name')
  68. key = s3_data.get('object', {}).get('key')
  69. key = urllib.parse.unquote_plus(key, encoding='utf-8')
  70.  
  71. logger.info("Loading the following S3 object into Redshift: '{}/{}'".format(bucket, key))
  72. query = build_copy_command(bucket=bucket, key=key)
  73.  
  74. conn, cur = _get_pg_client()
  75. exc = None
  76. try:
  77. cur.execute(query)
  78. except Exception as e:
  79. logger.error("Copy command to Redshift failed while dealing with key: {}/{}".format(bucket, key))
  80. exc = e
  81.  
  82. cur.close()
  83. conn.close()
  84. if exc:
  85. raise exc
Add Comment
Please, Sign In to add comment