Guest User

Untitled

a guest
Jun 18th, 2018
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.94 KB | None | 0 0
  1. import asyncio
  2. import collections
  3. from collections.abc import AsyncGenerator
  4.  
  5.  
  6. class Observable(AsyncGenerator):
  7. def __init__(self, *, loop=None):
  8. if loop is None:
  9. self._loop = asyncio.get_event_loop()
  10. else:
  11. self._loop = loop
  12.  
  13. self._getters = collections.deque()
  14.  
  15. async def asend(self, value):
  16. while self._getters:
  17. getter = self._getters.popleft()
  18. getter.set_result(value)
  19.  
  20. async def athrow(self, typ, val=None, tb=None):
  21. while self._getters:
  22. getter = self._getters.popleft()
  23. getter.set_exception(typ)
  24.  
  25. async def aclose(self):
  26. await self.athrow(StopAsyncIteration())
  27.  
  28. async def __aiter__(self):
  29. while True:
  30. getter = self._loop.create_future()
  31. self._getters.append(getter)
  32.  
  33. try:
  34. yield await getter
  35. except StopAsyncIteration:
  36. return
  37. finally:
  38. getter.cancel()
  39. try:
  40. self._getters.remove(getter)
  41. except ValueError:
  42. pass
  43.  
  44.  
  45. async def __aenter__(self):
  46. return self
  47.  
  48. async def __aexit__(self, typ, val, tb):
  49. if not typ:
  50. await self.aclose()
  51. else:
  52. await self.athrow(typ, val, tb)
  53.  
  54.  
  55. async def read_msg(obv):
  56. try:
  57. async for msg in obv:
  58. print(msg)
  59. except RuntimeError as err:
  60. print(f"Source has crashed: {err}")
  61.  
  62.  
  63. async def generator(obv):
  64. async with obv:
  65. await asyncio.sleep(1)
  66. await obv.asend(1)
  67. await asyncio.sleep(1)
  68. await obv.asend(2)
  69. await asyncio.sleep(1)
  70. await obv.asend(3)
  71. await asyncio.sleep(1)
  72. # raise RuntimeError("Kill me please")
  73.  
  74.  
  75. async def main():
  76. agen = Observable()
  77. await asyncio.gather(read_msg(agen), generator(agen))
  78.  
  79.  
  80. loop = asyncio.get_event_loop()
  81. loop.run_until_complete(main())
Add Comment
Please, Sign In to add comment