-
-
Save Eboubaker/6a0b807788088a764b2a4c156fda0e4b to your computer and use it in GitHub Desktop.
import threading | |
from typing import List | |
class ReentrantRWLock: | |
""" | |
A lock object that allows many simultaneous "read locks", but only one "write lock." | |
it also allows multiple write locks from the same thread | |
""" | |
def __init__(self): | |
self._writer = None # current writer | |
self._readers: List[int] = [] # list of unique readers | |
self._read_ready = threading.Condition(threading.Lock()) | |
self._with_ops_write = [] # stack for 'with' keyword for write or read operations, 0 for read 1 for write | |
self._ops_arr_lock = threading.Lock() # lock for previous list | |
def acquire_read(self): | |
""" | |
Acquire a read lock. Blocks only if a another thread has acquired the write lock. | |
""" | |
ident = threading.current_thread().ident | |
if self._writer == ident or ident in self._readers: | |
return | |
with self._read_ready: | |
self._readers.append(ident) | |
def release_read(self): | |
""" | |
Release a read lock if exists from this thread | |
""" | |
ident = threading.current_thread().ident | |
if self._writer == ident or ident not in self._readers: | |
return | |
with self._read_ready: | |
self._readers.remove(ident) | |
if len(self._readers) == 0: | |
self._read_ready.notifyAll() | |
def acquire_write(self): | |
""" | |
Acquire a write lock. Blocks until there are no acquired read or write locks from another thread. | |
""" | |
ident = threading.current_thread().ident | |
if self._writer == ident: | |
return | |
self._read_ready.acquire() | |
me_included = 1 if ident in self._readers else 0 | |
while len(self._readers) - me_included > 0: | |
self._read_ready.wait() | |
self._writer = ident | |
def release_write(self): | |
""" | |
Release a write lock if exists from this thread. | |
""" | |
if not self._writer or not self._writer == threading.current_thread().ident: | |
return | |
self._writer = None | |
self._read_ready.release() | |
def __enter__(self): | |
with self._ops_arr_lock: | |
if len(self._with_ops_write) == 0: | |
raise RuntimeError("ReentrantRWLock: used 'with' block without call to for_read or for_write") | |
write = self._with_ops_write[-1] | |
if write: | |
self.acquire_write() | |
else: | |
self.acquire_read() | |
def __exit__(self, exc_type, exc_value, tb): | |
with self._ops_arr_lock: | |
write = self._with_ops_write.pop() | |
if write: | |
self.release_write() | |
else: | |
self.release_read() | |
if exc_type is not None: | |
return False # exception happened | |
return True | |
def for_read(self): | |
""" | |
used for 'with' block | |
""" | |
with self._ops_arr_lock: | |
self._with_ops_write.append(0) | |
return self | |
def for_write(self): | |
""" | |
used for 'with' block | |
""" | |
with self._ops_arr_lock: | |
self._with_ops_write.append(1) | |
return self |
@telecran-telecrit I don't see the problem. give the code snippet that makes a deadlock,
Hi there,
in search of a ReentrantRWLock I found your code.
I was just writing tests for the class when I found this, which seemingly deadlocks:
from utils import ReentrantRWLock # your lock impl.
import time
from threading import Thread
import pytest # pip install pytest pytest-timeout
TIMEOUT = 5 # should be at least 3 * SLEEP_TIME
# Some tests use sleeps to simulate possible race conditions
# Define how long this should be - test precision is dependent on this!
SLEEP_TIME = 0.5
@pytest.mark.timeout(TIMEOUT)
def test_multi_threaded_read_write_exclusive():
lock = ReentrantRWLock()
def read():
print("Before read aquire")
with lock.for_read():
print("Before read sleep")
time.sleep(SLEEP_TIME)
print("read done")
return
def write():
print("Before write aquire")
with lock.for_write():
print("Before write sleep")
time.sleep(SLEEP_TIME)
print("write done")
return
t1 = Thread(name="write", target=write, daemon=True)
t2 = Thread(name="read", target=read, daemon=True)
# this different order also deadlocks
# t1 = Thread(name="read", target=read, daemon=True)
# t2 = Thread(name="write", target=write, daemon=True)
start = time.perf_counter()
t1.start()
time.sleep(0.01)
t2.start()
t1.join()
t2.join()
end = time.perf_counter()
# definitly at least 2 * SLEEP_TIME!
assert (
end - start > 1.9 * SLEEP_TIME
), f"Time for both joins was {end - start}, should be > {1.9 * SLEEP_TIME}"
The idea of the test case was to measure the time it takes for both threads to join back into the main thread.
If both sleeps happen at the same time (which would be incorrect here), then end - start
should be only slightly higher than SLEEP_TIME
.
However, I run into the 5 seconds timeout here and after taking a look, I believe that the read and write lock switched roles?
When the write lock (which goes first) returns from the __exit__
function, it pops the last element of the _with_ops_write
list, which is a 0
, leaving a 1
from itself.
In fact, I am not sure how you can be sure to always access the correct element in _with_ops_write
as the order the locks could be released in might change.
Alternatively, maybe I missed something? Could you maybe take a look at this?
Note, this test case works as intended when calling aquire_read/write and release_read/write directly instead of using context managers.
Thanks a lot
I forked your repo and tried my hand at a re-implementation using context managers to fix the problem of "not knowing if we are writing or not".
This at least seems to fix my test case above.
Also, it has the advantage of not allowing anyone to enter the context without explicitly calling either the for_read
or for_write
methods.
Update:
I also found with the original code that the reentrant part of the lock is brocken.
If you aquire the same lock multiple times and then release once, then thread will no longer hold the lock even though it should still hold it (multiple times). I have a counting/fixed version here .
Your
ident = threading.current_thread().ident, self._writer == ident
check produces dead-lock within HttpGetHandler::do_GET in case of readers/writers from different threads, even for simpleclass ThreadingHTTPServer(ThreadingMixIn, HTTPServer)