Advertisement
Guest User

Untitled

a guest
Oct 16th, 2019
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.80 KB | None | 0 0
  1. class LifespanManager:
  2. def __init__(self, app):
  3. self.app = app
  4. self.startup_complete = asyncio.Event()
  5. self.shutdown_complete = asyncio.Event()
  6. self.messages = [{'type': 'lifespan.startup'}, {'type': 'lifespan.shutdown'}]
  7.  
  8. async def __aenter__(self):
  9. self.task = asyncio.create_task(self.app(self.receive, self.send))
  10. await self.startup_complete.wait()
  11.  
  12. async def __aexit__(self, ...):
  13. await self.shutdown_complete.wait()
  14. await self.task
  15.  
  16. async def receive(self):
  17. return self.messages.pop()
  18.  
  19. async def send(self, message):
  20. if message['type'] == 'lifespan.startup.complete':
  21. self.startup_complete.set()
  22. elif message['type'] == 'lifespan.shutdown.complete':
  23. self.shutdown_complete.set()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement