Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import requests
- import logging
- import base64
- import psycopg2
- import os
- import sys
# Module-wide logger: the root logger, with its threshold set to INFO.
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
def get_db_creds():
    """Read the database connection settings from the environment.

    Returns:
        A ``(db_name, db_password, db_user, rds_endpoint)`` tuple taken from
        the ``DB_NAME``, ``DB_PASSWORD``, ``DB_USER`` and ``RDS_ENDPOINT``
        environment variables, in that order.

    Raises:
        KeyError: if any of those variables is not set.
    """
    env_keys = ('DB_NAME', 'DB_PASSWORD', 'DB_USER', 'RDS_ENDPOINT')
    return tuple(os.environ[key] for key in env_keys)
def connect_to_rds():
    """Open a PostgreSQL connection to the RDS instance and a cursor on it.

    The connection parameters are obtained from ``get_db_creds()``.

    Returns:
        A ``(connection, cursor)`` tuple.

    Raises:
        SystemExit: with exit status 1 when the credentials cannot be read or
            the connection or cursor cannot be created.
    """
    try:
        db_name, db_password, db_user, rds_endpoint = get_db_creds()
        conn = psycopg2.connect(
            dbname=db_name,
            user=db_user,
            host=rds_endpoint,
            password=db_password,
        )
        return (conn, conn.cursor())
    except Exception:
        # logger.exception records the traceback, so the log shows *why* the
        # connection failed (bad credentials, missing env var, network, ...).
        logger.exception("Lambda: ERROR Unable to connect to RDS")
        # A bare sys.exit() exits with status 0, i.e. "success"; signal failure.
        sys.exit(1)
def unpack_record(record):
    """Decode the base64 payload of one Kinesis record and parse it as JSON.

    Args:
        record: mapping whose ``record["kinesis"]["data"]`` entry holds the
            base64-encoded JSON document.

    Returns:
        The object ``json.loads`` produces for the decoded payload.
    """
    encoded = record["kinesis"]["data"]
    return json.loads(base64.b64decode(encoded))
- # this event is a blob of JSON from Kinesis
- # let's assume each error event carries an id (from a DB maybe? or the user? who knows)
- # as well as the page in the app the event was triggered on (path), a status and a created time
- # something like:
- # {
- # "id": "1",
- # "path": "/login",
- # "status": "500",
- # "event_created_at": "2018-07-20T13:28:59.529Z"
- # }
def error_sink(event, context):
    """Lambda entry point: store a batch of Kinesis error events in RDS.

    Every entry of ``event['Records']`` is decoded with ``unpack_record`` and
    inserted into the ``errorevents`` table; the batch is committed once.

    Args:
        event: the Lambda event; ``event['Records']`` is the list of records.
        context: the Lambda context object (unused).

    Returns:
        None. Errors raised while inserting or committing are logged, not
        re-raised, so the invocation still completes.
    """
    logger.info("Lambda: Executing...")
    # Decode before connecting so a malformed payload fails without ever
    # opening (and then leaking) a database connection.
    json_records = tuple(unpack_record(record) for record in event['Records'])
    conn, cur = connect_to_rds()
    try:
        # NB - executemany isn't necessarily quicker, it's just terser in this
        # example. See the docs for more...
        cur.executemany(
            """INSERT INTO errorevents(id,path,status,event_created_at) VALUES (%(id)s, %(path)s, %(status)s, %(event_created_at)s)""",
            json_records,
        )
        conn.commit()
        logger.info("Lambda: %d records processed", len(json_records))
    except Exception as e:
        # Log-and-continue is kept from the original; logger.exception adds
        # the traceback to the same message.
        logger.exception("Lambda: EXCEPTION in error_sink: %s", e)
    finally:
        # Release the cursor and the connection on every path; closing the
        # connection also discards any uncommitted work from a failed insert.
        try:
            cur.close()
        finally:
            conn.close()
Add Comment
Please, Sign In to add comment