Skip to content

Instantly share code, notes, and snippets.

@denizetkar
Created April 19, 2022 19:21
Show Gist options
  • Select an option

  • Save denizetkar/46e6d449fdbee5e32c8335501ed93d38 to your computer and use it in GitHub Desktop.

Select an option

Save denizetkar/46e6d449fdbee5e32c8335501ed93d38 to your computer and use it in GitHub Desktop.
Implementation of a broadcast condition synchronization primitive using a read-write lock.
import threading
from collections import deque
from typing import List, Optional
class _LightSwitch:
    """Counting "light switch" guarding a lock shared by a group of threads.

    The first thread to enter the group acquires the shared lock ("turns the
    light on"); the last thread to leave releases it ("turns the light off").
    See A.B. Downey, "The Little Book of Semaphores", sec. 4.2.2.
    """

    def __init__(self):
        # The mutex serialises every update of the membership counter.
        self.__mutex = threading.Lock()
        self.__counter = 0

    def acquire(self, lock: threading.Lock):
        """Join the group, taking ``lock`` if the group was empty.

        ``lock.acquire()`` is called while the internal mutex is held, so other
        callers of ``acquire``/``release`` block until the shared lock has
        actually been obtained.
        """
        with self.__mutex:
            first_in = self.__counter == 0
            self.__counter += 1
            if first_in:
                lock.acquire()

    def release(self, lock: threading.Lock):
        """Leave the group, releasing ``lock`` if this caller was the last member."""
        with self.__mutex:
            last_out = self.__counter == 1
            self.__counter -= 1
            if last_out:
                lock.release()
class RWLockCtxMgr:
    """Context manager that holds an ``RWLock`` for the duration of a ``with`` block.

    Depending on ``is_reader`` the lock is taken through the reader or the
    writer entry points of ``rw_lock``.
    """

    def __init__(self, rw_lock: "RWLock", is_reader: bool = True):
        self.rw_lock = rw_lock
        self.is_reader = is_reader

    def __enter__(self):
        """Acquire the lock in the configured role; nothing is bound by ``as``."""
        lock = self.rw_lock
        take = lock.reader_acquire if self.is_reader else lock.writer_acquire
        take()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock in the configured role; exceptions are not suppressed."""
        lock = self.rw_lock
        give_back = lock.reader_release if self.is_reader else lock.writer_release
        give_back()
class RWLock:
    """Readers-writer lock for the so-called second readers-writers problem.

    Many readers may access a share simultaneously, while a writer has
    exclusive access to it. Additionally, the following constraints should
    be met:

    1) no reader should be kept waiting if the share is currently opened for
       reading unless a writer is also waiting for the share,
    2) no writer should be kept waiting for the share longer than absolutely
       necessary.

    The implementation is based on [1, secs. 4.2.2, 4.2.6, 4.2.7] with one
    modification -- an additional lock (``self.__readers_queue``) -- in
    accordance with [2].

    Sources:
    [1] A.B. Downey: "The little book of semaphores", Version 2.1.5, 2008
    [2] P.J. Courtois, F. Heymans, D.L. Parnas:
        "Concurrent Control with 'Readers' and 'Writers'",
        Communications of the ACM, 1971 (via [3])
    [3] http://en.wikipedia.org/wiki/Readers-writers_problem
    """

    def __init__(self):
        # Extra gate giving an even higher priority to the writer in certain
        # cases (see [2] for a discussion).
        self.__readers_queue = threading.Lock()
        # Held by the group of writers; readers must pass through it to enter.
        self.__no_readers = threading.Lock()
        # Held by the group of readers, or by the single active writer.
        self.__no_writers = threading.Lock()
        self.__write_switch = _LightSwitch()
        self.__read_switch = _LightSwitch()

    def reader_acquire(self):
        """Enter as a reader; the first reader in locks writers out."""
        # Both gates are released again as soon as the reader is registered.
        with self.__readers_queue, self.__no_readers:
            self.__read_switch.acquire(self.__no_writers)

    def reader_release(self):
        """Leave as a reader; the last reader out lets writers in again."""
        self.__read_switch.release(self.__no_writers)

    def writer_acquire(self):
        """Enter as a writer, with exclusive access once this returns."""
        # The first registered writer closes the readers' gate ...
        self.__write_switch.acquire(self.__no_readers)
        # ... then each writer waits for exclusive access to the share.
        self.__no_writers.acquire()

    def writer_release(self):
        """Leave as a writer; the last writer out reopens the readers' gate."""
        self.__no_writers.release()
        self.__write_switch.release(self.__no_readers)

    def __call__(self, is_reader: bool) -> RWLockCtxMgr:
        """A convenience function to create a context manager for a reader or a writer."""
        return RWLockCtxMgr(self, is_reader=is_reader)

    def __repr__(self) -> str:
        return "RWLock()"
class BroadcastConditionWaitError(Exception):
    """Raised when a reader's wait on a broadcast condition does not succeed."""
class BroadcastConditionCtxMgr:
    """Context manager for entering a ``BroadcastCondition`` critical section.

    A reader first waits to be notified (optionally bounded by ``timeout``)
    and then takes the read side of the condition's readers-writer lock; a
    writer takes the write side immediately.

    Raises:
        BroadcastConditionWaitError: from ``__enter__`` when a reader's wait
            returns false (for example because the timeout expired). No lock
            is held in that case.
    """

    def __init__(self, brd_cond: "BroadcastCondition", is_reader: bool = True, timeout: Optional[float] = None):
        self.brd_cond = brd_cond
        self.is_reader = is_reader
        self.timeout = timeout
        self._rw_lock_ctx_mgr = brd_cond._rw_lock(is_reader=is_reader)
        # Number of successful __enter__ calls not yet matched by __exit__.
        # __exit__ releases the lock only while this is positive.
        self._held_guard = threading.Lock()
        self._held = 0

    def __enter__(self):
        if self.is_reader and not self.brd_cond._wait(self.timeout):
            raise BroadcastConditionWaitError()
        entered = self._rw_lock_ctx_mgr.__enter__()
        with self._held_guard:
            self._held += 1
        return entered

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Decide from what was actually acquired, not from the exception type:
        # the `with` statement never calls __exit__ when __enter__ raised, so a
        # BroadcastConditionWaitError seen here came from the body (e.g. a
        # nested condition that timed out) and the lock taken in __enter__
        # must still be released, otherwise writers would block forever.
        with self._held_guard:
            if self._held == 0:
                # Nothing was acquired through this manager (e.g. a manual
                # __exit__ after a failed __enter__): nothing to release.
                return None
            self._held -= 1
        return self._rw_lock_ctx_mgr.__exit__(exc_type, exc_val, exc_tb)
class BroadcastCondition:
    """Class that implements a broadcast condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread. This implementation differs in that multiple
    readers can enter the critical section at the same time after being
    notified by some writer calling the `notify_all()` method. But only a
    single writer can enter the critical section to notify all readers!

    Example:
    - In the writer thread:
    ```
    with brd_cond(is_reader=False):
        # You have the write lock, now write and then notify all.
        shared_var = ...
        brd_cond.notify_all()
    ```
    - In the reader threads:
    ```
    with brd_cond(is_reader=True):
        # You have been notified and you have a read lock, now read.
        print(shared_var)
    ```
    """

    def __init__(self):
        self._rw_lock = RWLock()
        # FIFO queue of waiters (readers). Each waiter is a private lock that
        # its reader blocks on until notify_all() releases it.
        self._waiters_lock = threading.Lock()
        self._waiters: "deque[threading.Lock]" = deque()

    def __call__(self, is_reader: bool, timeout: Optional[float] = None) -> "BroadcastConditionCtxMgr":
        """A convenience function to create a context manager for a reader or a writer.

        NOTE: If `is_reader` is true and our `_wait()` method returns false in the context
        manager's `__enter__()` method, for reasons such as timeout, then a
        `BroadcastConditionWaitError` is raised!
        """
        return BroadcastConditionCtxMgr(self, is_reader=is_reader, timeout=timeout)

    def __repr__(self) -> str:
        return "BroadcastCondition({}, {})".format(self._rw_lock, len(self._waiters))

    def _wait(self, timeout: Optional[float] = None) -> bool:
        """Wait until notified or until a timeout occurs.

        This method blocks until it is awakened by a notify_all() call for the
        same broadcast condition variable in another thread, or until the
        optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in
        seconds (or fractions thereof). A timeout that is zero or negative
        makes a single non-blocking attempt.

        Returns:
            True if this waiter was notified, False if the timeout expired
            without a notification.
        """
        # Each waiter blocks on its own pre-acquired lock; the second acquire
        # below can only succeed once notify_all() has released it.
        waiter = threading.Lock()
        waiter.acquire()
        with self._waiters_lock:
            self._waiters.append(waiter)

        notified = False
        try:
            if timeout is None:
                waiter.acquire()
                notified = True
            elif timeout > 0:
                notified = waiter.acquire(True, timeout)
            else:
                notified = waiter.acquire(False)
        finally:
            if not notified:
                # Timed out (or interrupted): withdraw from the queue so that a
                # later notify_all() does not touch a stale waiter.
                with self._waiters_lock:
                    try:
                        self._waiters.remove(waiter)
                    except ValueError:
                        # notify_all() released and dequeued this waiter between
                        # the timeout and this clean-up, so the notification
                        # was delivered; report success rather than dropping it.
                        notified = True
        return notified

    def notify_all(self):
        """Wake up all threads (readers) waiting on this condition."""
        with self._waiters_lock:
            # Release and dequeue under the same lock so that a waiter missing
            # from the queue is guaranteed to have been released already.
            for waiter in self._waiters:
                waiter.release()
            self._waiters.clear()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment