Advertisement
Woobinda

asyncio Queues

Aug 19th, 2017
153
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.90 KB | None | 0 0
  1. import asyncio
  2.  
  3.  
  4. async def consumer(number, queue):
  5.     print('consumer №{}: starting'.format(number))
  6.     while True:
  7.         print('consumer №{}: waiting for item'.format(number))
  8.         item = await queue.get()
  9.         print('consumer №{}: has item {}'.format(number, item))
  10.         if item is None:
  11.             # None is the signal to stop.
  12.             queue.task_done()
  13.             break
  14.         else:
  15.             print('------------')
  16.             await asyncio.sleep(0.1 * item)
  17.             queue.task_done()
  18.     print('consumer №{}: ~~~ending~~~'.format(number))
  19.  
  20.  
  21. async def producer(queue, num_workers):
  22.     print('producer: starting')
  23.     # Add some numbers to the queue to simulate jobs
  24.     for i in range(num_workers * 3):
  25.         await queue.put(i)
  26.         print('producer: added task {} to the queue'.format(i))
  27.     # Add None entries in the queue
  28.     # to signal the consumers to exit
  29.     print('producer: adding stop signals to the queue')
  30.     for i in range(num_workers):
  31.         await queue.put(None)
  32.     print('producer: waiting for queue to empty')
  33.     await queue.join()
  34.     print('producer: ~~~ending~~~')
  35.  
  36.  
  37. async def main(loop, num_consumers):
  38.     # Create the queue with a fixed size so the producer
  39.     # will block until the consumers pull some items out.
  40.     queue = asyncio.Queue(maxsize=num_consumers)
  41.  
  42.     # Scheduled the consumer tasks.
  43.     consumers = [loop.create_task(consumer(i, queue)) for i in range(num_consumers)]
  44.  
  45.     # Schedule the producer task.
  46.     _producer = loop.create_task(producer(queue, num_consumers))
  47.  
  48.     # Wait for all of the coroutines to finish.
  49.     await asyncio.wait(consumers + [_producer])
  50.  
  51.  
  52. def run(num_consumers):
  53.     event_loop = asyncio.get_event_loop()
  54.     try:
  55.         event_loop.run_until_complete(main(event_loop, num_consumers))
  56.     finally:
  57.         event_loop.close()
  58.  
  59.  
  60. if __name__ == '__main__':
  61.     run(2)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement