Created
December 21, 2015 07:36
-
-
Save solidsnack/7848fa93d1ee7b1e26f3 to your computer and use it in GitHub Desktop.
Thread safe, multiprocess python dictionary implementation backed with a directory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import MutableMapping | |
from contextlib import contextmanager | |
import errno | |
from fcntl import flock, LOCK_EX, LOCK_NB, LOCK_SH, LOCK_UN | |
import itertools | |
import os | |
import signal | |
from threading import RLock | |
class FSDict(MutableMapping): | |
""" | |
Directory does not need to exist, it will be created on first write. | |
If ``textual`` is ``True``, the default, leading and trailing whitespace is | |
removed from the data when read or written. A trailing newline is always added | |
on write. | |
An ITIMER is used to allow clients to wait a short time for access to | |
the directory. | |
""" | |
def __init__(self, path, textual=True, timeout_millis=10): | |
self.path = path | |
self.textual = textual | |
self.timeout_millis = timeout_millis | |
self._lk = RLock() | |
self._fd = None | |
self._flags = LOCK_UN | |
self._flags_stack = [] | |
# Implement the context manager protocol, with `__enter__` and `__exit__`. | |
def __enter__(self): | |
self._lock_in() | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
del exc_type, exc_val, exc_tb | |
try: | |
self._lock_out() | |
except: | |
if self._fd is not None: | |
os.close(self._fd) | |
raise | |
return False | |
# Implement safe locking of this object and the underlying directory. By | |
# locking the underlying directory with `flock`, we allow multiple | |
# processes to share it safely, either via this library or through external | |
# calls to `flock`. | |
def _lock_in(self, shared=False): | |
with self._lk: | |
flags = LOCK_SH if shared else LOCK_EX | |
self._flags_stack += [flags] | |
if self._flags != LOCK_EX: # Don't give up EX on the way in | |
self._flock(flags) | |
def _lock_out(self): | |
with self._lk: | |
self._flags_stack = self._flags_stack[:-1] | |
if self._fd is not None: | |
flags = ([LOCK_UN] + self._flags_stack)[-1] | |
self._flock(flags) # It's okay to give up locks on the way out | |
def _flock(self, flags): | |
with self._lk: | |
self._try_to_open_path() | |
if self._fd is not None and self._flags != flags: | |
with timeout_in_syscalls(self.timeout_millis / 1000.0): | |
flock(self._fd, flags) | |
self._flags = flags | |
def _try_to_open_path(self): | |
if self._fd is None: | |
try: | |
self._fd = os.open(self.path, os.O_RDONLY) | |
except OSError as e: | |
if e.errno != errno.ENOENT: | |
raise | |
def _ensure_path(self): | |
mkdir_p(os.path.dirname(self.path)) | |
def _blind_write(self, path, text): | |
if self.textual: | |
text = text.strip() + '\n' | |
with open(os.path.join(self.path, path), 'w') as h: | |
return h.write(text) | |
def __getitem__(self, path): | |
if not isinstance(path, basestring): | |
raise ValueError('FSDicts accept only string keys.') | |
self._lock_in(shared=True) | |
try: | |
with open(os.path.join(self.path, path)) as h: | |
return h.read().strip() if self.textual else h.read() | |
except IOError as e: | |
if e.errno != errno.ENOENT: | |
raise | |
finally: | |
self._lock_out() | |
def __setitem__(self, path, data): | |
if not isinstance(path, basestring): | |
raise ValueError('FSDicts accept only string keys.') | |
if data is None: | |
self.__delitem__(path) | |
return | |
if not isinstance(data, basestring): | |
raise ValueError('FSDicts accept only string values.') | |
self._ensure_path() | |
with self: | |
try: | |
self._blind_write(path, data) | |
return | |
except IOError as e: | |
if e.errno != errno.ENOENT: | |
raise | |
mkdir_p(os.path.dirname(os.path.join(self.path, path))) | |
self._blind_write(path, data) | |
def __delitem__(self, path): | |
with self: | |
try: | |
os.unlink(os.path.join(self.path, path)) | |
except OSError as e: | |
if e.errno != errno.ENOENT: | |
raise | |
def __iter__(self): | |
self._lock_in(shared=True) | |
try: | |
walker = os.walk(self.path, followlinks=True) | |
paths = [(os.path.join(d, f) for f in fs) for d, _, fs in walker] | |
return iter(sorted(itertools.chain(*paths), key=split_path)) | |
finally: | |
self._lock_out() | |
def __len__(self): | |
return len(iter(self)) | |
def mkdir_p(path): | |
try: | |
os.makedirs(path) | |
except OSError as e: | |
if e.errno != errno.EEXIST or not os.path.isdir(path): | |
raise | |
def split_path(path): | |
return path.split('/') | |
def fmt_flock(flags): | |
labels = dict(EX=LOCK_EX, NB=LOCK_NB, SH=LOCK_SH, UN=LOCK_UN) | |
return '|'.join(s for s, f in sorted(labels.items()) if (flags & f) != 0) | |
@contextmanager | |
def timeout_in_syscalls(seconds): | |
def do_nothing_handler(signum, frame): | |
pass | |
previous_handler = signal.signal(signal.SIGALRM, do_nothing_handler) | |
previous_itimer_values = None | |
try: | |
previous_itimer_values = signal.setitimer(signal.ITIMER_REAL, seconds) | |
yield | |
finally: | |
if previous_itimer_values is not None: | |
signal.setitimer(signal.ITIMER_REAL, *previous_itimer_values) | |
signal.signal(signal.SIGALRM, previous_handler) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment