Advertisement
Guest User

Untitled

a guest
Oct 14th, 2016
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.54 KB | None | 0 0
  1. from __future__ import print_function
  2. import base64
  3. import json
  4. import pymysql
  5.  
  6. RDS_HOST = 'host'
  7.  
  8. DB_USER = 'dummy_user'
  9. DB_PASSWORD = 'password1234'
  10. DB_NAME = 'crazy_name'
  11. DB_TABLE = 'wow_table'
  12.  
  13. class MYSQL(object):
  14. '''
  15. This a wrapper Class for PyMySQL
  16. '''
  17. CONNECTION_TIMEOUT = 30
  18.  
  19. def __init__(self, host, user, password, database, table):
  20. self.host = host
  21. self.user = user
  22. self.password = password
  23. self.database = database
  24. self.table = table
  25. self.connection = self.connect()
  26.  
  27. def connect(self):
  28. '''
  29. Connects to MySQL instance
  30. '''
  31. try:
  32. connection = pymysql.connect(
  33. host=self.host,
  34. user=self.user,
  35. password=self.password,
  36. db=self.database,
  37. connect_timeout=self.CONNECTION_TIMEOUT
  38. )
  39.  
  40. return connection
  41.  
  42. except Exception as ex:
  43. print(ex)
  44. print("ERROR: Unexpected error: Could not connect to AuroraDB instance")
  45.  
  46. def execute(self, account_id, external_ref_id, timestamp):
  47. '''
  48. Executes command given a MySQL connection
  49. '''
  50.  
  51. with self.connection.cursor() as cursor:
  52. sql = ('INSERT INTO ' +
  53. self.database +
  54. '.' +
  55. self.table +
  56. '(`account_id`, `external_reference_id`, `registration`, `c_name`, `c_id`, `create_date`)' +
  57. ' VALUES (%s, %s, DATE_FORMAT(STR_TO_DATE(%s,"%%Y-%%M-%%d %%H:%%i:%%s"),"%%Y-%%m-%%d %%H:%%i:%%s"), %s, %s, current_timestamp())' +
  58. ' ON DUPLICATE KEY UPDATE create_date = VALUES(create_date)')
  59. cursor.execute(sql, (
  60. account_id,
  61. external_ref_id,
  62. timestamp,
  63. 'bingo',
  64. 300)
  65. )
  66.  
  67. self.connection.commit()
  68.  
  69. def close_connection(self):
  70. '''
  71. Closes connection to MySQL
  72. '''
  73. self.connection.close()
  74.  
  75. def get_data_from_kinesis_object(obj):
  76. '''
  77. Retrieves data from kinesis event
  78. '''
  79. return obj['kinesis']['data']
  80.  
  81. def decode_data(data):
  82. '''
  83. Decodes record via base64
  84. '''
  85. return base64.b64decode(data)
  86.  
  87. def split_records_into_record(records):
  88. '''
  89. Splits a record of records into an array of records
  90. '''
  91. return records.split('n')
  92.  
  93. def parse_record(record):
  94. '''
  95. parses record into JSON
  96. '''
  97.  
  98. if record:
  99.  
  100. return json.loads(record)
  101.  
  102. def is_record_valid(record):
  103. '''
  104. Check for keys in event
  105. returns True if they all exist
  106. and False if they dont all exist
  107.  
  108. '''
  109. return all(key in record for key in (
  110. 'eventName',
  111. 'sourceType',
  112. 'AccountId',
  113. 'Timestamp',
  114. 'ExternalReferenceId'
  115. ))
  116.  
  117. def handler(event, context):
  118. """
  119. This function inserts data into Aurora RDS instance
  120. """
  121.  
  122. mysql = MYSQL(RDS_HOST, DB_USER, DB_PASSWORD, DB_NAME, DB_TABLE)
  123.  
  124. for obj in event['Records']:
  125. records = decode_data(get_data_from_kinesis_object(obj))
  126. split_records = split_records_into_record(records)
  127.  
  128. for record in split_records:
  129. parsed_record = parse_record(record)
  130.  
  131. if is_record_valid(parsed_record):
  132. mysql.execute(
  133. parsed_record['AccountId'],
  134. parsed_record['ExternalReferenceId'],
  135. str(parsed_record['Timestamp'])
  136. )
  137.  
  138. mysql.close_connection()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement