Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class LifespanManager:
- def __init__(self, app):
- self.app = app
- self.startup_complete = asyncio.Event()
- self.shutdown_complete = asyncio.Event()
- self.messages = [{'type': 'lifespan.startup'}, {'type': 'lifespan.shutdown'}]
- async def __aenter__(self):
- self.task = asyncio.create_task(self.app(self.receive, self.send))
- await self.startup_complete.wait()
- async def __aexit__(self, ...):
- await self.shutdown_complete.wait()
- await self.task
- async def receive(self):
- return self.messages.pop()
- async def send(self, message):
- if message['type'] == 'lifespan.startup.complete':
- self.startup_complete.set()
- elif message['type'] == 'lifespan.shutdown.complete':
- self.shutdown_complete.set()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement