Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pytest
- from zvk.event.event import EventQueue, event_consumer, Event
@event_consumer(consumes=['event1'])
async def simple_consumer(counter):
    """Consumer subscribed to 'event1': print a marker and add 1 to ``counter[0]``.

    ``counter`` is a mutable one-element sequence shared between handlers.
    NOTE(review): presumably the framework fills ``counter`` from the
    triggering event's env by parameter name -- confirm against
    ``event_consumer``.
    """
    print('simple_consumer')
    counter[0] = counter[0] + 1
@event_consumer(consumes=['event1'])
async def simple_consumer_with_defaults(counter, default_inc=1):
    """Consumer subscribed to 'event1' that adds ``default_inc`` to ``counter[0]``.

    ``default_inc`` defaults to 1.
    NOTE(review): the env built in this file's test has no ``default_inc``
    key, so the default is presumably what gets used -- confirm how
    ``event_consumer`` resolves arguments.
    """
    print('simple_consumer_with_defaults')
    counter[0] = counter[0] + default_inc
@event_consumer(consumes=['event1'])
async def simple_consumer_with_overridden_defaults(counter, inc=100):
    """Consumer subscribed to 'event1' that adds ``inc`` to ``counter[0]``.

    ``inc`` defaults to 100.
    NOTE(review): the name suggests the event env is expected to supply
    ``inc`` and thereby replace the default -- confirm against
    ``event_consumer``.
    """
    print('simple_consumer_with_overridden_defaults')
    counter[0] = counter[0] + inc
@event_consumer(consumes=['event1'])
async def consumer_producer(counter):
    """Consumer subscribed to 'event1' that also emits a follow-up event.

    Prints a marker, adds 1 to ``counter[0]``, then yields ``Event('event2')``.
    NOTE(review): presumably yielded events are put back on the queue by the
    framework -- confirm against ``event_consumer``.
    """
    print('consumer_producer')
    counter[0] = counter[0] + 1
    follow_up = Event('event2')
    yield follow_up
@event_consumer(consumes=['event2'])
async def another_simple_consumer(counter):
    """Consumer subscribed to 'event2' that emits 'event3'.

    Prints a marker, adds 1 to ``counter[0]``, then yields ``Event('event3')``.
    """
    print('another_simple_consumer')
    counter[0] = counter[0] + 1
    follow_up = Event('event3')
    yield follow_up
@event_consumer(consumes=['event3'])
async def finisher(counter, event_queue):
    """Consumer subscribed to 'event3'; the last handler in this file's chain.

    Prints a marker, adds 1 to ``counter[0]``, then awaits
    ``event_queue.omae_wa_mou_shindeiru()``.
    """
    print('finisher')
    counter[0] = counter[0] + 1
    # NOTE(review): presumably this asks the queue to stop so that
    # `EventQueue.run()` returns -- confirm against the EventQueue source.
    await event_queue.omae_wa_mou_shindeiru()
@pytest.mark.asyncio
async def test_event():
    """Drive one 'event1' through the queue and check the shared counter.

    The env passed with the seed event carries the counter, the queue itself
    and ``inc=1``. The expected total is 6.
    NOTE(review): 6 is presumably one increment from each of the six
    consumers in this module, with env's ``inc=1`` replacing the default of
    100 -- confirm against the framework's argument resolution.
    """
    queue = EventQueue()
    tally = [0]

    seed = Event(
        'event1',
        {
            'counter': tally,
            'event_queue': queue,
            'inc': 1,
        },
    )
    await queue.add_event(seed)

    await queue.run()

    assert tally[0] == 6
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement