Guest User

Untitled

a guest
Dec 28th, 2017
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.45 KB | None | 0 0
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import json
  5. import sys
  6. import pymysql.cursors
  7.  
  8. from pymysqlreplication import BinLogStreamReader
  9. from pymysqlreplication.row_event import (
  10. DeleteRowsEvent,
  11. UpdateRowsEvent,
  12. )
  13.  
# --- Deployment configuration: replace the placeholders before running. ---
LOG_DB_HOST = '<CHANGELOG MySQL HOST>'  # server holding clickhouse_changelog
LOG_DB_NAME = '<CHANGELOG MySQL DB>'    # schema holding clickhouse_changelog
SRC_DB_HOST = '<SOURCE MySQL HOST>'     # server whose binlog is tailed
MYSQL_USER = '<MySQL USER>'             # credentials used for both servers
MYSQL_PASS = '<MySQL PASS>'
TABLE = 'hits'                          # only events for this table are processed

# Connection settings handed to BinLogStreamReader (points at the source server).
MYSQL_SETTINGS = {
    "host": SRC_DB_HOST,
    "port": 3306,
    "user": MYSQL_USER,
    "passwd": MYSQL_PASS
}
  27.  
  28. def connect_log_db(host):
  29. return pymysql.connect(
  30. host=host,
  31. port=3306,
  32. user=MYSQL_USER,
  33. passwd=MYSQL_PASS,
  34. db=LOG_DB_NAME,
  35. charset='utf8mb4',cursorclass=pymysql.cursors.DictCursor)
  36.  
  37. def last_file_pos(conlogdb):
  38. sql = ("SELECT log_file, log_pos FROM clickhouse_changelog "
  39. "ORDER BY log_file DESC, log_pos DESC LIMIT 1")
  40.  
  41. with conlogdb.cursor() as cursor:
  42. cursor.execute(sql)
  43. return cursor.fetchone()
  44.  
  45. def master_status(conlogdb):
  46. sql = "SHOW MASTER STATUS"
  47.  
  48. with conlogdb.cursor() as cursor:
  49. cursor.execute(sql)
  50. return cursor.fetchone()
  51.  
  52. def insert_log_db(conlogdb, values):
  53. with conlogdb.cursor() as cursor:
  54. # Create a new record
  55. sql = (
  56. "REPLACE INTO `clickhouse_changelog` "
  57. "(`db`, `tbl`, `created_at`, `log_file`, `log_pos`) "
  58. "VALUES (%s, %s, DATE_ADD(%s, INTERVAL - WEEKDAY(%s) DAY), %s, %s)")
  59. cursor.execute(sql, values)
  60.  
  61. # connection is not autocommit by default. So you must commit to save
  62. # your changes.
  63. conlogdb.commit()
  64.  
  65. def main():
  66. values = None
  67. conlogdb = connect_log_db(LOG_DB_HOST)
  68. consrcdb = connect_log_db(SRC_DB_HOST)
  69.  
  70. file_pos = last_file_pos(conlogdb)
  71. if file_pos is not None:
  72. log_file = file_pos['log_file']
  73. log_pos = file_pos['log_pos']
  74. else:
  75. file_pos = master_status(consrcdb)
  76. log_file = file_pos['File']
  77. log_pos = file_pos['Position']
  78.  
  79. print "Starting streaming at file: %s, position: %s" % (log_file, log_pos)
  80.  
  81. stream = BinLogStreamReader(
  82. connection_settings=MYSQL_SETTINGS, resume_stream=True,
  83. server_id=172313514, log_file=log_file, log_pos=log_pos,
  84. only_events=[DeleteRowsEvent, UpdateRowsEvent], blocking=True)
  85.  
  86. # If previous week/table processed is the same, we avoid the INSERT as
  87. # its redundant and affects performance
  88. pweek = None
  89. ptable = None
  90.  
  91. for binlogevent in stream:
  92. for row in binlogevent.rows:
  93. if binlogevent.table != TABLE: continue
  94.  
  95. if isinstance(binlogevent, DeleteRowsEvent):
  96. values = row["values"]
  97. elif isinstance(binlogevent, UpdateRowsEvent):
  98. values = row["after_values"]
  99. else:
  100. continue
  101.  
  102. if ptable == binlogevent.table and pweek == values['created_at'].strftime('%Y-%m-%d'):
  103. continue
  104.  
  105. ptable = binlogevent.table
  106. pweek = values['created_at'].strftime('%Y-%m-%d')
  107.  
  108. # action keys '0 unk, 1 ins, 2 upd, 3 del'
  109. event = (binlogevent.schema, binlogevent.table,
  110. values['created_at'].strftime('%Y-%m-%d'),
  111. values['created_at'].strftime('%Y-%m-%d'),
  112. stream.log_file, int(stream.log_pos))
  113. insert_log_db(conlogdb, event)
  114. sys.stdout.flush()
  115.  
  116.  
  117. stream.close()
  118.  
  119.  
# Script entry point: stream forever when run directly.
if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment