Advertisement
Guest User

Untitled

a guest
Oct 9th, 2017
132
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.04 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. import argparse
  3. import logging
  4. import sys
  5. import threading
  6.  
  7. import pymysql.cursors
  8.  
  9. from datetime import datetime
  10.  
  11.  
  # Python 2 only: reload(sys) re-exposes sys.setdefaultencoding (which site.py
  # removes at startup) so utf-8 becomes the implicit str/unicode codec.
  # Python 3 has neither a reload() builtin nor sys.setdefaultencoding, so
  # running these calls unguarded raised NameError at import time there.
  if sys.version_info[0] == 2:
      reload(sys)  # noqa: F821 -- builtin on Python 2 only
      sys.setdefaultencoding('utf8')

  # Shared format for CLI time arguments, log output and the SQL time bounds.
  DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
  16.  
  17.  
  def db_conn(host, port, username, password, database, charset='utf8'):
      """Open a new pymysql connection configured with a DictCursor.

      :param host: database server address.
      :param port: database server port.
      :param username: login user.
      :param password: login password.
      :param database: default schema to select.
      :param charset: connection character set (defaults to 'utf8').
      :returns: the open connection; closing it is left to the caller.
      """
      options = dict(
          host=host,
          port=port,
          user=username,
          password=password,
          db=database,
          charset=charset,
          cursorclass=pymysql.cursors.DictCursor,
      )
      return pymysql.connect(**options)
  27.  
  28.  
  class CleanTask(threading.Thread):
      """Worker thread that deletes `mbk_lock_event` rows in one time window.

      Rows whose CREATE_TIME lies in [from_dt, to_dt) are deleted at most
      `batch_size` at a time, committing after every batch, until a batch
      deletes nothing. Connection settings default to the values this task
      has always used.
      """

      # Values are bound by the driver (pymysql escapes each argument) rather
      # than spliced into the SQL text with the % operator.
      _DELETE_SQL = (
          'DELETE FROM `mbk_lock_event` '
          'WHERE CREATE_TIME >= %s '
          'AND CREATE_TIME < %s '
          'LIMIT %s'
      )

      def __init__(self, from_dt, to_dt, host='10.30.1.4', port=4000,
                   username='root', password='', database='lock_event_log',
                   batch_size=1000):
          """
          :param from_dt: inclusive lower bound (datetime) for CREATE_TIME.
          :param to_dt: exclusive upper bound (datetime) for CREATE_TIME.
          :param host: database host.
          :param port: database port.
          :param username: database user.
          :param password: database password.
          :param database: schema containing `mbk_lock_event`.
          :param batch_size: maximum rows removed by one DELETE statement.
          :raises ValueError: if batch_size is smaller than 1.
          """
          threading.Thread.__init__(self)
          if batch_size < 1:
              raise ValueError('batch_size must be >= 1, got %r' % (batch_size,))
          self.start_dt = from_dt
          self.end_dt = to_dt
          self.host = host
          self.port = port
          self.username = username
          self.password = password
          self.database = database
          self.batch_size = batch_size

      def run(self):
          """Delete every matching row batch by batch, then log the total."""
          # Bounds are sent at whole-second precision, exactly as DATE_FORMAT renders them.
          start = self.start_dt.strftime(DATE_FORMAT)
          end = self.end_dt.strftime(DATE_FORMAT)
          params = (start, end, self.batch_size)

          conn = db_conn(self.host, self.port, self.username, self.password, self.database)
          count = 0
          try:
              with conn.cursor() as cursor:
                  while True:
                      # execute() returns the number of affected rows.
                      deleted = cursor.execute(self._DELETE_SQL, params)
                      # Commit per batch; presumably keeps each transaction small.
                      conn.commit()
                      count += deleted
                      if deleted <= 0:
                          break
          except Exception:
              # Thread boundary: record how far we got, then let it propagate.
              logging.exception('delete failed after %d records, time range [%s, %s)', count, start, end)
              raise
          finally:
              conn.close()

          logging.info('delete %d records, time range [%s, %s)', count, start, end)
  67.  
  68.  
  def dispatch_task(start_dt, end_dt, chunks):
      """Split [start_dt, end_dt) into `chunks` windows and clean them in parallel.

      One CleanTask thread is started per window; the call returns once every
      thread has finished.

      :param start_dt: inclusive start (datetime).
      :param end_dt: exclusive end (datetime).
      :param chunks: number of windows, i.e. worker threads; must be >= 1.
      :raises ValueError: if chunks is smaller than 1.
      """
      if chunks < 1:
          raise ValueError('chunks must be >= 1, got %r' % (chunks,))

      logging.info('Start data processing...')

      slot = (end_dt - start_dt) / chunks
      # timedelta division keeps only whole microseconds, so start_dt +
      # chunks * slot can land just short of end_dt. CleanTask renders the
      # bounds with DATE_FORMAT (whole seconds), so the last window would then
      # stop up to one second early and its tail would never be deleted.
      # Pin the final boundary to end_dt exactly.
      bounds = [start_dt + i * slot for i in range(chunks)] + [end_dt]

      tasks = []
      for n, (b, e) in enumerate(zip(bounds, bounds[1:])):
          logging.info("Task %d, time range [%s, %s)", n, b.strftime(DATE_FORMAT), e.strftime(DATE_FORMAT))
          tasks.append(CleanTask(b, e))

      for t in tasks:
          t.start()

      for t in tasks:
          t.join()

      logging.info('Finish data processing')
  90.  
  91.  
  def main():
      """Parse the command line and delete records in [start_time, end_time).

      Invalid input is reported as an argparse usage error, which exits the
      process with status 2. This covers a time that does not match
      DATE_FORMAT, --chunks below 1, and an end_time that is not after
      start_time.
      """
      arg_parser = argparse.ArgumentParser()
      arg_parser.add_argument('start_time', type=str, help='start time, inclusive (YYYY-MM-DD HH:MM:SS)')
      arg_parser.add_argument('end_time', type=str, help='end time, exclusive (YYYY-MM-DD HH:MM:SS)')
      arg_parser.add_argument('--chunks', type=int, default=1, help='# of chunks')
      args = arg_parser.parse_args()

      # chunks=0 would otherwise surface as a ZeroDivisionError traceback,
      # and a negative value would silently do nothing.
      if args.chunks < 1:
          arg_parser.error('--chunks must be >= 1')

      try:
          start_dt = datetime.strptime(args.start_time, DATE_FORMAT)
          end_dt = datetime.strptime(args.end_time, DATE_FORMAT)
      except ValueError as err:
          arg_parser.error(str(err))

      # A reversed or empty range matches no rows; reject it instead of
      # reporting a successful no-op.
      if end_dt <= start_dt:
          arg_parser.error('end_time must be later than start_time')

      dispatch_task(start_dt, end_dt, args.chunks)
  103.  
  104.  
  if __name__ == '__main__':
      # Route every record of level DEBUG and above to stdout, with timestamps.
      console_handler = logging.StreamHandler(sys.stdout)
      console_handler.setLevel(logging.DEBUG)
      console_handler.setFormatter(
          logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

      root_logger = logging.getLogger()
      root_logger.setLevel(logging.DEBUG)
      root_logger.addHandler(console_handler)

      main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement