Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import collections
- from collections.abc import AsyncGenerator
class Observable(AsyncGenerator):
    """Broadcast channel delivering each sent value to every waiting consumer.

    Consumers iterate with ``async for``; each iteration registers a future
    ("getter") and waits on it. A producer calls ``asend`` / ``athrow`` /
    ``aclose`` (or uses the object as an async context manager) to resolve
    all getters registered at that moment.

    Nothing is buffered: a value sent while a consumer has no getter
    registered (for instance, while it is still processing the previous
    value) is not delivered to that consumer.
    """

    def __init__(self, *, loop=None):
        """Create the channel.

        Args:
            loop: Event loop used to create the getter futures. Defaults to
                ``asyncio.get_event_loop()``.
        """
        self._loop = asyncio.get_event_loop() if loop is None else loop
        # One pending future per consumer currently blocked in ``__aiter__``.
        self._getters = collections.deque()

    @staticmethod
    def _as_exception(typ, val, tb):
        """Build the exception instance described by a throw()-style triple."""
        if isinstance(typ, BaseException):
            # Already an instance (this is how ``aclose`` calls ``athrow``).
            exc = typ
        elif isinstance(val, BaseException):
            # Typical ``__aexit__`` call: keep the original instance so its
            # message and attributes reach the consumers.
            exc = val
        elif val is None:
            exc = typ()
        else:
            exc = typ(val)
        if tb is not None:
            exc = exc.with_traceback(tb)
        return exc

    async def asend(self, value):
        """Deliver ``value`` to every consumer currently waiting."""
        while self._getters:
            getter = self._getters.popleft()
            # A getter can still be queued after its consumer task was
            # cancelled (the consumer's cleanup has not run yet); resolving
            # it would raise InvalidStateError.
            if not getter.done():
                getter.set_result(value)

    async def athrow(self, typ, val=None, tb=None):
        """Raise an exception in every consumer currently waiting.

        Args:
            typ: Exception class or exception instance.
            val: Optional exception instance, or a constructor argument for
                ``typ`` when ``typ`` is a class.
            tb: Optional traceback to attach to the exception.
        """
        exc = self._as_exception(typ, val, tb)
        while self._getters:
            getter = self._getters.popleft()
            if not getter.done():
                getter.set_exception(exc)

    async def aclose(self):
        """End iteration for every consumer currently waiting."""
        # ``__aiter__`` turns StopAsyncIteration into a normal loop exit.
        await self.athrow(StopAsyncIteration())

    async def __aiter__(self):
        """Yield values as they are sent until the observable is closed."""
        while True:
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
                yield await getter
            except StopAsyncIteration:
                return
            finally:
                # No-op when the getter is already resolved; otherwise the
                # consumer stopped early and must not receive anything.
                getter.cancel()
                try:
                    self._getters.remove(getter)
                except ValueError:
                    # Already dequeued by asend()/athrow().
                    pass

    async def __aenter__(self):
        """Return the observable itself for use as the producer handle."""
        return self

    async def __aexit__(self, typ, val, tb):
        """Close consumers on a clean exit; forward the exception otherwise.

        Returns ``None``, so an exception raised inside the ``async with``
        body keeps propagating in the producer as well.
        """
        if typ is None:
            await self.aclose()
        else:
            await self.athrow(typ, val, tb)
async def read_msg(obv):
    """Print every item produced by ``obv`` until its iteration ends.

    A ``RuntimeError`` raised while iterating is reported on stdout instead
    of propagating to the caller.
    """
    try:
        async for item in obv:
            print(item)
    except RuntimeError as failure:
        report = f"Source has crashed: {failure}"
        print(report)
async def generator(obv, *, values=(1, 2, 3), delay=1):
    """Publish ``values`` to ``obv``, pausing before each one and once after.

    Args:
        obv: Object used as an async context manager and exposing an
            ``asend(value)`` coroutine.
        values: Items to publish, in order. Defaults to ``(1, 2, 3)``.
        delay: Seconds to sleep before each item and once more after the
            last one. Defaults to ``1``.
    """
    async with obv:
        for value in values:
            await asyncio.sleep(delay)
            await obv.asend(value)
        await asyncio.sleep(delay)
        # To try the failure path, raise here (e.g. RuntimeError): the
        # exception is handed to ``obv.__aexit__`` when the block unwinds.
async def main():
    """Run one consumer and one producer concurrently on a shared Observable."""
    observable = Observable()
    consumer = read_msg(observable)
    producer = generator(observable)
    await asyncio.gather(consumer, producer)
# Run the demo on an explicitly created event loop: calling
# asyncio.get_event_loop() with no running loop relies on deprecated implicit
# loop creation.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Finalize async generators that are still suspended, then release the
    # loop's resources.
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
Add Comment
Please, Sign In to add comment