Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import boto3
- from datetime import datetime
- from pymysqlreplication import BinLogStreamReader
- from pymysqlreplication.row_event import (
- DeleteRowsEvent,
- UpdateRowsEvent,
- WriteRowsEvent,
- )
- class DateTimeEncoder(json.JSONEncoder):
- def default(self, o):
- if isinstance(o, datetime):
- return o.isoformat()
- return json.JSONEncoder.default(self, o)
- def main():
- mysql = {
- "host": "",
- "port":,
- "user": "",
- "passwd": "",
- "db": ""}
- kinesis = boto3.client("kinesis", region_name = 'us-west-2')
- stream = BinLogStreamReader(
- connection_settings = mysql,
- server_id=100,
- blocking = True,
- log_file='mysql-bin.000003',
- resume_stream=True,
- only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
- for binlogevent in stream:
- for row in binlogevent.rows:
- print row
- event = {"schema": binlogevent.schema,
- "table": binlogevent.table,
- "type": type(binlogevent).__name__,
- "row": row
- }
- kinesis.put_record(StreamName="jhgjh", Data=json.dumps(event, cls=DateTimeEncoder), PartitionKey="default")
- #print json.dumps(event)
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement