Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # multithreaded sqlite connector
- import sqlite3
- import time
- import threading
- import weakref
- # function for get instance of created singleton SQLiter globally in any place of code
- def get_sqliter():
- return MetaSingleton._instance[0] if MetaSingleton._instance else None
- # special class for singleton pattern
- class MetaSingleton(type):
- _instance = []
- def __call__(cls, *args, **kwargs):
- if not cls._instance:
- cls._instance.append(type.__call__(cls, *args, **kwargs))
- else:
- if args or kwargs:
- raise Exception("Can't recreate singleton with new args until old will be cleared")
- return weakref.proxy(cls._instance[0])
- # class SQLiter with singleton pattern
- class SQLiter(metaclass = MetaSingleton):
- # memory db file and locker for one-in-time execution sql command
- def __init__(self, database):
- self.database = database
- self.lock = threading.Lock()
- # delete instance for available to create new instance of singletone
- def clear(self):
- MetaSingleton._instance.pop()
- # for execute sql command without return value
- def execute(self, sql, values=[]):
- with self.lock:
- with sqlite3.connect(self.database) as conn:
- conn.execute(sql, values)
- # for execute sql command with return value
- def select(self, sql, values=[]):
- with self.lock:
- with sqlite3.connect(self.database) as conn:
- return list(conn.execute(sql, values))
- if __name__ == '__main__':
- sqliter = SQLiter('test.sqlite')
- def test_select(number):
- threads = []
- for i in range(number):
- sql, values = """ select 1 """, []
- thread = threading.Thread(target=sqliter.execute, args=(sql, values))
- threads.append(thread)
- thread.start()
- for thread in threads:
- thread.join()
- def test_insert(number):
- sqliter.execute(""" create table if not exists test (id int) """)
- sqliter.execute(""" delete from test """)
- threads = []
- for i in range(number):
- time.sleep(0.001) # need to start >1000 threads, but don't change time statistic (why?)
- sql, values = """ insert into test(id) values(?)""", (i, )
- thread = threading.Thread(target=sqliter.execute, args=(sql, values))
- threads.append(thread)
- thread.start()
- for thread in threads:
- thread.join()
- number = 1000
- t1 = time.monotonic()
- if 1:
- test_select(number)
- elif 0:
- test_insert(number)
- else:
- pass
- t2 = time.monotonic()
- # results of tests:
- # select 1 for 1000 times: 0.156
- # insert one row for 1000 times: 4.280 seconds
- # select 1 for 5000 times: 0.790
- # insert one row for 5000 times: 21.483 seconds
- # select 1 for 10000 times: 1.588
- # insert one row for 10000 times: 43.941 seconds
- print('done in %.3f seconds' % (t2-t1))
Add Comment
Please, Sign In to add comment