Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import contextlib
- from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
- from functools import partial
- from multiprocessing import Pipe, current_process
- from threading import Thread
- from typing import Union
- import asyncpg
- from asyncpg import Pool
- from loguru import logger
# Sizing for the two-level executor: up to WORKER_PROCS OS processes, each of
# which runs a private event loop serviced by WORKER_THREADS threads.
WORKER_PROCS = 20
WORKER_THREADS = 40
def _proc_initializer():
    """Run once inside each freshly spawned worker process.

    Attaches three helpers to the worker's ``Process`` object:
    ``loop`` (a private asyncio event loop), ``loop_runner`` (a daemon
    thread driving that loop forever) and ``thread_executor`` (a thread
    pool used for blocking work inside this worker).
    """
    proc = current_process()
    logger.info(f"Initialize {proc.name}")

    # Private event loop, kept running by a background daemon thread so
    # coroutines can be scheduled onto it from any thread of this process.
    proc.loop = asyncio.new_event_loop()
    proc.loop_runner = Thread(daemon=True, target=proc.loop.run_forever)
    proc.loop_runner.start()
    logger.info(f"{proc.name} started event loop")

    # Per-process thread pool for off-loading blocking calls.
    proc.thread_executor = ThreadPoolExecutor(max_workers=WORKER_THREADS)
    logger.info(f"{proc.name} created pool with {WORKER_THREADS} threads")
# Module-level pool of worker processes; each worker gets a private event
# loop and thread pool via _proc_initializer.
# NOTE(review): this pool is created as an import side effect, which is
# heavyweight (up to WORKER_PROCS processes) — consider lazy creation.
_proc_executor = ProcessPoolExecutor(
    max_workers=WORKER_PROCS, initializer=_proc_initializer
)
- def _run_in_thread(coro):
- proc = current_process()
- ft = asyncio.run_coroutine_threadsafe(coro(), loop=proc.loop)
- return ft.result()
- def _run_coro(coro, task_conn):
- async def wrapper():
- task_conn.send(True)
- return await coro()
- return wrapper
def _run_in_process(coro):
    """Run ``coro`` (a zero-arg coroutine factory) inside this worker process.

    Executes in a _proc_executor worker.  The coroutine is wrapped so it
    announces its start over a Pipe, then scheduled onto the worker's
    per-process thread pool / event loop.  Returns the parent end of the
    pipe; the final message on it is a dict with "task_id" and either
    "result" or "exception".
    """
    proc = current_process()
    proc_conn, task_conn = Pipe()
    # Wrap so the coroutine sends True on task_conn as soon as it starts.
    coro = _run_coro(coro, task_conn)
    # Hand the blocking _run_in_thread call to the worker's thread pool.
    # NOTE(review): run_in_executor is called from outside the loop thread
    # here — presumably safe in this usage, but confirm against asyncio docs.
    ft = proc.loop.run_in_executor(proc.thread_executor, _run_in_thread, coro)
    # Block until the wrapper's start signal arrives, so the task is known
    # to be running before this function returns the pipe to the caller.
    proc_conn.recv()

    def task_done_callback(ft):
        # Package the outcome and push it through the pipe to the caller.
        # NOTE(review): the result/exception must be picklable to cross the
        # pipe — confirm for the payloads actually submitted.
        task = {"task_id": str(coro)}
        try:
            task["result"] = ft.result()
        except Exception as e:
            task["exception"] = e
            logger.exception(f"{proc.name} failed processing task {coro}")
        finally:
            task_conn.send(task)

    ft.add_done_callback(task_done_callback)
    return proc_conn
- _loop = asyncio.get_event_loop()
async def submit(coro, *args, wait_executor=None, **kwargs):
    """Run ``coro(*args, **kwargs)`` in a worker process and await its result.

    The call is dispatched through the process pool; the worker returns a
    pipe whose final message is a dict holding either "result" or
    "exception".  Waiting on the pipe is done in ``wait_executor`` (an
    ad-hoc single-thread executor is created — and now properly shut
    down — when none is supplied; the original leaked one per call).

    Raises:
        The original exception captured in the worker, if any.
    """
    job = partial(coro, *args, **kwargs)
    task_pipe = await _loop.run_in_executor(_proc_executor, _run_in_process, job)

    owns_waiter = wait_executor is None
    waiter = ThreadPoolExecutor(max_workers=1) if owns_waiter else wait_executor
    try:
        # Block (in the waiter thread) until the worker sends the outcome.
        task = await _loop.run_in_executor(waiter, task_pipe.recv)
    finally:
        if owns_waiter:
            # Fix: release the ad-hoc executor's thread once we are done.
            waiter.shutdown(wait=False)

    if "exception" in task:
        raise task["exception"]
    return task["result"]
class Database:
    """Thin async wrapper around an asyncpg connection pool."""

    def __init__(self):
        # Pool is created lazily by create(); None until then.
        self.pool: Union[Pool, None] = None

    async def create(
        self,
        *,
        user='postgres',
        password='postgres',
        host='postgres',
        port=5433,
        database='olxparser_db',
        max_size=4,
        min_size=1,
    ):
        """Create the connection pool.

        Connection settings are now keyword-only parameters whose defaults
        are the previously hard-coded values, so existing callers are
        unaffected while tests/other environments can override them.
        """
        self.pool = await asyncpg.create_pool(
            user=user,
            password=password,
            host=host,
            port=port,
            database=database,
            max_size=max_size,
            min_size=min_size,
        )

    async def get_active_tracks_ebay(self):
        """Fetch all rows from the running_trackers_ebay table.

        Requires create() to have been awaited first (self.pool is None
        otherwise).
        """
        sql = """
        SELECT * FROM running_trackers_ebay;
        """
        async with self.pool.acquire() as con:
            return await con.fetch(sql)
- _db = None
async def get_database():
    """Return the module-wide Database singleton, creating it on first use.

    Fix: the instance is published to the global only after its pool has
    been created.  The original assigned ``_db`` before awaiting
    ``create()``, so a coroutine scheduled during that await could observe
    a half-initialized Database whose ``pool`` was still None.
    (Two concurrent first callers may still each build a pool — add an
    asyncio.Lock if that matters.)
    """
    global _db
    if _db is None:
        db = Database()
        await db.create()
        _db = db
    return _db
async def run_check_tracker():
    """Print every active eBay tracker row from the database."""
    database = await get_database()
    rows = await database.get_active_tracks_ebay()
    print(rows)
async def main():
    """Entry point: run the tracker check through the worker-process pipeline.

    Fixes vs. the original:
    - The task returned by asyncio.create_task() was never awaited, so
      asyncio.run(main()) returned (and cancelled the task) before any
      work completed.  The submission is now awaited directly.
    - The Database built here was unused — run_check_tracker() obtains its
      own via get_database() — so the redundant pool creation was dropped.
    """
    await submit(run_check_tracker)


if __name__ == '__main__':
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement