Last active
August 4, 2021 00:54
-
-
Save fedej/8957769ff3db3b377d06f4bb74bcef77 to your computer and use it in GitHub Desktop.
Context var isolation test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
from contextvars import ContextVar | |
from typing import Optional | |
TOKEN: ContextVar[Optional[str]] = ContextVar("token", default=None) | |
class Lock: | |
def __init__(self, name): | |
self.name = name | |
TOKEN.set(None) | |
self.expired = asyncio.Event() | |
async def acquire(self, token: Optional[str] = None, acquire: bool = False): | |
loop = asyncio.get_event_loop() | |
stop_trying_at = loop.time() + 10 | |
task = asyncio.current_task() | |
while True: | |
if self.expired.is_set() or acquire: | |
print(f"{time.time()}: {task.get_name()} acquired lock {self.name}") | |
print(f"{time.time()}: Token was {TOKEN.get()} in {task.get_name()}") | |
TOKEN.set(token) | |
print(f"{time.time()}: {task.get_name()} sets token to {TOKEN.get()}") | |
return True | |
next_try_at = loop.time() + 1 | |
if stop_trying_at is not None and next_try_at > stop_trying_at: | |
return False | |
print(f"{time.time()}: {task.get_name()} blocks trying to acquire {self.name}") | |
await asyncio.sleep(1) | |
def release(self) -> bool: | |
task = asyncio.current_task() | |
expected_token = TOKEN.get() | |
if expected_token is None: | |
raise RuntimeError("Cannot release an unlocked lock") | |
print(f"{time.time()}: {task.get_name()} releasing {expected_token}") | |
TOKEN.set(None) | |
return self.expired.is_set() | |
async def do_with_lock_task2(lock: Lock): | |
task = asyncio.current_task() | |
task3 = asyncio.create_task(do_with_lock_task3(lock)) | |
if await lock.acquire("abc", True): | |
await asyncio.sleep(1) | |
lock.expired.set() | |
print(f"{time.time()}: {task.get_name()} has not completed yet, redis expires lock key") | |
await asyncio.sleep(2) | |
lock.release() | |
else: | |
raise RuntimeError(f"{task.get_name()} error") | |
await asyncio.gather(task3) | |
async def do_with_lock_task3(lock: Lock): | |
task = asyncio.current_task() | |
await lock.expired.wait() | |
if await lock.acquire("xyz"): | |
await asyncio.sleep(5) | |
lock.release() | |
else: | |
raise RuntimeError(f"{task.get_name()} error") | |
async def main(): | |
### OK ### | |
lock = Lock("my-lock") | |
task2 = asyncio.create_task(do_with_lock_task2(lock)) | |
await asyncio.gather(task2) | |
if __name__ == "__main__": | |
asyncio.run(main()) | |
""" | |
1628038392.1417608: Task-2 acquired lock my-lock | |
1628038392.1417918: Token was None in Task-2 | |
1628038392.1418037: Task-2 sets token to abc | |
1628038393.1431952: Task-2 has not completed yet, redis expires lock key | |
1628038393.1433778: Task-3 acquired lock my-lock | |
1628038393.1434047: Token was None in Task-3 < Context from task 2 wasn't copied to inner task | |
1628038393.14343: Task-3 sets token to xyz | |
1628038395.1458466: Task-2 releasing abc | |
1628038398.147317: Task-3 releasing xyz | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment