Created
January 29, 2021 13:06
-
-
Save achinta/e5413753d385ad8c84fc0489a01560b5 to your computer and use it in GitHub Desktop.
Lock File Decorator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
import time | |
import json | |
from typing import Dict | |
def lock_file(func):
    '''
    Decorator which guards a ``func(path, data)`` call with a lock file.

    The lock is an empty file placed next to the target, named after the
    resolved target path with ``.LCK`` appended. While another caller holds
    the lock the wrapper polls once per second. After 10 seconds of waiting
    it assumes the lock is stale and takes it over, so this is a best-effort
    advisory lock rather than a strict mutual-exclusion guarantee.

    The wrapper returns whatever ``func`` returns and always removes the lock
    afterwards, even when ``func`` raises.

    Raises:
        AssertionError: if ``path`` does not exist when the wrapper is called.
    '''
    # Imported locally so the decorator does not depend on a file-level import.
    from functools import wraps

    max_wait_secs = 10
    poll_secs = 1

    @wraps(func)
    def wrapper(path: Path, data):
        # Raised explicitly instead of using ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if not path.exists():
            raise AssertionError(f'{path} does not exist')
        lock_path = Path(path.resolve().as_posix() + '.LCK')

        # how long we have been waiting
        waited_secs = 0
        while True:
            try:
                # Exclusive create is atomic, which closes the window between
                # "check that no lock exists" and "create the lock".
                lock_path.touch(exist_ok=False)
                break
            except FileExistsError:
                if waited_secs >= max_wait_secs:
                    # Waited long enough: treat the lock as stale and take it over.
                    lock_path.touch()
                    break
                print(f'lock exists for {path}. Sleeping....')
                time.sleep(poll_secs)
                waited_secs += poll_secs
        print(f'created lock file for {path}')

        try:
            # call the decorated function and hand its result back to the caller
            return func(path, data)
        finally:
            # remove the lock even if the decorated function raised
            try:
                lock_path.unlink()
            except FileNotFoundError:
                # Someone else already removed it (e.g. after a stale-lock
                # take-over); there is nothing left to clean up.
                pass
            else:
                print(f'Done. Removing lock for {path}. Sleeping')

    return wrapper
@lock_file
def update_json(path: Path, data: Dict):
    '''
    Merge ``data`` into the JSON object stored at ``path``.

    The file is read and parsed first, the parsed object is updated in
    memory with the entries of ``data``, and the result is written back
    over the same file.
    '''
    # Read and parse the current contents before the file is touched for writing.
    with path.open('r') as reader:
        current = json.load(reader)

    current.update(data)

    # Opening in 'w' mode truncates the file, then the merged object is dumped.
    with path.open('w') as writer:
        json.dump(current, writer)
@lock_file
def update_json_slow(path: Path, data: Dict):
    '''
    Merge ``data`` into the JSON object stored at ``path``, after a delay.

    Sleeps for 8 seconds before doing any file access, then reads and parses
    the file, updates the parsed object with the entries of ``data`` and
    writes the result back over the same file.
    '''
    # Deliberate pause before any file access.
    time.sleep(8)

    with path.open('r') as reader:
        current = json.load(reader)

    current.update(data)

    # Opening in 'w' mode truncates the file, then the merged object is dumped.
    with path.open('w') as writer:
        json.dump(current, writer)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment