Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import trio
- import contextvars
# Last task id handed out so far; `job` increments it to mint the next id.
LAST_TASK_ID = 0

# Context variable carrying the id of the job that the current task belongs to.
# It stores the integer produced by `job`, so it is typed as ContextVar[int].
# No default is configured: calling `TASK_ID.get()` before any `set()` raises
# LookupError.
TASK_ID: contextvars.ContextVar[int] = contextvars.ContextVar("task_id")
async def producer(num_items: int, send_channel: trio.MemorySendChannel) -> None:
    """Send ``num_items`` tagged messages on ``send_channel``, then close it.

    Each message is a dict with the current ``TASK_ID`` value under
    ``"task_id"`` and a ``"Message <n>"`` string under ``"message"``.
    The producer pauses for 0.3 seconds after every send.
    """
    owner = TASK_ID.get()
    label = f"Producer {owner}"
    print(f"{label} - Started")

    # Leaving the ``async with`` closes the send side of the channel.
    async with send_channel:
        for index in range(num_items):
            payload = {"task_id": owner, "message": f"Message {index}"}
            await send_channel.send(payload)
            await trio.sleep(0.3)

    print(f"{label} - Done")
async def consumer(receive_channel: trio.MemoryReceiveChannel) -> None:
    """Receive events from ``receive_channel`` and process each message.

    For every event, checks that its ``"task_id"`` equals this task's own
    ``TASK_ID`` value, passes the ``"message"`` to ``process`` and prints
    the result. Returns once the channel is exhausted.
    """
    task_id = TASK_ID.get()
    iden = f"Consumer #{task_id}"
    print(f"Starting {iden}")
    # Close the receive side on every exit path, mirroring how the producer
    # closes its send side. If this task stops early, a sender blocked on the
    # channel is woken with BrokenResourceError instead of waiting forever.
    async with receive_channel:
        async for event in receive_channel:
            # Demo invariant: the event must come from the producer that
            # shares this consumer's task id.
            assert event["task_id"] == task_id
            message = event["message"]
            print(f"{iden} - Received: {message!r}")
            result = await process(message)
            print(f"{iden} - Result: {result!r}")
    print(f"{iden} - Done")
async def process(message: str) -> str:
    """Announce which task is processing, then return ``message`` reversed."""
    owner = TASK_ID.get()
    print(f"Processor {owner} - Processing message")
    reversed_text = message[::-1]
    return reversed_text
async def job(nursery):
    """Allocate a new task id and start a producer/consumer pair for it.

    The id is stored in ``TASK_ID`` before the children are started; both
    children read ``TASK_ID`` themselves. They are spawned in the
    caller-supplied ``nursery``, so this coroutine returns immediately
    after scheduling them.
    """
    # Mint the next id and record it for the current job.
    global LAST_TASK_ID
    LAST_TASK_ID += 1
    TASK_ID.set(LAST_TASK_ID)

    # A buffer size of 0 gives an unbuffered channel between the two children.
    channel_pair = trio.open_memory_channel(0)
    tx, rx = channel_pair

    # Start the concurrent tasks that make up this job.
    nursery.start_soon(producer, 10, tx)
    nursery.start_soon(consumer, rx)
async def main():
    """Start three jobs in one shared nursery, pausing a second after each."""
    job_count = 3
    stagger_seconds = 1
    async with trio.open_nursery() as nursery:
        for _job_number in range(job_count):
            nursery.start_soon(job, nursery)
            # NOTE(review): the pasted source lost its indentation; this
            # sleep is assumed to be inside the loop -- confirm.
            await trio.sleep(stagger_seconds)
# Start the Trio event loop only when this file is executed as a script, so
# that importing the module does not run the demo as a side effect.
if __name__ == "__main__":
    trio.run(main)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement