Advertisement
Guest User

Untitled

a guest
Oct 16th, 2018
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.33 KB | None | 0 0
  1. import pytest
  2.  
  3. from zvk.event.event import EventQueue, event_consumer, Event
  4.  
  5.  
  6. @event_consumer(consumes=['event1'])
  7. async def simple_consumer(counter):
  8. print('simple_consumer')
  9. counter[0] += 1
  10.  
  11.  
  12. @event_consumer(consumes=['event1'])
  13. async def simple_consumer_with_defaults(counter, default_inc=1):
  14. print('simple_consumer_with_defaults')
  15. counter[0] += default_inc
  16.  
  17.  
  18. @event_consumer(consumes=['event1'])
  19. async def simple_consumer_with_overridden_defaults(counter, inc=100):
  20. print('simple_consumer_with_overridden_defaults')
  21. counter[0] += inc
  22.  
  23.  
  24. @event_consumer(consumes=['event1'])
  25. async def consumer_producer(counter):
  26. print('consumer_producer')
  27. counter[0] += 1
  28. yield Event('event2')
  29.  
  30.  
  31. @event_consumer(consumes=['event2'])
  32. async def another_simple_consumer(counter):
  33. print('another_simple_consumer')
  34. counter[0] += 1
  35.  
  36. yield Event('event3')
  37.  
  38.  
  39. @event_consumer(consumes=['event3'])
  40. async def finisher(counter, event_queue):
  41. print('finisher')
  42. counter[0] += 1
  43.  
  44. await event_queue.omae_wa_mou_shindeiru()
  45.  
  46.  
  47. @pytest.mark.asyncio
  48. async def test_event():
  49. event_queue = EventQueue()
  50.  
  51. counter = [0]
  52.  
  53. env = dict(
  54. counter=counter,
  55. event_queue=event_queue,
  56. inc=1)
  57.  
  58. await event_queue.add_event(Event('event1', env))
  59. await event_queue.run()
  60.  
  61. assert counter[0] == 6
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement