Advertisement
ivandev

Untitled

Jul 3rd, 2022
1,110
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 0.64 KB | None | 0 0
  1. def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
  2.     tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
  3.     if not tasks:
  4.         return
  5.  
  6.     for task in tasks:
  7.         task.cancel()
  8.     loop.run_until_complete(asyncio.gather(*tasks, loop=loop, return_exceptions=True))
  9.  
  10.     for task in tasks:
  11.         if not task.cancelled() and task.exception() is not None:
  12.             loop.call_exception_handler(
  13.                 {
  14.                     "message": "unhandled exception during shutdown",
  15.                     "exception": task.exception(),
  16.                     "task": task,
  17.                 }
  18.             )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement