Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
import time
import typing
async def say(t: float, word):
    """Sleep for *t* seconds, then print and return *word*.

    Args:
        t: Delay in seconds passed to ``asyncio.sleep``.
        word: Any value; it is printed and handed back unchanged.

    Returns:
        The same ``word`` object that was passed in.
    """
    await asyncio.sleep(t)
    print(word)
    return word
async def non_parallel():
    """Await two ``say`` calls one after the other.

    The second call only starts once the first has finished, so the
    delays add up (1 s + 2 s).
    """
    await say(1, 1)
    await say(2, 2)
async def parallel():
    """Run two ``say`` calls concurrently as tasks and print both results.

    Both coroutines are scheduled with ``asyncio.create_task`` before either
    is awaited, so their sleeps overlap instead of adding up.
    """
    task1 = asyncio.create_task(say(1, 1))
    task2 = asyncio.create_task(say(2, 2))
    # Awaiting only collects the results; both tasks are already scheduled.
    result1 = await task1
    result2 = await task2
    print(result1, result2)
def run(func: typing.Callable[[], typing.Coroutine]) -> None:
    """Execute the coroutine function *func* and print how long it took.

    Args:
        func: A zero-argument callable returning a coroutine; it is called
            once and the result is driven to completion by ``asyncio.run``.

    The report line has the form ``"<func.__name__>: <seconds> sec"`` with
    the duration rounded to two decimals.
    """
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    s = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__}: {(time.perf_counter() - s):.2f} sec')
def event_loop():
    """Run two ``say`` coroutines concurrently on an explicitly managed loop.

    Prints the two results followed by the elapsed time in the form
    ``"event_loop: <seconds> sec"``.
    """
    async def _gather():
        # gather() is called from inside the running loop so the awaitables
        # are bound to this loop rather than to an implicit "current" one.
        return await asyncio.gather(say(1, 'task1'), say(2, 'task2'))

    s = time.perf_counter()
    # A dedicated loop replaces asyncio.get_event_loop(), which is deprecated
    # when no loop is running and raises once asyncio.run() has been used.
    loop = asyncio.new_event_loop()
    try:
        t1, t2 = loop.run_until_complete(_gather())
    finally:
        # Always release the loop's resources, even if a task failed.
        loop.close()
    print(t1, t2)
    print(f'event_loop: {(time.perf_counter() - s):.2f} sec')
if __name__ == '__main__':
    # Don't run asyncio.get_event_loop() after asyncio.run(): asyncio.run()
    # closes the loop it created and leaves no current loop set, so any
    # helper that relies on get_event_loop() must be called first.
    event_loop()
    run(parallel)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement