Skip to content

Instantly share code, notes, and snippets.

@altescy
Last active August 12, 2022 09:21
Show Gist options
  • Save altescy/3eda3f29480049a13e4c698190a47e90 to your computer and use it in GitHub Desktop.
Save altescy/3eda3f29480049a13e4c698190a47e90 to your computer and use it in GitHub Desktop.
import fcntl
import hashlib
import inspect
import io
import logging
import pickle
import string
import time
from contextlib import contextmanager
from functools import wraps
from pathlib import Path
from typing import (Any, Callable, Dict, Iterator, List, NamedTuple, Optional,
Tuple, TypeVar, Union)
# Generic return type of the functions wrapped by the cache decorator.
T = TypeVar("T")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def b62encode(data: bytes) -> str:
    """Encode ``data`` as a base-62 string.

    The bytes are read as a single big-endian unsigned integer and written
    with the alphabet ``0-9a-zA-Z``, most significant digit first. Leading
    zero bytes contribute no digits, so empty or all-zero input encodes to
    the empty string.
    """
    alphabet = string.digits + string.ascii_letters
    radix = len(alphabet)
    remaining = int.from_bytes(data, "big")

    # Collect digits least-significant first, then reverse once at the end.
    digits: List[str] = []
    while remaining > 0:
        remaining, index = divmod(remaining, radix)
        digits.append(alphabet[index])
    return "".join(reversed(digits))
def dethash(obj: Any) -> str:
    """Return the base-62 encoded BLAKE2b digest of the pickled ``obj``."""
    with io.BytesIO() as stream:
        pickle.dump(obj, stream)
        payload = stream.getvalue()
    digest = hashlib.blake2b(payload).digest()
    return b62encode(digest)
class FunctionInfo(NamedTuple):
    """Identity of a function: qualified name, defining file and source text."""

    # "<module>.<function name>" of the described function.
    name: str
    # Absolute path of the file the function is defined in.
    filename: Path
    # Source text of the function as returned by ``inspect``.
    source: str

    @classmethod
    def build(cls, func: Callable[..., Any]) -> "FunctionInfo":
        """Collect the name, file and source code of ``func``."""
        source_lines, _first_lineno = inspect.getsourcelines(func)
        return cls(
            name=f"{func.__module__}.{func.__name__}",
            filename=Path(inspect.getabsfile(func)),
            source="".join(source_lines),
        )

    def hash(self) -> str:
        """Return the ``dethash`` digest of this record."""
        return dethash(self)
class ExecutionInfo(NamedTuple):
    """A function together with the arguments of one particular call."""

    # Description of the called function.
    function: FunctionInfo
    # Parameter name -> value, in signature order, with defaults filled in.
    params: Dict[str, Any]

    def __str__(self) -> str:
        return f"{self.function.name}: params={self.params}"

    @classmethod
    def build(
        cls,
        func: Callable[..., Any],
        *args: Any,
        **kwargs: Any,
    ) -> "ExecutionInfo":
        """Describe the call ``func(*args, **kwargs)``.

        The arguments are matched against the signature of ``func`` with
        ``inspect.Signature.bind`` so that the same logical call always
        yields the same ``params`` mapping, however the arguments were
        spelled (positionally, by keyword, or left to their defaults).

        Args:
            func: The function being called.
            *args: Positional arguments of the call.
            **kwargs: Keyword arguments of the call.

        Returns:
            An ``ExecutionInfo`` whose ``params`` maps every parameter of
            ``func`` to its value. A ``*args`` parameter is stored as a
            list and a ``**kwargs`` parameter as a dict sorted by key.

        Raises:
            ValueError: If the arguments do not fit the signature of
                ``func``.
        """
        funcinfo = FunctionInfo.build(func)
        signature = inspect.signature(func)
        try:
            bound = signature.bind(*args, **kwargs)
        except TypeError as err:
            raise ValueError(
                f"Invalid arguments of {funcinfo.name}:\n\t"
                f"Signature : {signature}\n\t"
                f"Given args: {args=}, {kwargs=}"
            ) from err
        bound.apply_defaults()

        params: Dict[str, Any] = {}
        for key, value in bound.arguments.items():
            kind = signature.parameters[key].kind
            if kind == inspect.Parameter.VAR_POSITIONAL:
                value = list(value)
            elif kind == inspect.Parameter.VAR_KEYWORD:
                # Sort so the order keywords were passed in does not matter.
                value = dict(sorted(value.items()))
            params[key] = value
        return cls(funcinfo, params)

    def hash(self) -> str:
        """Return the ``dethash`` digest of this record."""
        return dethash(self)
class Cache:
    """Decorator that stores a function's results as pickle files on disk.

    Results are kept under ``cachedir/<function hash>/<call hash>``, so a
    change to the function's source or to the call arguments selects a
    different cache entry.
    """

    def __init__(self, cachedir: Union[str, Path] = ".cache") -> None:
        """Create a cache rooted at ``cachedir``."""
        self._cachedir = Path(cachedir)

    def __call__(self, func: Callable[..., T]) -> Callable[..., T]:
        """Wrap ``func`` so that repeated calls are served from disk."""

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            execinfo = ExecutionInfo.build(func, *args, **kwargs)
            funcname = execinfo.function.name
            cachedir = self._cachedir / execinfo.function.hash()
            cachedir.mkdir(parents=True, exist_ok=True)
            hashkey = execinfo.hash()
            filename = cachedir / hashkey
            lockfile = cachedir / (hashkey + ".lock")
            # The lock is held while computing, so concurrent callers with
            # the same arguments wait for the first result.
            with self.lock(lockfile):
                if filename.exists():
                    logger.info("Find cache of %s at %s.", funcname, filename)
                    # NOTE: pickle.load executes code embedded in the file;
                    # the cache directory must only be writable by trusted
                    # users.
                    with open(filename, "rb") as pklfile:
                        result: T = pickle.load(pklfile)
                else:
                    result = func(*args, **kwargs)
                    cachedir.mkdir(parents=True, exist_ok=True)
                    logger.info("Create cache of %s at %s.", funcname, filename)
                    # Write to a temporary file and rename it into place so
                    # that a failed dump never leaves a truncated cache
                    # entry that would break every later call.
                    tmpfile = cachedir / (hashkey + ".tmp")
                    try:
                        with open(tmpfile, "wb") as pklfile:
                            pickle.dump(result, pklfile)
                        tmpfile.replace(filename)
                    finally:
                        # No-op after a successful replace.
                        tmpfile.unlink(missing_ok=True)
            return result

        return wrapper

    @staticmethod
    @contextmanager
    def lock(filename: Union[str, Path]) -> Iterator[None]:
        """Hold an exclusive ``flock`` on ``filename`` for the ``with`` body.

        The file is created if it does not exist and is not removed
        afterwards.
        """
        with open(filename, "w") as lockfile:
            fcntl.flock(lockfile, fcntl.LOCK_EX)
            try:
                yield
            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
# Demo: run this file directly to see the cache in action.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Cache files are stored below ./work/cache.
    cache = Cache("work/cache")

    # The decorated function is left untouched on purpose: its source text
    # is part of the cache key.
    @cache
    def square(value: int) -> int:
        logger.info(f"Compute square of {value}!")
        time.sleep(1)
        return value * value

    # Each call sleeps for a second unless its result is already cached.
    print(square(1))
    print(square(2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment