Created September 8, 2019 18:21
-
-
Save florimondmanca/6352a9cbcc670c7677a351e9bf8a987d to your computer and use it in GitHub Desktop.
Sharing a task ID across coroutines using contextvars (Trio-based)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
import contextvars | |
# Last task id handed out; job() increments it to give each job a fresh id.
LAST_TASK_ID = 0

# Context variable holding the id of the job the current task belongs to.
# job() stores an int here, so the variable is typed as ContextVar[int].
# No default is configured: calling TASK_ID.get() before a value has been
# set raises LookupError.
TASK_ID: contextvars.ContextVar[int] = contextvars.ContextVar("task_id")
async def producer(num_items: int, send_channel: trio.MemorySendChannel) -> None:
    """Send ``num_items`` tagged messages over ``send_channel``, then close it.

    Each event is a dict carrying the task id read from ``TASK_ID`` and a
    ``"Message <k>"`` payload. The producer sleeps 0.3 seconds after every
    send.

    Raises:
        LookupError: if ``TASK_ID`` has no value in the current context.
    """
    current_id = TASK_ID.get()
    label = f"Producer {current_id}"
    print(f"{label} - Started")

    # Leaving the ``async with`` closes the send end of the channel.
    async with send_channel:
        index = 0
        while index < num_items:
            event = {"task_id": current_id, "message": f"Message {index}"}
            await send_channel.send(event)
            await trio.sleep(0.3)
            index += 1

    print(f"{label} - Done")
async def consumer(receive_channel: trio.MemoryReceiveChannel) -> None:
    """Receive events from ``receive_channel`` and process each message.

    Every event is expected to carry the same task id as the one stored in
    ``TASK_ID`` for the current context; the message is handed to
    ``process()`` and both the message and its result are printed.

    Raises:
        LookupError: if ``TASK_ID`` has no value in the current context.
        AssertionError: if an event's task id differs from the current one.
    """
    task_id = TASK_ID.get()
    iden = f"Consumer #{task_id}"
    print(f"Starting {iden}")

    # Close the receive end when we are done (or if we fail), mirroring the
    # producer, which closes its send end with ``async with``.
    async with receive_channel:
        async for event in receive_channel:
            # Sanity check: the id travelling with the event must match the
            # id found in this task's context.
            assert event["task_id"] == task_id
            message = event["message"]
            print(f"{iden} - Received: {message!r}")
            result = await process(message)
            print(f"{iden} - Result: {result!r}")

    print(f"{iden} - Done")
async def process(message: str) -> str:
    """Return ``message`` reversed, after printing the current task id.

    The task id is read from ``TASK_ID`` rather than passed as an argument.

    Raises:
        LookupError: if ``TASK_ID`` has no value in the current context.
    """
    current_id = TASK_ID.get()
    print(f"Processor {current_id} - Processing message")
    reversed_message = "".join(reversed(message))
    return reversed_message
async def job(nursery):
    """Allocate a new task id, store it in ``TASK_ID``, and spawn a
    producer/consumer pair into ``nursery``.

    The two tasks are connected by an unbuffered memory channel
    (``max_buffer_size`` of 0); the producer is asked for 10 items.
    """
    global LAST_TASK_ID

    # Allocate the next id and publish it through the context variable.
    LAST_TASK_ID += 1
    TASK_ID.set(LAST_TASK_ID)

    # Spawn the concurrent tasks. They read the id back with TASK_ID.get()
    # (NOTE(review): relies on Trio child tasks inheriting the context of
    # the task that calls start_soon; confirm against the Trio docs).
    sender, receiver = trio.open_memory_channel(0)
    nursery.start_soon(producer, 10, sender)
    nursery.start_soon(consumer, receiver)
async def main():
    """Start three jobs in a single nursery, one second apart.

    The nursery block exits only after every task started in it has finished.
    """
    job_count = 3
    stagger_seconds = 1

    async with trio.open_nursery() as nursery:
        started = 0
        while started < job_count:
            nursery.start_soon(job, nursery)
            started += 1
            # Pause before launching the next job.
            await trio.sleep(stagger_seconds)
# Script entry point: run the main() coroutine function under Trio.
trio.run(main)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.