Advertisement
Guest User

Untitled

a guest
May 17th, 2023
170
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.36 KB | None | 0 0
  1. import asyncio
  2. import contextlib
  3. from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
  4. from functools import partial
  5. from multiprocessing import Pipe, current_process
  6. from threading import Thread
  7. from typing import Union
  8.  
  9. import asyncpg
  10. from asyncpg import Pool
  11. from loguru import logger
  12.  
  13. WORKER_PROCS = 20
  14. WORKER_THREADS = 40
  15.  
  16.  
  17. def _proc_initializer():
  18.     proc = current_process()
  19.     logger.info(f"Initialize {proc.name}")
  20.  
  21.     proc.loop = asyncio.new_event_loop()
  22.     proc.loop_runner = Thread(target=proc.loop.run_forever, daemon=True)
  23.     proc.loop_runner.start()
  24.     logger.info(f"{proc.name} started event loop")
  25.  
  26.     proc.thread_executor = ThreadPoolExecutor(max_workers=WORKER_THREADS)
  27.     logger.info(f"{proc.name} created pool with {WORKER_THREADS} threads")
  28.  
  29.  
  30. _proc_executor = ProcessPoolExecutor(
  31.     max_workers=WORKER_PROCS, initializer=_proc_initializer
  32. )
  33.  
  34.  
  35. def _run_in_thread(coro):
  36.     proc = current_process()
  37.  
  38.     ft = asyncio.run_coroutine_threadsafe(coro(), loop=proc.loop)
  39.  
  40.     return ft.result()
  41.  
  42.  
  43. def _run_coro(coro, task_conn):
  44.     async def wrapper():
  45.         task_conn.send(True)
  46.         return await coro()
  47.  
  48.     return wrapper
  49.  
  50.  
  51. def _run_in_process(coro):
  52.     proc = current_process()
  53.     proc_conn, task_conn = Pipe()
  54.  
  55.     coro = _run_coro(coro, task_conn)
  56.     ft = proc.loop.run_in_executor(proc.thread_executor, _run_in_thread, coro)
  57.  
  58.     proc_conn.recv()
  59.  
  60.     def task_done_callback(ft):
  61.         task = {"task_id": str(coro)}
  62.  
  63.         try:
  64.             task["result"] = ft.result()
  65.         except Exception as e:
  66.             task["exception"] = e
  67.             logger.exception(f"{proc.name} failed processing task {coro}")
  68.         finally:
  69.             task_conn.send(task)
  70.  
  71.     ft.add_done_callback(task_done_callback)
  72.  
  73.     return proc_conn
  74.  
  75.  
  76. _loop = asyncio.get_event_loop()
  77.  
  78.  
  79. async def submit(coro, *args, wait_executor=None, **kwargs):
  80.     coro = partial(coro, *args, **kwargs)
  81.     task_pipe = await _loop.run_in_executor(_proc_executor, _run_in_process, coro)
  82.  
  83.     waiter = wait_executor or ThreadPoolExecutor(max_workers=1)
  84.     task = await _loop.run_in_executor(waiter, task_pipe.recv)
  85.  
  86.     with contextlib.suppress(KeyError):
  87.         raise task["exception"]
  88.     return task["result"]
  89.  
  90.  
  91. class Database:
  92.     def __init__(self):
  93.         self.pool: Union[Pool, None] = None
  94.  
  95.     async def create(self):
  96.         pool = await asyncpg.create_pool(
  97.             user='postgres',
  98.             password='postgres',
  99.             host='postgres',
  100.             port=5433,
  101.             database='olxparser_db',
  102.             max_size=4,
  103.             min_size=1,
  104.         )
  105.         self.pool = pool
  106.  
  107.     async def get_active_tracks_ebay(self):
  108.         sql = """
  109.            SELECT * FROM running_trackers_ebay;
  110.        """
  111.         async with self.pool.acquire() as con:
  112.             return await con.fetch(sql)
  113.  
  114.  
  115. _db = None
  116.  
  117.  
  118. async def get_database():
  119.     global _db
  120.  
  121.     if _db is None:
  122.         _db = Database()
  123.         await _db.create()
  124.  
  125.     return _db
  126.  
  127.  
  128. async def run_check_tracker():
  129.     db = await get_database()
  130.     active_tracks = await db.get_active_tracks_ebay()
  131.     print(active_tracks)
  132.  
  133.  
  134. async def main():
  135.     db = Database()
  136.     await db.create()
  137.  
  138.     asyncio.create_task(submit(run_check_tracker))
  139.  
  140.  
  141. if __name__ == '__main__':
  142.     asyncio.run(main())
  143.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement