Created
April 19, 2022 19:21
-
-
Save denizetkar/46e6d449fdbee5e32c8335501ed93d38 to your computer and use it in GitHub Desktop.
Implementation of a broadcast condition synchronization primitive using a read-write lock.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import threading | |
| from collections import deque | |
| from typing import List, Optional | |
class _LightSwitch:
    """Counter-guarded switch: the first thread in takes a lock, the last one out frees it.

    Each ``acquire`` call raises an internal counter and each ``release`` call
    lowers it. The lock handed to those methods is acquired only when the
    counter goes from 0 to 1 and released only when it drops back to 0
    (the "light switch" pattern from Downey's "The Little Book of
    Semaphores", sec. 4.2.2).
    """

    def __init__(self):
        # Guards the counter so the test-and-lock step below is atomic.
        self.__mutex = threading.Lock()
        self.__counter = 0

    def acquire(self, lock: threading.Lock):
        """Register the caller; the first registered caller acquires ``lock``."""
        with self.__mutex:
            self.__counter += 1
            is_first_in = self.__counter == 1
            if is_first_in:
                # The mutex stays held while blocking here, so later callers
                # queue up behind the first one until ``lock`` is obtained.
                lock.acquire()

    def release(self, lock: threading.Lock):
        """Unregister the caller; the last one to leave releases ``lock``."""
        with self.__mutex:
            self.__counter -= 1
            is_last_out = self.__counter == 0
            if is_last_out:
                lock.release()
class RWLockCtxMgr:
    """Context manager that holds an ``RWLock`` as a reader or as a writer.

    Entering the ``with`` block calls ``reader_acquire()`` or
    ``writer_acquire()`` on the wrapped lock depending on ``is_reader``;
    leaving it calls the matching ``*_release()`` method. ``__enter__``
    returns ``None`` and ``__exit__`` never suppresses exceptions.
    """

    def __init__(self, rw_lock: "RWLock", is_reader: bool = True):
        self.rw_lock = rw_lock
        self.is_reader = is_reader

    def __enter__(self):
        # Select the bound method for the requested role, then invoke it.
        take = self.rw_lock.reader_acquire if self.is_reader else self.rw_lock.writer_acquire
        take()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Mirror of ``__enter__``: release with the same role that acquired.
        give_back = self.rw_lock.reader_release if self.is_reader else self.rw_lock.writer_release
        give_back()
class RWLock:
    """Readers-writer lock for the so-called second readers-writers problem.

    Many readers may access the shared resource simultaneously, while a
    writer gets exclusive access to it. The following constraints are
    targeted as well:

    1) no reader is kept waiting while the resource is open for reading,
       unless a writer is also waiting for it;
    2) no writer is kept waiting for the resource longer than absolutely
       necessary.

    The implementation follows [1, secs. 4.2.2, 4.2.6, 4.2.7], modified with
    one extra lock (``self.__readers_queue``) in accordance with [2].

    Sources:
    [1] A.B. Downey: "The little book of semaphores", Version 2.1.5, 2008
    [2] P.J. Courtois, F. Heymans, D.L. Parnas:
        "Concurrent Control with 'Readers' and 'Writers'",
        Communications of the ACM, 1971 (via [3])
    [3] http://en.wikipedia.org/wiki/Readers-writers_problem
    """

    def __init__(self):
        self.__read_switch = _LightSwitch()
        self.__write_switch = _LightSwitch()
        # Held by the first writer to arrive; blocks new readers from entering.
        self.__no_readers = threading.Lock()
        # Held by the first reader, or by the active writer; blocks writers.
        self.__no_writers = threading.Lock()
        # Extra lock giving writers an even higher priority in certain cases
        # (see [2] in the class docstring for a discussion).
        self.__readers_queue = threading.Lock()

    def reader_acquire(self):
        """Block until the calling thread may read alongside other readers."""
        # Both gates are taken in this order and dropped again as soon as the
        # reader is registered with the read switch.
        with self.__readers_queue, self.__no_readers:
            self.__read_switch.acquire(self.__no_writers)

    def reader_release(self):
        """Unregister a reader; the last reader out lets writers in."""
        self.__read_switch.release(self.__no_writers)

    def writer_acquire(self):
        """Block until the calling thread has exclusive (write) access."""
        # Registering first closes the gate for newly arriving readers ...
        self.__write_switch.acquire(self.__no_readers)
        # ... then wait for the current readers / writer to finish.
        self.__no_writers.acquire()

    def writer_release(self):
        """Give up exclusive access; the last writer out re-admits readers."""
        self.__no_writers.release()
        self.__write_switch.release(self.__no_readers)

    def __call__(self, is_reader: bool) -> RWLockCtxMgr:
        """Return a context manager that holds this lock as a reader or a writer."""
        ctx_mgr = RWLockCtxMgr(self, is_reader=is_reader)
        return ctx_mgr

    def __repr__(self) -> str:
        return "RWLock()"
class BroadcastConditionWaitError(Exception):
    """Raised when a reader's wait for a broadcast ends without a notification (e.g. on timeout)."""

    # Context manager whose ``__enter__`` raised this error; stays ``None``
    # when the error was created by anything else.
    origin = None


class BroadcastConditionCtxMgr:
    """Context manager entering a ``BroadcastCondition`` as a reader or a writer.

    A reader first waits for a notification (up to ``timeout`` seconds,
    ``None`` meaning no limit) and then takes the read lock; a writer takes
    the write lock straight away. Leaving the ``with`` block releases
    whichever lock was taken. Exceptions are never suppressed.

    Args:
        brd_cond: The broadcast condition to enter.
        is_reader: ``True`` to wait and read, ``False`` to write.
        timeout: Maximum time in seconds a reader waits for a notification.

    Raises:
        BroadcastConditionWaitError: From ``__enter__`` when a reader's wait
            returns false; no lock is held in that case.
    """

    def __init__(
        self,
        brd_cond: "BroadcastCondition",
        is_reader: bool = True,
        timeout: Optional[float] = None,
    ):
        self.brd_cond = brd_cond
        self.is_reader = is_reader
        self.timeout = timeout
        self._rw_lock_ctx_mgr = brd_cond._rw_lock(is_reader=is_reader)

    def __enter__(self):
        if self.is_reader:
            gotit = self.brd_cond._wait(self.timeout)
            if not gotit:
                # Tag the error with its source so ``__exit__`` can tell
                # "this manager never took the lock" apart from a wait error
                # that merely passes through an already-entered block.
                error = BroadcastConditionWaitError()
                error.origin = self
                raise error
        return self._rw_lock_ctx_mgr.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The ``with`` statement never calls ``__exit__`` after a failing
        # ``__enter__``, so a wait error seen here normally comes from the
        # block body (e.g. a nested wait on another condition) while the lock
        # IS held and must be released. Only skip the release for this
        # manager's own failed-enter error, which can reach us solely through
        # a manual ``__enter__``/``__exit__`` call sequence.
        if isinstance(exc_val, BroadcastConditionWaitError) and exc_val.origin is self:
            return None
        return self._rw_lock_ctx_mgr.__exit__(exc_type, exc_val, exc_tb)
class BroadcastCondition:
    """Broadcast condition variable built on top of a readers-writer lock.

    A condition variable lets one or more threads wait until another thread
    notifies them. Here the waiting threads are readers: after a writer calls
    `notify_all()`, all of them may enter the critical section at the same
    time. Only a single writer at a time can be inside the critical section
    to notify the readers.

    Example:
    - In the writer thread:
    ```
    with brd_cond(is_reader=False):
        # You have the write lock, now write and then notify all.
        shared_var = ...
        brd_cond.notify_all()
    ```
    - In the reader threads:
    ```
    with brd_cond(is_reader=True):
        # You have been notified and you have a read lock, now read.
        print(shared_var)
    ```
    """

    def __init__(self):
        self._rw_lock = RWLock()
        # FIFO queue of waiters (readers): one already-locked lock per
        # waiting thread, guarded by ``_waiters_lock``.
        self._waiters_lock = threading.Lock()
        self._waiters: "deque[threading.Lock]" = deque()

    def __call__(self, is_reader: bool, timeout: Optional[float] = None) -> BroadcastConditionCtxMgr:
        """Create a context manager for a reader or a writer.

        NOTE: If `is_reader` is true and `_wait()` returns false inside the
        context manager's `__enter__()` (for reasons such as a timeout), a
        `BroadcastConditionWaitError` is raised!
        """
        return BroadcastConditionCtxMgr(self, is_reader=is_reader, timeout=timeout)

    def __repr__(self) -> str:
        pending = len(self._waiters)
        return "BroadcastCondition({}, {})".format(self._rw_lock, pending)

    def _wait(self, timeout=None) -> bool:
        """Block until notified or until the timeout elapses.

        The calling thread sleeps until another thread calls `notify_all()`
        on this object, or until ``timeout`` runs out.

        ``timeout`` is either ``None`` (wait indefinitely) or a number of
        seconds, fractions allowed. A value of zero or less only polls
        without blocking.

        Returns ``True`` when woken by a notification, ``False`` otherwise.
        """
        # A private lock that starts out held: acquiring it a second time
        # blocks until `notify_all()` releases it.
        ticket = threading.Lock()
        ticket.acquire()
        with self._waiters_lock:
            self._waiters.append(ticket)

        notified = False
        try:
            if timeout is None:
                ticket.acquire()
                notified = True
            elif timeout > 0:
                notified = ticket.acquire(True, timeout)
            else:
                notified = ticket.acquire(False)
            return notified
        finally:
            if not notified:
                # Withdraw from the queue. A concurrent `notify_all()` may
                # already have dropped the ticket, in which case there is
                # nothing left to remove.
                with self._waiters_lock:
                    try:
                        self._waiters.remove(ticket)
                    except ValueError:
                        pass

    def notify_all(self):
        """Wake up all threads (readers) waiting on this condition."""
        with self._waiters_lock:
            queue = self._waiters
            for ticket in queue:
                ticket.release()
            queue.clear()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment