Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
class AsyncPool:
    """Fixed-size pool of asyncio worker tasks fed from a shared queue.

    Use it as an async context manager: entering starts the workers,
    ``push_to_queue`` hands them items, and leaving the block waits until
    every queued item has been processed before cancelling the workers.

    Args:
        coroutine: Coroutine function called as ``coroutine(item)`` for each
            queued item.
        no_of_workers: Number of worker tasks started on entry. With zero
            workers nothing ever consumes the queue, so leaving the block
            with items still queued would wait forever.
        timeout: Per-item limit in seconds, passed to ``asyncio.wait_for``
            (``None`` disables the limit).
    """

    def __init__(self, coroutine, no_of_workers: int, timeout):
        # No event loop is captured here: the explicit ``loop=`` argument of
        # ``wait_for``/``gather`` was removed in Python 3.10, and the tasks
        # are created on the running loop by ``asyncio.create_task`` anyway.
        self._queue = asyncio.Queue()
        self._no_of_workers = no_of_workers
        self._coroutine = coroutine
        self._timeout = timeout
        self._workers = None

    async def _worker(self) -> None:
        """Process queue items forever, until the task is cancelled."""
        while True:
            # Fetch outside the ``try`` so ``task_done`` is only ever called
            # for an item that was actually taken off the queue.
            queue_item = await self._queue.get()
            try:
                await asyncio.wait_for(
                    self._coroutine(queue_item), timeout=self._timeout
                )
            except Exception as e:
                # Best effort: one failing or timed-out item must not kill
                # the worker. Cancellation is not an ``Exception`` and so
                # still propagates, which is how the pool stops its workers.
                print(e)
            finally:
                self._queue.task_done()

    async def push_to_queue(self, item) -> None:
        """Queue ``item`` for processing; never blocks (queue is unbounded)."""
        self._queue.put_nowait(item)

    async def __aenter__(self):
        """Start the worker tasks and return the pool itself."""
        if self._workers is not None:
            raise RuntimeError("AsyncPool is already running")
        self._workers = [
            asyncio.create_task(self._worker())
            for _ in range(self._no_of_workers)
        ]
        return self

    async def __aexit__(self, type, value, traceback):
        """Wait for the queue to drain, then cancel and reap the workers."""
        try:
            await self._queue.join()
        finally:
            for worker in self._workers:
                worker.cancel()
            # return_exceptions=True collects each worker's CancelledError
            # instead of re-raising it out of the ``async with`` block.
            await asyncio.gather(*self._workers, return_exceptions=True)
            # Allow the pool to be entered again after a clean shutdown.
            self._workers = None
async def something(item):
    """Demo job: report the received item, then idle for ten seconds."""
    nap_seconds = 10
    print("got", item)
    await asyncio.sleep(nap_seconds)
async def main():
    """Open a pool of five ``something`` workers with a 2-second timeout.

    Nothing is pushed onto the queue, so the pool starts its workers and
    shuts them down again as soon as the block is left.
    """
    pool = AsyncPool(something, 5, 2)
    async with pool:
        pass


# Script entry point: run the demo on a fresh event loop.
asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment