Advertisement
Guest User

Untitled

a guest
Jun 24th, 2019
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.19 KB | None | 0 0
  1. import json
  2. import boto3
  3. from datetime import datetime
  4.  
  5. from pymysqlreplication import BinLogStreamReader
  6. from pymysqlreplication.row_event import (
  7. DeleteRowsEvent,
  8. UpdateRowsEvent,
  9. WriteRowsEvent,
  10. )
  11.  
  12. class DateTimeEncoder(json.JSONEncoder):
  13. def default(self, o):
  14. if isinstance(o, datetime):
  15. return o.isoformat()
  16.  
  17. return json.JSONEncoder.default(self, o)
  18.  
  19. def main():
  20. mysql = {
  21. "host": "",
  22. "port":,
  23. "user": "",
  24. "passwd": "",
  25. "db": ""}
  26. kinesis = boto3.client("kinesis", region_name = 'us-west-2')
  27.  
  28. stream = BinLogStreamReader(
  29. connection_settings = mysql,
  30. server_id=100,
  31. blocking = True,
  32. log_file='mysql-bin.000003',
  33. resume_stream=True,
  34. only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
  35. for binlogevent in stream:
  36. for row in binlogevent.rows:
  37. print row
  38. event = {"schema": binlogevent.schema,
  39. "table": binlogevent.table,
  40. "type": type(binlogevent).__name__,
  41. "row": row
  42. }
  43.  
  44. kinesis.put_record(StreamName="jhgjh", Data=json.dumps(event, cls=DateTimeEncoder), PartitionKey="default")
  45. #print json.dumps(event)
  46.  
  47. if __name__ == "__main__":
  48. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement