Skip to content

Instantly share code, notes, and snippets.

@florimondmanca
Created September 8, 2019 18:21
Show Gist options
  • Save florimondmanca/6352a9cbcc670c7677a351e9bf8a987d to your computer and use it in GitHub Desktop.
Save florimondmanca/6352a9cbcc670c7677a351e9bf8a987d to your computer and use it in GitHub Desktop.
Sharing a task ID across coroutines using contextvars (Trio-based)
import trio
import contextvars
# Last task ID handed out so far; job() increments it to mint the next one.
LAST_TASK_ID = 0

# Context-local ID of the job the running task belongs to. job() stores the
# integer IDs it mints here, so the variable is parameterized with int.
# No default is given: calling TASK_ID.get() before a value has been set
# raises LookupError.
TASK_ID: contextvars.ContextVar[int] = contextvars.ContextVar("task_id")
async def producer(num_items: int, send_channel: trio.MemorySendChannel) -> None:
    """Send ``num_items`` events tagged with the current task ID.

    Each event is a dict with a ``"task_id"`` key (the value read from
    ``TASK_ID``) and a ``"message"`` key (``"Message <index>"``). The send
    channel is closed once every event has been sent.

    Args:
        num_items: How many events to send.
        send_channel: Channel the events are written to.
    """
    current_id = TASK_ID.get()
    label = f"Producer {current_id}"
    print(f"{label} - Started")

    # Leaving the ``async with`` block closes the send side of the channel.
    async with send_channel:
        for index in range(num_items):
            event = {"task_id": current_id, "message": f"Message {index}"}
            await send_channel.send(event)
            # Pause between events.
            await trio.sleep(0.3)

    print(f"{label} - Done")
async def consumer(receive_channel: trio.MemoryReceiveChannel) -> None:
    """Receive events, process each message, and print the result.

    Every event is expected to be a dict with ``"task_id"`` and ``"message"``
    keys. The task ID carried by the event is checked against the one read
    from ``TASK_ID`` in this task's own context.

    Args:
        receive_channel: Channel the events are read from. It is closed when
            this coroutine exits, whether normally or through an exception.
    """
    task_id = TASK_ID.get()
    iden = f"Consumer #{task_id}"
    print(f"Starting {iden}")

    # Close our end of the channel on the way out, mirroring what the
    # producer does with its send channel, instead of leaving it open.
    async with receive_channel:
        async for event in receive_channel:
            # Sanity check for the demo: the ID travelling with the event must
            # match the ID this task sees through the context variable.
            assert event["task_id"] == task_id
            message = event["message"]
            print(f"{iden} - Received: {message!r}")
            result = await process(message)
            print(f"{iden} - Result: {result!r}")

    print(f"{iden} - Done")
async def process(message: str) -> str:
    """Return ``message`` with its characters in reverse order.

    Before doing so, print a line that includes the task ID read from
    ``TASK_ID``.

    Args:
        message: Text to reverse.

    Returns:
        The reversed text.
    """
    current_id = TASK_ID.get()
    print(f"Processor {current_id} - Processing message")
    reversed_message = "".join(reversed(message))
    return reversed_message
async def job(nursery):
    """Assign a fresh task ID to this job and start its producer and consumer.

    The ID is stored in ``TASK_ID`` before the two tasks are started; both of
    them read it back with ``TASK_ID.get()``. The coroutine returns right
    after scheduling; the tasks run in the nursery that was passed in.

    Args:
        nursery: Nursery in which the producer and consumer tasks are started.
    """
    global LAST_TASK_ID

    # Mint the next ID and publish it in the current context.
    LAST_TASK_ID += 1
    TASK_ID.set(LAST_TASK_ID)

    # Channel created with a buffer size of 0, connecting the two tasks.
    to_consumer, from_producer = trio.open_memory_channel(0)
    nursery.start_soon(producer, 10, to_consumer)
    nursery.start_soon(consumer, from_producer)
async def main():
    """Start three jobs in one nursery, pausing one second after each start.

    The nursery block only exits once every task started in it has finished.
    """
    async with trio.open_nursery() as root_nursery:
        for _launch in range(3):
            # Each job gets the nursery so it can start its own tasks in it.
            root_nursery.start_soon(job, root_nursery)
            await trio.sleep(1)
# Script entry point: run main() under Trio. This executes at import time.
trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment