Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import argparse
- import logging
- import sys
- import threading
- import pymysql.cursors
- from datetime import datetime
# Python 2 only: make implicit str/unicode conversions use UTF-8 instead of
# ASCII. The builtin ``reload`` and ``sys.setdefaultencoding`` exist only on
# Python 2 (Python 3 strings are already unicode and neither name exists), so
# the hack is guarded to keep the script importable on Python 3.
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 -- re-exposes sys.setdefaultencoding on Python 2
    sys.setdefaultencoding('utf8')

# Datetime format shared by CLI argument parsing, SQL bounds and log output.
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
def db_conn(host, port, username, password, database, charset='utf8'):
    """Open a PyMySQL connection whose cursors yield rows as dicts.

    :param host: server host name or IP address.
    :param port: server TCP port.
    :param username: login user name.
    :param password: login password.
    :param database: default database (schema) for the session.
    :param charset: connection character set, ``'utf8'`` by default.
    :return: a new connection object; this function never closes it, so the
        caller must.
    """
    options = {
        'host': host,
        'port': port,
        'user': username,
        'password': password,
        'db': database,
        'charset': charset,
        'cursorclass': pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**options)
class CleanTask(threading.Thread):
    """Worker thread that deletes ``mbk_lock_event`` rows in one time window.

    Rows with ``start_dt <= CREATE_TIME < end_dt`` are deleted in batches of
    at most ``batch_size`` rows, committing after every batch, until a batch
    deletes nothing.
    """

    # Connection settings used when no ``db_config`` is given. These are the
    # values the script previously hard-coded in ``run()`` (TiDB instance).
    DEFAULT_DB_CONFIG = {
        'host': '10.30.1.4',
        'port': 4000,
        'username': 'root',
        'password': '',
        'database': 'lock_event_log',
    }

    # Batched delete. The %s placeholders are filled and escaped by the DB
    # driver (cursor.execute args), not by Python string formatting.
    DELETE_SQL = '''
    DELETE FROM `mbk_lock_event`
    WHERE CREATE_TIME >= %s
    AND CREATE_TIME < %s
    LIMIT %s
    '''

    def __init__(self, from_dt, to_dt, batch_size=1000, db_config=None):
        """Create a task for the half-open window ``[from_dt, to_dt)``.

        :param from_dt: inclusive lower bound (datetime).
        :param to_dt: exclusive upper bound (datetime).
        :param batch_size: maximum rows deleted per statement/commit.
        :param db_config: optional dict of keyword arguments for ``db_conn``
            (host, port, username, password, database and optionally
            charset). Defaults to ``DEFAULT_DB_CONFIG``.
        """
        threading.Thread.__init__(self)
        self.start_dt = from_dt
        self.end_dt = to_dt
        self.batch_size = batch_size
        # Copy so later edits never leak into the shared class-level default.
        self.db_config = dict(self.DEFAULT_DB_CONFIG if db_config is None
                              else db_config)
        # Total number of rows deleted; accumulated by run().
        self.deleted = 0

    def run(self):
        """Delete every matching row, one batch per transaction, then log."""
        # strftime drops microseconds, so bounds are whole seconds.
        start_text = self.start_dt.strftime(DATE_FORMAT)
        end_text = self.end_dt.strftime(DATE_FORMAT)
        conn = db_conn(**self.db_config)
        try:
            with conn.cursor() as cursor:
                while True:
                    affected = cursor.execute(
                        self.DELETE_SQL,
                        (start_text, end_text, self.batch_size))
                    # Commit per batch to keep each transaction small.
                    conn.commit()
                    self.deleted += affected
                    if affected <= 0:
                        break
        finally:
            # Previously the connection was never closed.
            conn.close()
        logging.info('delete %d records, time range [%s, %s)',
                     self.deleted, start_text, end_text)
def dispatch_task(start_dt, end_dt, chunks):
    """Split ``[start_dt, end_dt)`` into ``chunks`` windows and clean them in parallel.

    One ``CleanTask`` thread is started per window, and the call blocks until
    every thread has finished.

    :param start_dt: inclusive start of the overall range (datetime).
    :param end_dt: exclusive end of the overall range (datetime).
    :param chunks: number of windows / worker threads; must be >= 1.
    :raises ValueError: if ``chunks`` is less than 1.
    """
    if chunks < 1:
        raise ValueError('chunks must be >= 1, got %r' % (chunks,))
    logging.info('Start data processing...')
    span = end_dt - start_dt
    # Derive every boundary from the full span, so the last boundary is
    # exactly end_dt. The old ``slot = span / chunks`` truncated to
    # microseconds, and ``chunks * slot`` could then fall short of end_dt.
    # Because CleanTask formats bounds without sub-seconds, the final window
    # ended a whole second early, and those rows were never deleted.
    # Adjacent windows share the same boundary object, so there are no gaps.
    bounds = [start_dt + span * i // chunks for i in range(chunks + 1)]
    tasks = []
    for n, (b, e) in enumerate(zip(bounds, bounds[1:])):
        logging.info("Task %d, time range [%s, %s)", n, b.strftime(DATE_FORMAT), e.strftime(DATE_FORMAT))
        tasks.append(CleanTask(b, e))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    logging.info('Finish data processing')
def main(argv=None):
    """Parse command-line arguments and run the cleanup.

    :param argv: list of argument strings to parse; ``None`` (the default)
        means ``sys.argv[1:]``, as before.
    """
    def parse_dt(text):
        # Convert bad input into ArgumentTypeError, so argparse prints a usage
        # error instead of the script dying with a strptime traceback.
        try:
            return datetime.strptime(text, DATE_FORMAT)
        except ValueError:
            raise argparse.ArgumentTypeError(
                'invalid time %r, expected format %s' % (text, DATE_FORMAT))

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('start_time', type=parse_dt, help='start time')
    arg_parser.add_argument('end_time', type=parse_dt, help='end time')
    arg_parser.add_argument('--chunks', type=int, default=1, help='# of chunks')
    args = arg_parser.parse_args(argv)
    if args.chunks < 1:
        arg_parser.error('--chunks must be >= 1')
    if args.end_time <= args.start_time:
        arg_parser.error('end_time must be later than start_time')
    dispatch_task(args.start_time, args.end_time, args.chunks)
if __name__ == '__main__':
    # Route records of DEBUG and above, from any logger, to stdout with a
    # timestamped format, then run the CLI.
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.DEBUG)
    console.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(console)
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement