Skip to content

Instantly share code, notes, and snippets.

@molguin92
Last active June 27, 2018 09:55
Show Gist options
  • Save molguin92/41a5b59f9ae73751738c8e377fad1c85 to your computer and use it in GitHub Desktop.
Save molguin92/41a5b59f9ae73751738c8e377fad1c85 to your computer and use it in GitHub Desktop.
Python utility functions and classes (3.5+!)
import threading
class ReadWriteLock:
    """
    A lock object that allows many simultaneous "read locks", but
    only one "write lock."

    Modified from
    https://www.safaribooksonline.com/library/view/python-cookbook/0596001673/ch06s04.html

    The write lock is implemented by holding the internal condition's
    lock for the whole duration of the write, which is what keeps new
    readers and other writers out. Readers are never queued behind a
    waiting writer, so a steady stream of readers can delay a writer
    for as long as at least one read lock is held.

    The ``read_lock`` and ``write_lock`` attributes are context managers
    wrapping the corresponding acquire/release methods::

        with rw.read_lock:
            ...
        with rw.write_lock:
            ...
    """

    class Lock:
        """Context-manager adapter around an acquire/release pair."""

        def __init__(self, acquire_fn, release_fn):
            """
            :param acquire_fn: Zero-argument callable invoked on ``__enter__``.
            :param release_fn: Zero-argument callable invoked on ``__exit__``.
            """
            self._enter_fn = acquire_fn
            self._exit_fn = release_fn

        def __enter__(self):
            self._enter_fn()
            return None

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exit_fn()
            # Never suppress an exception raised inside the ``with`` body.
            return False

    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        # Number of read locks currently held.
        self._readers = 0

        # convenience objects
        self.read_lock = ReadWriteLock.Lock(self.acquire_read,
                                            self.release_read)
        self.write_lock = ReadWriteLock.Lock(self.acquire_write,
                                             self.release_write)

    def acquire_read(self):
        """ Acquire a read lock. Blocks only if a thread has
        acquired the write lock. """
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        """ Release a read lock.

        :raises RuntimeError: if no read lock is currently held.
        """
        with self._read_ready:
            if self._readers <= 0:
                # A negative count would let a writer in while a later
                # reader is still active, so refuse unbalanced releases.
                raise RuntimeError('release of un-acquired read lock')
            self._readers -= 1
            if not self._readers:
                # Last reader gone: wake any writers waiting in
                # acquire_write().
                self._read_ready.notify_all()

    def acquire_write(self):
        """ Acquire a write lock. Blocks until there are no
        acquired read or write locks. """
        self._read_ready.acquire()
        try:
            while self._readers > 0:
                self._read_ready.wait()
        except BaseException:
            # wait() re-acquires the lock before propagating (e.g. on
            # KeyboardInterrupt); release it so the lock is not leaked.
            self._read_ready.release()
            raise
        # On success the underlying lock stays held until release_write().

    def release_write(self):
        """ Release a write lock. """
        self._read_ready.release()
class RecursiveNestedDict(dict):
    """
    A ``dict`` whose nested plain dictionaries are wrapped in
    ``RecursiveNestedDict`` so that values can be looked up by a dotted
    path (see :meth:`find`).

    Values supplied through the constructor, item assignment,
    :meth:`update` or :meth:`setdefault` are wrapped; only values that are
    themselves ``dict`` instances are converted (dictionaries inside lists
    or other containers are left untouched).
    """

    _PATH_SEP = '.'

    def __init__(self, *args, **kwargs):
        """Accepts the same arguments as the ``dict`` constructor."""
        super(RecursiveNestedDict, self).__init__(*args, **kwargs)
        # The base constructor stores values as given, so wrap nested
        # dictionaries here. Iterate over a snapshot so that re-assigning
        # values never interferes with a live iterator.
        for k, v in list(self.items()):
            if isinstance(v, dict) and not isinstance(v, RecursiveNestedDict):
                self[k] = v  # __setitem__ performs the wrapping

    def __setitem__(self, key, value):
        if isinstance(value, dict) and \
                not isinstance(value, RecursiveNestedDict):
            value = RecursiveNestedDict(value)
        super(RecursiveNestedDict, self).__setitem__(key, value)

    def update(self, *args, **kwargs):
        """
        Same contract as ``dict.update``, but routes every assignment
        through ``__setitem__`` so nested dictionaries are wrapped.
        """
        for k, v in dict(*args, **kwargs).items():
            self[k] = v

    def setdefault(self, key, default=None):
        """
        Same contract as ``dict.setdefault``; a ``dict`` default is wrapped
        before being stored, and the stored value is returned.
        """
        if key not in self:
            self[key] = default
        return self[key]

    def __repr__(self):
        return self.__class__.__name__ + '(' \
               + super(RecursiveNestedDict, self).__repr__() + ')'

    def __str__(self):
        return str(self.asdict())

    def __recursive_find(self, path: str, _previous_path: str = None):
        """
        Resolve ``path`` relative to this dictionary.

        :param path: Remaining path, segments separated by ``_PATH_SEP``.
        :param _previous_path: Path already consumed by outer levels; only
            used to build informative error messages.
        :return: The value found at the end of the path.
        :raises KeyError: If a segment is missing; the error carries the
            full path up to and including the missing segment.
        :raises TypeError: If an intermediate segment is not a
            ``RecursiveNestedDict`` and so cannot be descended into.
        """
        head, sep, rest = path.partition(self._PATH_SEP)

        # ``is None`` (not truthiness) so that an empty-string segment at
        # an outer level is still kept in the reported path.
        current_path = head if _previous_path is None \
            else self._PATH_SEP.join((_previous_path, head))

        try:
            value = self[head]
        except KeyError as e:
            raise KeyError(current_path) from e

        if not sep:
            # final segment
            return value

        # Explicit check instead of ``assert``: assertions are stripped
        # when Python runs with -O.
        if not isinstance(value, RecursiveNestedDict):
            raise TypeError(
                'Can\'t traverse dictionary: '
                'element {} is not a {}'.format(
                    current_path,
                    self.__class__.__name__
                ))

        return value.__recursive_find(rest, current_path)

    def asdict(self) -> dict:
        """
        Returns the vanilla python representation of this dictionary.

        :return: Dictionary representation of the data contained in this
            instance, with every nested ``RecursiveNestedDict`` converted
            back to a plain ``dict``.
        """
        return {
            k: v.asdict() if isinstance(v, RecursiveNestedDict) else v
            for k, v in self.items()
        }

    def find(self, path: str):
        """
        Recursively finds the specified "path" through nested dictionaries,
        with origin in the current dictionary.

        Paths are specified as a sequence of keys separated by periods.
        For instance, in the following dictionary, key "c" in the innermost
        nested dictionary has path "a.b.c" from the outermost dictionary:

            {
                "a": {
                    "b": {
                        "c": "Hello World!"
                    }
                }
            }

        :param path: String representation of the path to find.
        :return: The value located at the end of the given path.
        :raises KeyError: If a key along the path does not exist.
        :raises TypeError: If an intermediate element of the path is not a
            nested dictionary.
        """
        return self.__recursive_find(path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment