Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- async def consumer(number, queue):
- print('consumer №{}: starting'.format(number))
- while True:
- print('consumer №{}: waiting for item'.format(number))
- item = await queue.get()
- print('consumer №{}: has item {}'.format(number, item))
- if item is None:
- # None is the signal to stop.
- queue.task_done()
- break
- else:
- print('------------')
- await asyncio.sleep(0.1 * item)
- queue.task_done()
- print('consumer №{}: ~~~ending~~~'.format(number))
- async def producer(queue, num_workers):
- print('producer: starting')
- # Add some numbers to the queue to simulate jobs
- for i in range(num_workers * 3):
- await queue.put(i)
- print('producer: added task {} to the queue'.format(i))
- # Add None entries in the queue
- # to signal the consumers to exit
- print('producer: adding stop signals to the queue')
- for i in range(num_workers):
- await queue.put(None)
- print('producer: waiting for queue to empty')
- await queue.join()
- print('producer: ~~~ending~~~')
- async def main(loop, num_consumers):
- # Create the queue with a fixed size so the producer
- # will block until the consumers pull some items out.
- queue = asyncio.Queue(maxsize=num_consumers)
- # Scheduled the consumer tasks.
- consumers = [loop.create_task(consumer(i, queue)) for i in range(num_consumers)]
- # Schedule the producer task.
- _producer = loop.create_task(producer(queue, num_consumers))
- # Wait for all of the coroutines to finish.
- await asyncio.wait(consumers + [_producer])
- def run(num_consumers):
- event_loop = asyncio.get_event_loop()
- try:
- event_loop.run_until_complete(main(event_loop, num_consumers))
- finally:
- event_loop.close()
- if __name__ == '__main__':
- run(2)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement