Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pymysql
- import argparse
- from time import sleep
- from enum import Enum
- from concurrent import futures
- '''
- CREATE TABLE `user` (
- `email` varchar(32) NOT NULL,
- `name` varchar(32) NOT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=latin1
- '''
- class IsolationLevel(Enum):
- RU = 'READ UNCOMMITTED'
- RC = 'READ COMMITTED'
- RR = 'REPEATABLE READ'
- S = 'SERIALIZABLE'
- def set_isolation(cursor, level: str):
- if level:
- cursor.execute(f"SET SESSION TRANSACTION ISOLATION LEVEL {IsolationLevel[level].value}")
- def get_connection():
- return pymysql.connect(
- database='test',
- user="root",
- password="123456"
- )
- def select_and_insert_user(id, isolation):
- mysql = get_connection()
- with mysql.cursor() as cursor:
- set_isolation(cursor, isolation)
- cursor.execute(
- '''select * from user where email = 'unique@example.com' for update''')
- result = cursor.fetchone()
- if result is None:
- cursor.execute(
- '''insert into user (email, name) values ('unique@example.com', %s);''', f'User {id}')
- # sleep(1)
- mysql.commit()
- def insert_and_select_user(id, isolation):
- mysql = get_connection()
- with mysql.cursor() as cursor:
- set_isolation(cursor, isolation)
- cursor.execute(
- '''select count(*) as count from user where email = 'unique@example.com' ''')
- cursor.execute(
- '''insert into user (email, name) values ('unique@example.com', %s);''', f'User {id}')
- cursor.execute(
- '''select count(*) as count from user where email = 'unique@example.com' ''')
- count, = cursor.fetchone()
- if count > 1:
- print('id: %s rolling back' % id)
- # sleep(1)
- mysql.rollback()
- else:
- # sleep(1)
- mysql.commit()
- def run(f, n, isolation=None):
- print(f'running {f.__name__} with {n} max_workers ...')
- if isolation is not None:
- print(f'isolation changed to {IsolationLevel[isolation].value}')
- with futures.ThreadPoolExecutor(n) as executor:
- for i in range(n):
- executor.submit(f, i, isolation)
- def report_and_truncate() -> '重复个数':
- mysql = get_connection()
- count = 0
- with mysql.cursor() as cursor:
- cursor.execute("select * from user")
- print('-----rows-------')
- for row in cursor:
- count += 1
- print(row)
- print('-----rows-------')
- print(f'total: {count}')
- cursor.execute("truncate user")
- mysql.commit()
- return count
- if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("-n", dest="n", type=int, default=100, help="ThreadPoolExecutor max_workers")
- parser.add_argument("-r", dest="repeat", type=int, default=1, help="多次试验")
- parser.add_argument("-i", dest="isolation", type=str, default=None, help="事务隔离级别")
- args = parser.parse_args()
- print()
- print('args', args)
- result = []
- for _ in range(args.repeat):
- run(select_and_insert_user, args.n, args.isolation)
- # run(select_and_insert_user, args.n, args.isolation)
- duplicate_count = report_and_truncate()
- result.append(duplicate_count)
- print()
- print()
- print('duplicate counts', result)
- print()
Add Comment
Please, Sign In to add comment