Guest User

Untitled

a guest
Jan 19th, 2018
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.84 KB | None | 0 0
  1. import asyncio
  2. import pytest
  3.  
  4. @pytest.mark.asyncio
  5. async def test_close_asyncgen():
  6.  
  7. loop = asyncio.get_event_loop()
  8.  
  9. async def mygen():
  10. print('mygen: 0-begin')
  11. await asyncio.sleep(0.4)
  12. print('mygen: 1')
  13. yield 1
  14. print('mygen: 2')
  15. await asyncio.sleep(0.4)
  16. print('mygen: 3')
  17. yield 2
  18. print('mygen: 4')
  19. await asyncio.sleep(0.4)
  20. print('mygen: 5-end')
  21.  
  22. g = mygen()
  23.  
  24. async def intr():
  25. nonlocal g
  26. await asyncio.sleep(0.6)
  27. print('intr: aclosing')
  28. await g.aclose() # expected: raise an error since generator is running
  29. print('intr: aclosed')
  30.  
  31. t = loop.create_task(intr())
  32. print('test: begin of async-for')
  33. async for v in g:
  34. print('test: generated', v)
  35. print('test: end of async-for') # not reached???
Add Comment
Please, Sign In to add comment