Advertisement
Guest User

Concurrent tasks updater

a guest
Oct 11th, 2019
351
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.26 KB | None | 0 0
  1. # Python PostgreSQL Connection Pooling Using Psycopg2: https://pynative.com/psycopg2-python-postgresql-connection-pooling/
  2.  
  3. import sys
  4. import threading
  5. import psycopg2 as pg
  6. from psycopg2.extras import execute_values
  7. from psycopg2.pool import ThreadedConnectionPool
  8. from threading import Semaphore, Thread
  9.  
  10.  
  11. config = {
  12.     "dbname": "admin_db",
  13.     "user": "admin",
  14.     "password": "admin",
  15.     "host": "localhost",
  16.     "port": 5432,
  17. }
  18.  
  19.  
  20. def setup_db():
  21.     SQL = "INSERT INTO tasks (name, status) VALUES (%s, %s)"
  22.  
  23.     connection = pg.connect(**config)
  24.     with connection.cursor() as cur:
  25.         cur.executemany(SQL, ((f'task_{i}', 'new') for i in range(100_000)))
  26.  
  27.     connection.commit()
  28.     connection.close()
  29.  
  30.  
  31. def clear_db():
  32.     SQL = "DELETE FROM tasks"
  33.  
  34.     connection = pg.connect(**config)
  35.     with connection.cursor() as cur:
  36.         cur.execute(SQL)
  37.  
  38.     connection.commit()
  39.     connection.close()
  40.  
  41.  
  42. class TaskUpdater:
  43.     @staticmethod
  44.     def fetch_batch(thread_pool, offset, batch_size, semaphore,
  45.                     SELECT_SQL="SELECT * FROM tasks ORDER BY task_id LIMIT %s OFFSET %s",
  46.                     UDPATE_SQL="UPDATE tasks SET status = 'done' FROM (VALUES %s) AS data(task_id) WHERE tasks.task_id = data.task_id"):
  47.         semaphore.acquire()
  48.         connection = thread_pool.getconn()
  49.  
  50.         with connection.cursor() as cur:
  51.             cur.execute(SELECT_SQL, (batch_size, offset))
  52.             batch = cur.fetchall()
  53.             print(len(batch), offset, batch_size, batch[0])
  54.             # do_something
  55.             execute_values(cur, UDPATE_SQL, ((b[0],) for b in batch))
  56.         connection.commit()
  57.  
  58.         thread_pool.putconn(connection)
  59.         semaphore.release()
  60.  
  61.     _instance = None
  62.  
  63.     def __new__(cls):
  64.         if cls._instance is None:
  65.             cls._instance = super().__new__(cls)
  66.         return cls._instance
  67.  
  68.     def init(self, thread_count=10, batch_size=1000):
  69.         self._batch_size = batch_size
  70.         self._total_amount = None
  71.         self._pg_pool = ThreadedConnectionPool(0, thread_count, **config)
  72.         self._semaphore = Semaphore(thread_count)
  73.  
  74.     def run(self):
  75.         self._fetch_total_amount()
  76.  
  77.         threads = []
  78.         for i in range(0, self._total_amount, self._batch_size):
  79.             thread = Thread(target=self.fetch_batch,
  80.                             args=(self._pg_pool, i, self._batch_size, self._semaphore))
  81.             thread.start()
  82.             threads.append(thread)
  83.  
  84.         for thread in threads:
  85.             thread.join()
  86.  
  87.     def _fetch_total_amount(self):
  88.         conn = self._pg_pool.getconn()
  89.  
  90.         with conn.cursor() as cur:
  91.             cur.execute("SELECT COUNT(*) FROM tasks")
  92.             result = cur.fetchone()[0]
  93.             self._total_amount = result
  94.  
  95.         self._pg_pool.putconn(conn)
  96.         assert self._total_amount is not None
  97.  
  98.     def finish(self):
  99.         self._pg_pool.closeall()
  100.  
  101.  
  102. def main():
  103.     if sys.argv[1] == "setup":
  104.         setup_db()
  105.     elif sys.argv[1] == "clear":
  106.         clear_db()
  107.     elif sys.argv[1] == "run":
  108.         updater = TaskUpdater()
  109.         updater.init()
  110.         updater.run()
  111.         updater.finish()
  112.         print(updater._total_amount)
  113.  
  114.  
  115. if __name__ == "__main__":
  116.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement