Created
March 7, 2021 13:54
-
-
Save lanfon72/3236e41b198e8f6ad405601ab517e2e1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import asyncio | |
import contextvars | |
from random import random | |
from threading import current_thread | |
from concurrent.futures import ThreadPoolExecutor | |
# declare context var | |
request_id = contextvars.ContextVar('Id of request.') | |
def run_in_thread(req_id): | |
time.sleep(random() * 5) | |
if request_id.get(None): | |
print(f"\t\tinput={req_id} \tprev={request_id.get()} in {current_thread().name}") | |
request_id.set(req_id) | |
print(f"\t\t{req_id} setted in {current_thread().name}") | |
return req_id | |
async def some_inner_coroutine(myid): | |
# get value | |
await asyncio.sleep(random() * 5) | |
prev, curr = myid, request_id.get() | |
if prev != curr: | |
print(f"ERROR: {prev} != {curr}") | |
else: | |
request_id.set(myid * 10) | |
return curr | |
async def some_outer_coroutine(req_id): | |
# set value | |
request_id.set(req_id) | |
fut = asyncio.ensure_future(some_inner_coroutine(req_id)) | |
request_id.set(req_id * 100) | |
inner_val = await fut | |
outer_changed = request_id.get() | |
print(f"{inner_val=} \t{req_id=} \t{outer_changed=}") | |
return req_id | |
async def some_outer_with_await(req_id): | |
request_id.set(req_id) | |
await some_inner_coroutine(req_id) | |
inner_changed = request_id.get() | |
print(f"\t\t\t==> origin {req_id=} \t {inner_changed=}") | |
async def main(): | |
tasks = [] | |
asyncio.get_running_loop().set_default_executor(ThreadPoolExecutor(2)) | |
print(f"--- globally request_id={request_id.get(None)}") | |
for req_id in range(1, 30): | |
if req_id % 3: | |
tasks.append(asyncio.ensure_future(some_outer_with_await(req_id))) | |
elif req_id % 6 == 0: | |
tasks.append(asyncio.get_running_loop().run_in_executor(None, run_in_thread, req_id)) | |
else: | |
tasks.append(asyncio.ensure_future(some_outer_coroutine(req_id))) | |
await asyncio.gather(*tasks) | |
print(f"--- globally request_id={request_id.get(None)}") | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment