Guest User

Untitled

a guest
Aug 25th, 2020
130
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.50 KB | None | 0 0
  1. import asyncio
  2.  
  3. class AsyncPool:
  4.     def __init__(self,coroutine,no_of_workers,timeout):
  5.         self._loop           = asyncio.get_event_loop()
  6.         self._queue          = asyncio.Queue()
  7.         self._no_of_workers  = no_of_workers
  8.         self._coroutine      = coroutine
  9.         self._timeout        = timeout
  10.         self._workers        = None
  11.  
  12.     async def _worker(self):
  13.         while True:
  14.             try:
  15.                 ret = False
  16.                 queue_item           = await self._queue.get()
  17.                 ret = True
  18.                 result               = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)
  19.             except Exception as e:
  20.                 print(e)
  21.             finally:
  22.                 if ret:
  23.                     self._queue.task_done()
  24.  
  25.  
  26.     async def push_to_queue(self,item):
  27.         self._queue.put_nowait(item)
  28.    
  29.     async def __aenter__(self):
  30.         assert self._workers == None
  31.         self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]
  32.         return self
  33.    
  34.     async def __aexit__(self,type,value,traceback):
  35.         await self._queue.join()
  36.  
  37.         for worker in self._workers:
  38.             worker.cancel()
  39.  
  40.         await asyncio.gather(*self._workers, loop=self._loop)
  41.  
  42. async def something(item):
  43.     print("got", item)
  44.     await asyncio.sleep(10)
  45.  
  46. async def main():
  47.     async with AsyncPool(something, 5, 2):
  48.         pass
  49.  
  50. asyncio.run(main())
  51.  
Advertisement
Add Comment
Please, Sign In to add comment