Last active
March 30, 2025 18:21
-
-
Save ryanhiebert/2f3127aa46511492309e06a8d3b04094 to your computer and use it in GitHub Desktop.
Blocking protocol for multiple locks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import sched_yield | |
from functools import wraps | |
from abc import ABC, abstractmethod | |
def block(fn): | |
@wraps(fn) | |
def wrapper(*args, **kwargs): | |
return Block(fn(*args, **kwargs)) | |
return wrapper | |
@block | |
def blocking_fn(x = 0): | |
with (yield lock1): | |
x += 1 | |
with (yield lock2): | |
with (yield lock3): | |
return x | |
def wait_one(block: Block): | |
gen = block.__block__() | |
acquired = None # First iteration should send nothing | |
while True: | |
try: | |
lock = gen.send(acquired) | |
except StopIteration as stop: | |
return stop.value | |
else: | |
lock.acquire() | |
acquired = proxy_lock(lock) | |
class BlockCancelled(BaseException): ... | |
def wait_first[T: Hashable](blocks: dict[T, Block]) -> tuple[T, Any]: | |
gens = {k: b.__block__() for k, b in blocks.items()} | |
locks = set() | |
try: | |
waiting = {} | |
for k, gen in gens.items(): | |
try: | |
waiting[k] = gen.send(None) | |
except StopIteration as stop: | |
return k, stop.value | |
# This is a spinning lock acquisition. It's very inefficient. | |
while True: | |
for k, lock in waiting.items(): | |
if lock.acquire(blocking=False): | |
try: | |
waiting[k] = gens[k].send(proxy_lock(lock)) | |
except StopIteration as stop: | |
del gens[k] | |
return k, stop.value | |
sched_yield() # The bare minimum to make a spinlock less horrible | |
finally: | |
# Complete all other generators | |
for gen in gens.values(): | |
while True: | |
try: | |
gen.throw(BlockCancelled()) | |
except (StopIteration, BlockCancelled): | |
break | |
return ret | |
class Block(ABC): | |
@abstractmethod | |
def __block__(self): | |
raise NotImplementedError | |
def proxy_lock(lock): | |
try: | |
yield | |
finally: | |
lock.release() | |
one_value = wait_one(blocking_fn()) | |
match wait_first({1: blocking_fn(), 2: blocking_fn()}): | |
case 1, val: | |
first_idx = 1 | |
first_value = val | |
case 2, val: | |
first_idx = 2 | |
first_value = val |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment