Last active: August 2, 2018 01:36
-
-
Save mx-moth/79c0bba89a242bbf5c4682ac9686ce92 to your computer and use it in GitHub Desktop.
Python multiprocess FileLock class using fcntl.flock()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fcntl | |
import os | |
from contextlib import contextmanager | |
# Human-readable names for the two flock() lock modes, used in error messages
# and in FileLock.__repr__.
MODE_NAMES = {fcntl.LOCK_SH: 'shared', fcntl.LOCK_EX: 'exclusive'}
class FileLock:
    """A file-based lock with a shared (reader) and an exclusive (writer) mode.

    Locking is done with ``fcntl.flock()`` on a handle to *filename* that is
    opened on first use and kept open until :meth:`close`. Lock it through one
    of the mode helpers rather than directly::

        lock = FileLock('some.lock')
        with lock.exclusive():
            ...

    ``reader`` is an alias of ``shared`` and ``writer`` is an alias of
    ``exclusive``. Only one mode can be held at a time through one instance.
    """

    def __init__(self, filename):
        self.filename = filename
        shared = _FileLock(self, fcntl.LOCK_SH)
        exclusive = _FileLock(self, fcntl.LOCK_EX)
        self.reader = self.shared = shared
        self.writer = self.exclusive = exclusive
        # Bookkeeping maintained by the _FileLock helpers via _acquire/_release.
        self.held = False
        self.mode = None
        self._handle = None

    @property
    def handle(self):
        """The open lock-file object, (re)opened on first use or after close().

        The file is opened in ``'w+'`` mode, which creates it if missing and
        truncates it. Nothing in this class reads or writes its contents.
        """
        current = self._handle
        if current is not None and not current.closed:
            return current
        self._handle = open(self.filename, 'w+')
        return self._handle

    def close(self):
        """Close the lock-file handle if it is open.

        :raises RuntimeError: if the lock is currently held.
        """
        if self.held:
            raise RuntimeError("Can't close held lock, release it first")
        current = self._handle
        if not current or current.closed:
            return
        current.close()
        self._handle = None

    def delete(self):
        """Close the handle, then remove the lock file from disk.

        NOTE(review): processes that still have the old file open would lock a
        different inode than anyone who later recreates it, so only call this
        once no other process uses the lock.
        """
        self.close()
        os.remove(self.filename)

    def _acquire(self, mode):
        # Record that the lock is now held in *mode*.
        self.held, self.mode = True, mode

    def _release(self):
        # Record that the lock is no longer held.
        self.held, self.mode = False, None

    def __repr__(self):
        mode = MODE_NAMES[self.mode] if self.held else 'not held'
        return f'<{type(self).__name__}: {self.filename} ({mode})>'
class _FileLock: | |
""" | |
Helper class that locks a file with a mode (exclusive or shared). Should be | |
used through a FileLock instance. | |
""" | |
def __init__(self, file_lock, mode): | |
self.file_lock = file_lock | |
self.mode = mode | |
def acquire(self, blocking=True): | |
"""Acquire this lock.""" | |
if self.file_lock.held: | |
raise RuntimeError( | |
f"Lock is already held as {MODE_NAMES[self.file_lock.mode]}") | |
flags = self.mode | (0 if blocking else fcntl.LOCK_NB) | |
fcntl.flock(self.file_lock.handle, flags) | |
self.file_lock._acquire(self.mode) | |
def release(self): | |
"""Release this lock.""" | |
if not self.file_lock.held: | |
raise RuntimeError("Lock is not held") | |
elif self.file_lock.mode != self.mode: | |
raise RuntimeError("Lock is held as {held} not {desired}".format( | |
held=MODE_NAMES[self.file_lock.mode], | |
desired=MODE_NAMES[self.mode])) | |
fcntl.flock(self.file_lock.handle, fcntl.LOCK_UN) | |
self.file_lock._release() | |
@contextmanager | |
def __call__(self, blocking=True): | |
self.acquire(blocking=blocking) | |
try: | |
yield | |
finally: | |
self.release() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import logging | |
import logging.config | |
import random | |
import time | |
from concurrent.futures import ProcessPoolExecutor | |
from functools import wraps | |
from multiprocessing import cpu_count | |
from os import makedirs | |
from pathlib import Path | |
from shutil import rmtree | |
from filelock import FileLock | |
# Logger used by the workers and main(); main() configures its handler/level.
logger = logging.getLogger('locks')

# Output files go under ./output; their lock files live in a hidden subdir.
OUTPUT = Path('output')
LOCKS = OUTPUT.joinpath('.locks')
def log_exceptions(func):
    """Decorate *func* so any ``Exception`` it raises is logged, then re-raised.

    The traceback is written through the module ``logger`` so failures inside
    worker processes show up in the log. The exception still propagates
    unchanged to the caller.
    """
    @wraps(func)
    def logged_call(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except Exception:
            # logger.exception() logs at ERROR level with the active traceback.
            logger.exception("Whoops")
            raise
        return outcome

    return logged_call
@log_exceptions
def do_the_thing(band, timesteps):
    """Simulate processing *band*, appending its output to per-timestep files.

    For each timestep this sleeps for a random "preprocessing" delay. It then
    takes an exclusive lock on ``LOCKS / 'output_<timestep>.grb.lock'``. While
    holding the lock, it appends two lines for *band* to
    ``OUTPUT / 'output_<timestep>.grb'``, sleeping between them. The lock keeps
    each band's pair of lines contiguous when several processes write to the
    same file.

    :param band: label written to the output files and the log.
    :param timesteps: iterable of timestep strings; one output file each.
    """
    logger.debug(f"Processing band {band}")
    for timestep in timesteps:
        # Preprocess the things
        time.sleep(random.random() * 3)

        # Get the locks
        filename = f'output_{timestep}.grb'
        lock = FileLock(LOCKS / f'{filename}.lock')
        try:
            logger.debug(f"{band}, {timestep}: Acquiring lock {lock!r}...")
            with lock.exclusive():
                logger.debug(f"{band}, {timestep}: Got lock {lock!r}!")
                # Open the output file for writing
                with open(OUTPUT / filename, 'a') as output:
                    # Write all the things
                    output.write(f'{band}: Writing...\n')
                    output.flush()
                    time.sleep(random.random() * 2)
                    output.write(f'{band}: Done!\n')
                    output.flush()
                logger.debug(f"{band}, {timestep}: Releasing lock {lock!r}...")
        finally:
            # FileLock opens its lock file lazily and keeps it open after
            # release. FileLock and its _FileLock helpers also reference each
            # other, so without an explicit close every iteration would leak a
            # file descriptor until the cyclic GC happens to run.
            lock.close()
def _configure_logging():
    """Route DEBUG-and-above records of the ``locks`` logger to a stream handler.

    Each record is prefixed with its timestamp and process name, so the
    interleaving of the worker processes is visible.
    """
    logging.config.dictConfig({
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'DEBUG',
                'formatter': 'multiprocess',
            },
        },
        'formatters': {
            'multiprocess': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(processName)-7s %(message)s',
            },
        },
        'loggers': {
            'locks': {
                'level': 'DEBUG',
                'handlers': ['console'],
            },
        },
    })


def main():
    """Run the demo: worker processes append to shared, lock-protected files.

    Creates the ``OUTPUT`` and ``LOCKS`` directories, then runs
    :func:`do_the_thing` once per band in a process pool with ``cpu_count()``
    workers. Every band processes the same three daily timesteps, starting
    today. The lock directory is removed afterwards.

    :raises Exception: the first exception raised by a worker, re-raised here
        once every job has finished, so a failed run does not look successful.
        The lock directory is removed in either case.
    """
    _configure_logging()

    bands = [band.rjust(5) for band in [
        'foo', 'bar', 'baz', 'quux',
        'earth', 'fire', 'wind', 'water', 'heart',
    ]]
    random.shuffle(bands)

    today = datetime.date.today()
    timesteps = [
        (today + datetime.timedelta(days=n)).isoformat()
        for n in range(3)]

    makedirs(OUTPUT, exist_ok=True)
    makedirs(LOCKS, exist_ok=True)

    try:
        with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
            futures = [
                pool.submit(do_the_thing, band, timesteps)
                for band in bands]
            logger.debug("Submitted all the jobs")
        # Leaving the ``with`` block calls ``pool.shutdown(wait=True)``, so
        # every job has finished here. result() re-raises a worker's exception
        # (which log_exceptions re-raises after logging it); without this,
        # worker failures never reach the parent process.
        for future in futures:
            future.result()
        logger.debug("Done!")
    finally:
        rmtree(LOCKS)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.