Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from __future__ import print_function

import base64
import json
import os

import pymysql
# Connection settings for the target MySQL instance.
# Each value is read from an environment variable of the same name so real
# credentials do not have to live in source control; the literals are only
# fallbacks used when the variable is not set.
RDS_HOST = os.environ.get('RDS_HOST', 'host')
DB_USER = os.environ.get('DB_USER', 'dummy_user')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'password1234')
DB_NAME = os.environ.get('DB_NAME', 'crazy_name')
DB_TABLE = os.environ.get('DB_TABLE', 'wow_table')
class MYSQL(object):
    '''
    Thin wrapper around a single PyMySQL connection.

    The connection is opened in the constructor and is used by ``execute``
    to insert (or update on duplicate key) one row in ``database.table``.
    '''

    # Value passed to pymysql.connect as connect_timeout.
    CONNECTION_TIMEOUT = 30

    def __init__(self, host, user, password, database, table):
        '''
        Stores the connection settings and opens the connection.

        Raises whatever ``pymysql.connect`` raises when the connection
        cannot be established.
        '''
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.table = table
        self.connection = self.connect()

    def connect(self):
        '''
        Connects to MySQL instance and returns the connection object.

        The failure is logged and then re-raised, so a caller never ends
        up holding a wrapper whose ``connection`` is None.
        '''
        try:
            return pymysql.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                db=self.database,
                connect_timeout=self.CONNECTION_TIMEOUT
            )
        except Exception as ex:
            print(ex)
            print("ERROR: Unexpected error: Could not connect to AuroraDB instance")
            # Swallowing the error here would only defer the crash to the
            # first cursor() call, with a far less useful message.
            raise

    def execute(self, account_id, external_ref_id, timestamp):
        '''
        Inserts one row for the given account and commits it.

        account_id, external_ref_id and timestamp are sent as bound
        parameters; ``c_name`` and ``c_id`` are always written as
        'bingo' and 300. When the key already exists only ``create_date``
        is refreshed.
        '''
        # Percent signs belonging to the MySQL date format are doubled
        # because the query string also carries %s placeholders for the
        # bound parameters.
        # NOTE(review): the STR_TO_DATE format uses %M, which MySQL
        # documents as the month *name*; a numeric month would need %m.
        # Confirm against the timestamps actually sent upstream.
        sql = ('INSERT INTO ' +
               self.database +
               '.' +
               self.table +
               '(`account_id`, `external_reference_id`, `registration`, `c_name`, `c_id`, `create_date`)' +
               ' VALUES (%s, %s, DATE_FORMAT(STR_TO_DATE(%s,"%%Y-%%M-%%d %%H:%%i:%%s"),"%%Y-%%m-%%d %%H:%%i:%%s"), %s, %s, current_timestamp())' +
               ' ON DUPLICATE KEY UPDATE create_date = VALUES(create_date)')
        params = (account_id, external_ref_id, timestamp, 'bingo', 300)
        with self.connection.cursor() as cursor:
            cursor.execute(sql, params)
        self.connection.commit()

    def close_connection(self):
        '''
        Closes connection to MySQL.

        Safe to call when no connection is held and safe to call twice.
        '''
        connection = getattr(self, 'connection', None)
        if connection is None:
            return
        try:
            connection.close()
        finally:
            # Drop the reference so a second call becomes a no-op.
            self.connection = None
def get_data_from_kinesis_object(obj):
    '''
    Returns the value stored under obj['kinesis']['data']
    '''
    kinesis_section = obj['kinesis']
    return kinesis_section['data']
def decode_data(data):
    '''
    Base64-decodes the given payload and returns the decoded result
    '''
    decoded = base64.b64decode(data)
    return decoded
def split_records_into_record(records):
    '''
    Splits a newline-delimited payload into a list of record strings.

    Accepts text or bytes; bytes are decoded as UTF-8 first, so the
    output of base64.b64decode can be passed in directly. Empty lines
    are kept as empty strings in the result.
    '''
    if isinstance(records, bytes):
        records = records.decode('utf-8')
    # The delimiter is the newline character, not the letter "n".
    return records.split('\n')
def parse_record(record):
    '''
    Parses one record from JSON.

    Returns the decoded value, or None when the record is empty or
    contains only whitespace. Malformed JSON still raises from
    json.loads.
    '''
    # Blank lines (including a stray "\r") carry no record to decode.
    if not record or not record.strip():
        return None
    return json.loads(record)
def is_record_valid(record):
    '''
    Check for keys in event
    returns True if they all exist
    and False if they dont all exist

    Anything that is not a dict (for example the None produced for an
    empty line) is reported as invalid instead of raising.
    '''
    if not isinstance(record, dict):
        return False
    required_keys = (
        'eventName',
        'sourceType',
        'AccountId',
        'Timestamp',
        'ExternalReferenceId'
    )
    return all(key in record for key in required_keys)
def handler(event, context):
    """
    This function inserts data into Aurora RDS instance

    For every entry in event['Records'] the payload is base64-decoded,
    split into individual records and parsed; each record that carries
    all required keys is written through MYSQL.execute. The connection
    is closed even when processing a record raises.

    context is accepted but not used.
    """
    mysql = MYSQL(RDS_HOST, DB_USER, DB_PASSWORD, DB_NAME, DB_TABLE)
    try:
        for obj in event['Records']:
            records = decode_data(get_data_from_kinesis_object(obj))
            for record in split_records_into_record(records):
                parsed_record = parse_record(record)
                # Empty lines parse to None, and a payload may decode to
                # something other than an object; neither can be checked
                # for keys, so skip them.
                if not isinstance(parsed_record, dict):
                    continue
                if not is_record_valid(parsed_record):
                    continue
                mysql.execute(
                    parsed_record['AccountId'],
                    parsed_record['ExternalReferenceId'],
                    str(parsed_record['Timestamp'])
                )
    finally:
        # Release the connection on both the success and failure paths.
        mysql.close_connection()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement