Guest User

Untitled

a guest
Dec 2nd, 2018
138
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.28 KB | None | 0 0
  1. import pymysql
  2. import argparse
  3. from time import sleep
  4. from enum import Enum
  5. from concurrent import futures
  6.  
  7. '''
  8. CREATE TABLE `user` (
  9. `email` varchar(32) NOT NULL,
  10. `name` varchar(32) NOT NULL
  11. ) ENGINE=InnoDB DEFAULT CHARSET=latin1
  12. '''
  13.  
  14. class IsolationLevel(Enum):
  15. RU = 'READ UNCOMMITTED'
  16. RC = 'READ COMMITTED'
  17. RR = 'REPEATABLE READ'
  18. S = 'SERIALIZABLE'
  19.  
  20. def set_isolation(cursor, level: str):
  21. if level:
  22. cursor.execute(f"SET SESSION TRANSACTION ISOLATION LEVEL {IsolationLevel[level].value}")
  23.  
  24. def get_connection():
  25. return pymysql.connect(
  26. database='test',
  27. user="root",
  28. password="123456"
  29. )
  30.  
  31. def select_and_insert_user(id, isolation):
  32. mysql = get_connection()
  33. with mysql.cursor() as cursor:
  34. set_isolation(cursor, isolation)
  35. cursor.execute(
  36. '''select * from user where email = 'unique@example.com' for update''')
  37. result = cursor.fetchone()
  38. if result is None:
  39. cursor.execute(
  40. '''insert into user (email, name) values ('unique@example.com', %s);''', f'User {id}')
  41. # sleep(1)
  42. mysql.commit()
  43.  
  44. def insert_and_select_user(id, isolation):
  45. mysql = get_connection()
  46. with mysql.cursor() as cursor:
  47. set_isolation(cursor, isolation)
  48. cursor.execute(
  49. '''select count(*) as count from user where email = 'unique@example.com' ''')
  50. cursor.execute(
  51. '''insert into user (email, name) values ('unique@example.com', %s);''', f'User {id}')
  52. cursor.execute(
  53. '''select count(*) as count from user where email = 'unique@example.com' ''')
  54. count, = cursor.fetchone()
  55. if count > 1:
  56. print('id: %s rolling back' % id)
  57. # sleep(1)
  58. mysql.rollback()
  59. else:
  60. # sleep(1)
  61. mysql.commit()
  62.  
  63. def run(f, n, isolation=None):
  64. print(f'running {f.__name__} with {n} max_workers ...')
  65. if isolation is not None:
  66. print(f'isolation changed to {IsolationLevel[isolation].value}')
  67. with futures.ThreadPoolExecutor(n) as executor:
  68. for i in range(n):
  69. executor.submit(f, i, isolation)
  70.  
  71. def report_and_truncate() -> '重复个数':
  72. mysql = get_connection()
  73. count = 0
  74. with mysql.cursor() as cursor:
  75. cursor.execute("select * from user")
  76. print('-----rows-------')
  77. for row in cursor:
  78. count += 1
  79. print(row)
  80. print('-----rows-------')
  81. print(f'total: {count}')
  82. cursor.execute("truncate user")
  83. mysql.commit()
  84. return count
  85.  
  86. if __name__ == "__main__":
  87. parser = argparse.ArgumentParser()
  88. parser.add_argument("-n", dest="n", type=int, default=100, help="ThreadPoolExecutor max_workers")
  89. parser.add_argument("-r", dest="repeat", type=int, default=1, help="多次试验")
  90. parser.add_argument("-i", dest="isolation", type=str, default=None, help="事务隔离级别")
  91. args = parser.parse_args()
  92.  
  93. print()
  94. print('args', args)
  95. result = []
  96. for _ in range(args.repeat):
  97. run(select_and_insert_user, args.n, args.isolation)
  98. # run(select_and_insert_user, args.n, args.isolation)
  99. duplicate_count = report_and_truncate()
  100. result.append(duplicate_count)
  101. print()
  102.  
  103. print()
  104. print('duplicate counts', result)
  105. print()
Add Comment
Please, Sign In to add comment