Last active
August 12, 2022 09:21
-
-
Save altescy/3eda3f29480049a13e4c698190a47e90 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fcntl | |
import hashlib | |
import inspect | |
import io | |
import logging | |
import pickle | |
import string | |
import time | |
from contextlib import contextmanager | |
from functools import wraps | |
from pathlib import Path | |
from typing import (Any, Callable, Dict, Iterator, List, NamedTuple, Optional, | |
Tuple, TypeVar, Union) | |
T = TypeVar("T") | |
logger = logging.getLogger(__name__) | |
def b62encode(data: bytes) -> str: | |
num = int.from_bytes(data, "big") | |
characters = string.digits + string.ascii_letters | |
encoded = "" | |
base: int = len(characters) | |
if num < 0: | |
return "" | |
while num >= base: | |
mod = num % base | |
num //= base | |
encoded = characters[mod] + encoded | |
if num > 0: | |
encoded = characters[num] + encoded | |
return encoded | |
def dethash(obj: Any) -> str: | |
m = hashlib.blake2b() | |
with io.BytesIO() as buf: | |
pickle.dump(obj, buf) | |
m.update(buf.getbuffer()) | |
return b62encode(m.digest()) | |
class FunctionInfo(NamedTuple): | |
name: str | |
filename: Path | |
source: str | |
@classmethod | |
def build(cls, func: Callable[..., Any]) -> "FunctionInfo": | |
name = ".".join((func.__module__, func.__name__)) | |
filename = Path(inspect.getabsfile(func)) | |
lines, _ = inspect.getsourcelines(func) | |
source = "".join(lines) | |
return cls(name, filename, source) | |
def hash(self) -> str: | |
return dethash(self) | |
class ExecutionInfo(NamedTuple): | |
function: FunctionInfo | |
params: Dict[str, Any] | |
def __str__(self) -> str: | |
return f"{self.function.name}: params={self.params}" | |
@classmethod | |
def build( | |
cls, | |
func: Callable[..., Any], | |
*args: Any, | |
**kwargs: Any, | |
) -> "ExecutionInfo": | |
funcinfo = FunctionInfo.build(func) | |
funcname = funcinfo.name | |
signature = inspect.signature(func) | |
arguments: List[Tuple[Optional[str], Any]] = [(None, v) for v in args] | |
arguments += list(kwargs.items()) | |
position = 0 | |
params: Dict[str, Any] = {} | |
try: | |
for key, param in signature.parameters.items(): | |
assert position < len(arguments) | |
if param.kind == inspect.Parameter.POSITIONAL_ONLY: | |
assert ( | |
arguments[position][0] is None | |
and param.default == inspect.Parameter.empty | |
) | |
params[key] = arguments[position][1] | |
position += 1 | |
if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: | |
if key == arguments[position][0] or arguments[position][0] is None: | |
params[key] = arguments[position][1] | |
position += 1 | |
elif param.default != inspect.Parameter.empty: | |
params[key] = param.default | |
elif param.kind == inspect.Parameter.VAR_POSITIONAL: | |
params[key] = [] | |
while ( | |
position < len(arguments) | |
and arguments[position][0] not in signature.parameters | |
): | |
params[key].append(arguments[position][1]) | |
position += 1 | |
elif param.kind == inspect.Parameter.KEYWORD_ONLY: | |
assert ( | |
key == arguments[position][0] | |
), f"{key=}, {arguments[position][0]=}" | |
params[key] = arguments[position][1] | |
position += 1 | |
elif param.kind == inspect.Parameter.VAR_KEYWORD: | |
if key not in params: | |
params[key] = {} | |
while ( | |
position < len(arguments) | |
and arguments[position][0] not in signature.parameters | |
): | |
params[key][arguments[position][0]] = arguments[position][1] | |
position += 1 | |
params[key] = dict(sorted(params[key].items())) | |
else: | |
raise AssertionError("This statement is never executed.") | |
for extrakey, value in arguments[position:]: | |
assert extrakey is not None, extrakey | |
assert extrakey not in params, extrakey | |
params[extrakey] = value | |
assert list(params.keys()) == list( | |
signature.parameters.keys() | |
), f"{params=}, {signature.parameters=}" | |
except AssertionError as err: | |
raise ValueError( | |
f"Invalid arguments of {funcname}:\n\t" | |
f"Signature : {signature}\n\t" | |
f"Given args: {args=}, {kwargs=}" | |
) from err | |
return cls(funcinfo, params) | |
def hash(self) -> str: | |
return dethash(self) | |
class Cache: | |
def __init__(self, cachedir: Union[str, Path] = ".cache") -> None: | |
self._cachedir = Path(cachedir) | |
def __call__(self, func: Callable[..., T]) -> Callable[..., T]: | |
@wraps(func) | |
def wrapper(*args: Any, **kwargs: Any) -> T: | |
execinfo = ExecutionInfo.build(func, *args, **kwargs) | |
funcname = execinfo.function.name | |
cachedir = self._cachedir / execinfo.function.hash() | |
cachedir.mkdir(parents=True, exist_ok=True) | |
hashkey = execinfo.hash() | |
filename = cachedir / hashkey | |
lockfile = cachedir / (hashkey + ".lock") | |
with self.lock(lockfile): | |
if filename.exists(): | |
logger.info("Find cache of %s at %s.", funcname, filename) | |
with open(filename, "rb") as pklfile: | |
result: T = pickle.load(pklfile) | |
else: | |
result = func(*args, **kwargs) | |
filename.parent.mkdir(exist_ok=True) | |
logger.info("Create cache of %s at %s.", funcname, filename) | |
with open(filename, "wb") as pklfile: | |
pickle.dump(result, pklfile) | |
return result | |
return wrapper | |
@staticmethod | |
@contextmanager | |
def lock(filename: Union[str, Path]) -> Iterator[None]: | |
lockfile = open(filename, "w") | |
try: | |
fcntl.flock(lockfile, fcntl.LOCK_EX) | |
yield | |
finally: | |
fcntl.flock(lockfile, fcntl.LOCK_UN) | |
lockfile.close() | |
if __name__ == "__main__": | |
logging.basicConfig(level=logging.INFO) | |
cache = Cache("work/cache") | |
@cache | |
def square(value: int) -> int: | |
logger.info(f"Compute square of {value}!") | |
time.sleep(1) | |
return value * value | |
print(square(1)) | |
print(square(2)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment