Advertisement
ikseek

Coroutines example

Jun 13th, 2018
167
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.15 KB | None | 0 0
  1. import asyncio
  2. import datetime
  3. import json
  4.  
  5. loop = asyncio.get_event_loop()
  6. coro_sched = []
  7.  
  8. async def print_hello():
  9.     for i in range(10):
  10.         coro_sched.append('hello')
  11.         print("Hello")
  12.         await asyncio.sleep(0.2)
  13.  
  14.  
  15. async def print_time():
  16.     for i in range(10):
  17.         coro_sched.append('time')
  18.         print(datetime.datetime.now())
  19.         await asyncio.sleep(0.1)
  20.  
  21.  
  22. async def print_random_jokes():
  23.     for i in range(10):
  24.         coro_sched.append('joke')
  25.         reader, writer = await asyncio.open_connection('api.icndb.com', 80)
  26.         writer.write(b'GET /jokes/random HTTP/1.1\r\nHost: api.icndb.com\r\nConnection: Close\r\n\r\n')
  27.         response = await reader.read()
  28.         json_str = response.decode('utf-8').split('\r\n')[-4]
  29.         print(json.loads(json_str)['value']['joke'])
  30.         await asyncio.sleep(0.3)
  31.  
  32.  
  33.  
  34. async def print_stuff():
  35.     j = print_random_jokes()
  36.     h = print_hello()
  37.     t = print_time()
  38.     j_task = loop.create_task(j) # Start printing jokes
  39.     await t # Start printing time and wait until it's finished
  40.     await h # Start printing hello and wait until it's finished
  41.     await j_task # Wait until printing jokes finished
  42.     print(coro_sched)
  43.  
  44.  
  45. loop.run_until_complete(print_stuff())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement