Guest User

Untitled

a guest
Jul 20th, 2018
107
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.89 KB | None | 0 0
  1. import json
  2. import requests
  3. import logging
  4. import base64
  5. import psycopg2
  6. import os
  7. import sys
  8.  
  9. logger = logging.getLogger()
  10. logger.setLevel(logging.INFO)
  11.  
  12. def get_db_creds():
  13. db_name = os.environ['DB_NAME']
  14. db_password = os.environ['DB_PASSWORD']
  15. db_user = os.environ['DB_USER']
  16. rds_endpoint = os.environ['RDS_ENDPOINT']
  17. return (db_name, db_password, db_user, rds_endpoint)
  18.  
  19. def connect_to_rds():
  20. try:
  21. db_name, db_password, db_user, rds_endpoint = get_db_creds()
  22. conn = psycopg2.connect(dbname=db_name, user=db_user, host=rds_endpoint, password=db_password)
  23. return (conn, conn.cursor())
  24. except Exception as e:
  25. logger.error("Lambda: ERROR Unable to connect to RDS")
  26. sys.exit()
  27.  
  28. def unpack_record(record):
  29. """Kinesis data is base64 encoded - unpack the payload"""
  30. payload = base64.b64decode(record["kinesis"]["data"])
  31. json_record = json.loads(payload)
  32. return json_record
  33.  
  34. # this event is a blob of JSON from Kinesis
  35. # let's assume we know the error will be id (from a DB maybe? or the user? who knows)
  36. # as well as the page the event was triggered on in the app (path), a status and a created time
  37. # something like:
  38. # {
  39. # "id": "1",
  40. # "path": "/login",
  41. # "status": "500",
  42. # "event_created_at": "2018-07-20T13:28:59.529Z"
  43. # }
  44. def error_sink(event, context):
  45. logger.info("Lambda: Executing...")
  46. records = event['Records']
  47. conn, cur = connect_to_rds()
  48. json_records = tuple(map(unpack_record, records))
  49. try: # NB - executemany isn't necessarily quicker, it's just terser in this example. See the docs for more...
  50. cur.executemany("""INSERT INTO errorevents(id,path,status,event_created_at) VALUES (%(id)s, %(path)s, %(status)s, %(event_created_at)s)""", json_records)
  51. conn.commit()
  52. logger.info("Lambda: " + str(json_records.__len__()) + " records processed")
  53. except Exception as e:
  54. logger.error("Lambda: EXCEPTION in error_sink: " + str(e))
  55. finally:
  56. cur.close()
Add Comment
Please, Sign In to add comment