Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Python PostgreSQL Connection Pooling Using Psycopg2: https://pynative.com/psycopg2-python-postgresql-connection-pooling/
- import sys
- import threading
- import psycopg2 as pg
- from psycopg2.extras import execute_values
- from psycopg2.pool import ThreadedConnectionPool
- from threading import Semaphore, Thread
# Connection settings passed as keyword arguments to psycopg2.connect()
# and ThreadedConnectionPool. NOTE(review): credentials are hard-coded
# demo values for a local database; do not reuse them elsewhere.
config = dict(
    dbname="admin_db",
    user="admin",
    password="admin",
    host="localhost",
    port=5432,
)
def setup_db(count=100_000):
    """Populate ``tasks`` with ``count`` rows named ``task_0`` ... ``task_{count-1}``.

    Every inserted row gets status ``'new'``. The work is committed only if
    the whole insert succeeds, and the connection is closed in every case.

    Args:
        count: Number of rows to insert (defaults to 100_000, the value the
            script has always used).
    """
    sql = "INSERT INTO tasks (name, status) VALUES %s"
    connection = pg.connect(**config)
    try:
        with connection.cursor() as cur:
            # execute_values sends the rows as multi-row VALUES pages instead
            # of issuing one INSERT per row like executemany() does.
            execute_values(cur, sql, ((f"task_{i}", "new") for i in range(count)))
        connection.commit()
    finally:
        # Previously skipped when the insert raised, leaking the connection.
        connection.close()
def clear_db():
    """Delete every row from ``tasks`` and commit.

    The connection is closed even if the DELETE fails; in that case nothing
    is committed.
    """
    sql = "DELETE FROM tasks"
    connection = pg.connect(**config)
    try:
        with connection.cursor() as cur:
            cur.execute(sql)
        connection.commit()
    finally:
        # Previously skipped when the DELETE raised, leaking the connection.
        connection.close()
class TaskUpdater:
    """Process-wide singleton that marks every row of ``tasks`` as ``'done'``.

    Usage: ``u = TaskUpdater(); u.init(); u.run(); u.finish()``. ``run()``
    counts the rows, then updates them in ``batch_size`` pages (LIMIT/OFFSET
    ordered by ``task_id``) using at most ``thread_count`` worker threads,
    each borrowing a connection from a shared ``ThreadedConnectionPool``.
    """

    @staticmethod
    def fetch_batch(thread_pool, offset, batch_size, semaphore,
                    SELECT_SQL="SELECT * FROM tasks ORDER BY task_id LIMIT %s OFFSET %s",
                    UDPATE_SQL="UPDATE tasks SET status = 'done' FROM (VALUES %s) AS data(task_id) WHERE tasks.task_id = data.task_id"):
        """Select one page of tasks and set their status to ``'done'``.

        Args:
            thread_pool: Pool providing ``getconn()``/``putconn()`` (the
                ``ThreadedConnectionPool`` created by ``init``).
            offset: OFFSET of the page to process.
            batch_size: LIMIT (page size).
            semaphore: Bounds how many batches touch the database at once.
            SELECT_SQL: Page query taking ``(batch_size, offset)``.
            UDPATE_SQL: ``execute_values`` statement taking the task ids.
                (Misspelled name kept for caller compatibility.)

        The semaphore is always released and the connection always returned
        to the pool, even when a query fails. The transaction is committed on
        success and rolled back on error.
        """
        with semaphore:
            connection = thread_pool.getconn()
            try:
                # psycopg2: leaving ``with connection`` commits on success,
                # rolls back on exception, and does NOT close the connection.
                with connection, connection.cursor() as cur:
                    cur.execute(SELECT_SQL, (batch_size, offset))
                    batch = cur.fetchall()
                    if not batch:
                        # Empty page (e.g. rows deleted after run() took its
                        # count): nothing to update, and batch[0] would raise.
                        return
                    print(len(batch), offset, batch_size, batch[0])
                    # do_something
                    # Assumes the first column of ``tasks`` is task_id
                    # (SELECT *) - TODO confirm against the schema.
                    execute_values(cur, UDPATE_SQL, ((row[0],) for row in batch))
            finally:
                thread_pool.putconn(connection)

    _instance = None  # the single shared instance, created lazily by __new__

    def __new__(cls):
        """Return the one shared ``TaskUpdater`` instance, creating it once."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def init(self, thread_count=10, batch_size=1000):
        """Configure the updater; call once before ``run()``.

        Args:
            thread_count: Maximum number of worker threads and pooled
                connections.
            batch_size: Rows per page handled by one ``fetch_batch`` call.
        """
        self._thread_count = thread_count
        self._batch_size = batch_size
        self._total_amount = None
        self._pg_pool = ThreadedConnectionPool(0, thread_count, **config)
        self._semaphore = Semaphore(thread_count)

    def run(self):
        """Count the tasks, then process every page with a bounded thread pool.

        Raises:
            Exception: The first exception raised by a batch, re-raised after
                all batches have finished.
        """
        from concurrent.futures import ThreadPoolExecutor

        self._fetch_total_amount()
        offsets = range(0, self._total_amount, self._batch_size)
        # A fixed-size pool replaces one OS thread per batch, which grew
        # without bound with the table size.
        with ThreadPoolExecutor(max_workers=self._thread_count) as executor:
            futures = [
                executor.submit(self.fetch_batch, self._pg_pool, offset,
                                self._batch_size, self._semaphore)
                for offset in offsets
            ]
        # Leaving the ``with`` waits for every future. Surface failures
        # instead of letting them vanish inside worker threads.
        for future in futures:
            future.result()

    def _fetch_total_amount(self):
        """Store ``SELECT COUNT(*) FROM tasks`` in ``self._total_amount``."""
        conn = self._pg_pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT COUNT(*) FROM tasks")
                self._total_amount = cur.fetchone()[0]
        finally:
            # Return the connection even if the query fails.
            self._pg_pool.putconn(conn)

    def finish(self):
        """Close every connection held by the pool."""
        self._pg_pool.closeall()
def main(argv=None):
    """Dispatch one command-line action: ``setup``, ``clear`` or ``run``.

    ``setup`` inserts the sample rows, ``clear`` deletes all rows, and
    ``run`` marks every task done and then prints the row count it processed.

    Args:
        argv: Arguments without the program name; defaults to
            ``sys.argv[1:]``.

    Raises:
        SystemExit: With a usage message when the command is missing or
            unknown (previously a bare IndexError or a silent no-op).
    """
    args = sys.argv[1:] if argv is None else argv
    if not args or args[0] not in ("setup", "clear", "run"):
        sys.exit(f"usage: {sys.argv[0]} {{setup|clear|run}}")

    command = args[0]
    if command == "setup":
        setup_db()
    elif command == "clear":
        clear_db()
    else:
        updater = TaskUpdater()
        updater.init()
        try:
            updater.run()
        finally:
            # Close pooled connections even if run() fails.
            updater.finish()
        print(updater._total_amount)
if __name__ == "__main__":  # only when executed as a script, not on import
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement