Last active
January 11, 2025 10:29
-
-
Save ArtemGr/bf91613a021a536c7ce16cdba9168604 to your computer and use it in GitHub Desktop.
llog
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/__pycache__ | |
/llog |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Install with | |
# | |
# python -mpip install --user --upgrade git+https://gist.github.com/bf91613a021a536c7ce16cdba9168604.git | |
import datetime | |
import math | |
import os | |
import sys | |
from inspect import currentframe, getframeinfo | |
def stdout2utf8():
    '''
    Switch the encoding of `sys.stdout` to UTF-8.

    cf. https://stackoverflow.com/questions/3597480/how-to-make-python-3-print-utf8
    An alternative mentioned there is reopening file descriptor 1 with the desired encoding.
    '''
    out = sys.stdout
    out.reconfigure(encoding='utf-8')
# `rindex` that reports a miss with -1 instead of raising
def rindexʹ(s, ss):
    '''Return the highest index of `ss` in `s`, or -1 when `ss` does not occur.'''
    return s.rfind(ss)
def log(*args, sep=None, end=None, file=None, flush=False, back=1):
    '''
    `print` the `args` prefixed with the current time and the caller's location.

    The prefix is `HH:MM:SS module:line]`, where `module` is the caller's file name
    without the directories and without the ".py" extension.
    Frames named `_call_with_frames_removed` get the fixed location tag `pyx]` instead.

    `sep`, `end`, `file` and `flush` are passed to `print` as is.
    `back` is the number of frames to walk up from this function in order to find the caller.
    '''
    caller = currentframe()
    for _ in range(back):
        caller = caller.f_back
    info = getframeinfo(caller)

    # Keep what follows the last slash or backslash, `rfind` gives -1 when there is none
    path = info.filename
    module = path[max(path.rfind("/"), path.rfind("\\")) + 1:]
    if module.endswith(".py"):
        module = module[:-3]

    if info.function == '_call_with_frames_removed':
        location = 'pyx]'
    else:
        location = f"{module}:{caller.f_lineno}]"
    stamp = datetime.datetime.now().strftime("%H:%M:%S")
    print(stamp, location, *args, sep=sep, end=end, file=file, flush=flush)
# Call site ("module:line") -> the `line` most recently printed from there
LOGC_PAST = {}


def logc(line):
    '''
    Print the `line` only when it differs from the one previously logged at the same call site.

    Returns `True` when the `line` was printed and `False` when it was skipped.
    '''
    caller = currentframe().f_back
    path = getframeinfo(caller).filename
    # Keep what follows the last slash or backslash, `rfind` gives -1 when there is none
    module = path[max(path.rfind("/"), path.rfind("\\")) + 1:]
    if module.endswith(".py"):
        module = module[:-3]
    loc = f"{module}:{caller.f_lineno}"

    unchanged = LOGC_PAST.get(loc) == line
    if not unchanged:
        tim = datetime.datetime.now().strftime("%H:%M:%S")
        print(f"{tim} {loc}] {line}")
        LOGC_PAST[loc] = line
    return not unchanged
def floorʹ(v, extra=0):
    '''
    Drop more decimal places depending on whether the integer part is large.

    Values of magnitude 10 and above keep no decimal places, [1, 10) keeps one, [.1, 1) keeps two
    and so on, up to six decimal places for magnitudes below .0001.
    `extra` is added to the number of decimal places kept.

    `None`, NaN and the infinities are returned as is.
    The value is always rounded down (towards negative infinity), negative values included.
    When no decimal places are kept the result is an `int`.
    '''
    if v is None:
        return None
    # NaN has no floor, and `int(inf)` would raise `OverflowError`
    if not math.isfinite(v):
        return v
    av = abs(v)
    # See `format_float_positional` about printing the values that fall through to 6 places
    decimal_places = 6
    for places, threshold in enumerate((10, 1, .1, .01, .001, .0001)):
        if threshold <= av:
            decimal_places = places
            break
    decimal_places += extra
    if not decimal_places:
        # `math.floor` rather than `int` keeps the negative values consistent with the branch below
        return math.floor(v)
    r = 10**decimal_places
    return math.floor(v * r) / r
def floorᵃ(a, extra=0):
    '''Apply `floorʹ` (with the given `extra`) to every item of `a`, returning a new `list`.'''
    return [floorʹ(v, extra) for v in a]
def bits2bedstead(ch):
    '''
    Row-major bits, 2x3, to [Bedstead](https://i.imgur.com/f3myFgM.png)

    Bit patterns 0..31 map to U+EE00..U+EE1F and 32..63 to U+EE40..U+EE5F;
    anything above six bits maps to U+EE00.

    cf. https://youtu.be/5yoWxctJsYo graphics with teletext; Windows font rendering glitch
    cf. https://github.com/saitoha/PySixel full-pixel color graphics on SIXEL-supported terminals
    cf. https://stackoverflow.com/q/37288421/257568 termplotlib plotext uniplot terminalplot
    '''
    if ch > 0b111111:
        code = 0xEE00
    elif ch >= 32:
        code = 0xEE40 + ch - 32
    else:
        code = 0xEE00 + ch
    return chr(code)
def bedstead2bits(ch):
    '''
    [Bedstead](https://i.imgur.com/f3myFgM.png) to row-major bits, 2x3

    Anything that is not a single character in U+EE00..U+EE5F gives 0.
    '''
    code = ord(ch) if len(ch) == 1 else 0
    if 0xEE40 <= code <= 0xEE5F:
        return code - 0xEE40 + 32
    if 0xEE00 <= code < 0xEE40:
        return code - 0xEE00
    return 0  # Outside of G1
def replace_str(string, position, characters):
    '''Return a copy of `string` where the `characters` overwrite what was at `position`.'''
    # cf. https://stackoverflow.com/a/67361875/257568 fast replacement
    head = string[:position]
    tail = string[position + len(characters):]
    return "{}{}{}".format(head, characters, tail)
class BedsteadMap:
    '''
    Maps data points onto the 2x3 sub-character pixels of a character grid.

    `a` is the grid, a list of rows, each row a list of single-character cells.
    `xs` and `ys` give the data range (their minimum and maximum).
    `wofs` and `hofs` are offsets, in character cells, added to the cell coordinates.
    With `scale=True` the horizontal axis is stretched to fill the width of the grid,
    any other `scale` leaves the horizontal factor at 1.0.
    '''

    def __init__(self, a, xs, ys, wofs=0, hofs=0, scale=True):
        self.wofs = int(wofs)
        self.hofs = int(hofs)
        self.xsmin = float(min(xs))
        self.xsmax = float(max(xs))
        self.ysmin = float(min(ys))
        self.ysmax = float(max(ys))
        self.wh = (len(a[0]), len(a))  # Grid size in character cells
        self.height = int(self.wh[1] * 3)  # Grid height in pixels, three per cell
        ε = 0.00006103515625  # f16: smallest positive normal number
        # Clamping the span keeps a constant `ys` from dividing by zero
        yspan = max(ε, self.ysmax - self.ysmin)
        xspan = self.xsmax - self.xsmin
        self.coeff = float(self.height / yspan)
        self.scale = 1.0
        # With a zero `xspan` every point lands in the first pixel column whatever the scale is,
        # hence skipping the division there; the same clamped `yspan` is used as in `coeff`
        # in order for the two to cancel out when the `ys` are constant
        if isinstance(scale, bool) and scale and 0 < xspan:
            self.scale = yspan / xspan * (self.wh[0] * 2 / self.height)
        self.width = min(self.wh[0] * 2, int(xspan * self.coeff * self.scale + 1))

    def zip(self, xs, ys):
        '''
        For every `(x, y)` pair yield `(bit, bx, by, px, py, ax, ay)`:
        `px`, `py` is the pixel, `ax`, `ay` the (offset and clamped) character cell,
        `bx`, `by` the pixel within the cell and `bit` the corresponding row-major bit.
        '''
        for (x, y) in zip(xs, ys):
            px = min(self.width - 1, int((float(x) - self.xsmin) * self.coeff * self.scale))
            py = min(self.height - 1, int((self.ysmax - float(y)) * self.coeff))
            ax = max(0, min(self.wh[0] - 1, px // 2 + self.wofs))
            ay = max(0, min(self.wh[1] - 1, py // 3 + self.hofs))
            bx = px % 2
            by = py % 3
            bit = 1 << (bx + by * 2)
            yield bit, bx, by, px, py, ax, ay
def plot(a, xs, ys, wofs=0, hofs=0, scale=True):
    '''
    Draw the points `(xs, ys)` into the character grid `a` with Bedstead 2x3 glyphs,
    merging with the Bedstead glyphs already there. Returns the `BedsteadMap` used.

    Example:
        import shutil
        wh = shutil.get_terminal_size ((111, 11))
        a = [[' ' for x in range (wh.columns)] for y in range (3)]
        map = plot (a, [1, 2, 3, 4, 5, 6, 7, 8, 9], [1, 2, 3, 4, 5, 4, 3, 2, 1])
        for y in a: print (''.join (y) .rstrip())
    '''
    mapping = BedsteadMap(a, xs, ys, wofs, hofs, scale)
    for bit, *_, ax, ay in mapping.zip(xs, ys):
        a[ay][ax] = bits2bedstead(bedstead2bits(a[ay][ax]) | bit)
    return mapping
def has_color():
    '''
    Whether colors should be printed: [CLICOLOR_FORCE](https://bixense.com/clicolors/)
    is set to something other than "0", or `sys.stdout` is a terminal.
    '''
    forced = os.getenv('CLICOLOR_FORCE', '0') != '0'
    return forced or sys.stdout.isatty()
def c(color: int):
    '''
    Escape sequence selecting the 8-bit foreground `color`, or an empty string
    unless :func:`has_color`

    https://en.wikipedia.org/wiki/ANSI_escape_code#8-bit
    https://www.ditig.com/256-colors-cheat-sheet
    '''
    if not has_color():
        return ''
    return f"\033[38;5;{color}m"
def esc0():
    '''The reset escape sequence (ESC followed by `[0m`), or an empty string unless :func:`has_color`'''
    if not has_color():
        return ''
    return '\033[0m'
def lm(fpath: str) -> float:
    '''
    Last modification time of `fpath`, or `0.` when it can not be obtained
    (the file is missing or inaccessible, the path is malformed).
    '''
    # Asking for the time directly avoids the race of the file vanishing
    # between an existence check and `getmtime`
    try:
        return os.path.getmtime(fpath)
    except (OSError, ValueError):
        return 0.
if __name__ == '__main__':
    import shutil

    # clean the cache for library users
    shutil.rmtree('__pycache__', ignore_errors=True)

    # Show every 2x3 bit pattern next to its glyph and the bits decoded back from it
    for bits in range(2**6):
        log(bits, bits2bedstead(bits), bedstead2bits(bits2bedstead(bits)))

    # A small sample plot, three character rows tall and as wide as the terminal
    wh = shutil.get_terminal_size((111, 11))
    a = [[' '] * wh.columns for _ in range(3)]
    plot(a, [.1, .15, .2, .3, .4, .5, .6, .7], [.1, .14, .15, .2, .25, .3, .25, .2], scale=False)
    rows = (''.join(row).rstrip() for row in a)
    print('\n'.join(rows))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
def dump(path, size=False):
    '''
    Print the contents of the state database at `path`, as YAML when the `yaml` module
    is installed and as JSON otherwise. `size` is passed to `State.dump`.
    '''
    from llog import state

    contents = state.State(path).dump(size=size)
    try:
        import yaml
        text = yaml.dump(contents)
    except ModuleNotFoundError:
        import json
        text = json.dumps(contents, ensure_ascii=False, indent=2)
    print(text)
if __name__ == '__main__':
    # `argv[0]` is the program, not an option
    options = sys.argv[1:]
    # Looked up beforehand in order for `--size` to work on either side of `--dump=`
    size = '--size' in options
    for arg in options:
        if arg.startswith('--dump='):
            dump(arg[len('--dump='):], size)
            # `sys.exit` is always there, whereas the bare `exit` is only installed by `site`
            sys.exit(0)
    print('Command line examples:')
    print()
    print(' python -mllog [--size] --dump=state.mdb')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class fragile(object):
    '''
    Fragile `with`, https://stackoverflow.com/a/23665658/257568

    Wraps another context manager, allowing to break out of the `with` block
    by raising `fragile.Rollback` or `fragile.Commit` (or subclasses thereof).

    ```
    with fragile(open(path)) as f:
        print('before condition')
        if condition:
            raise fragile.Rollback
        print('after condition')
    ```
    '''

    class Rollback(Exception):
        '''break out of the `with` statement with a failure'''

    class Commit(Exception):
        '''break out of the `with` statement with a success'''

    def __init__(self, value):
        # The wrapped context manager
        self.value = value

    def __enter__(self):
        return self.value.__enter__()

    def __exit__(self, etype, value, traceback):
        # `issubclass` in order to recognize the exceptions derived from `Commit` and `Rollback`
        if etype is not None and issubclass(etype, self.Commit):
            # The wrapped manager sees a clean exit
            self.value.__exit__(None, None, None)
            return True  # consume the exception
        # The wrapped manager sees the exception, `Rollback` included
        error = self.value.__exit__(etype, value, traceback)
        if etype is not None and issubclass(etype, self.Rollback):
            return True  # consume the exception
        return error
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import re | |
from llog import log | |
# Rust code of the functions can be found at | |
# https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=8f7bd61f2c992fb9a3225c11d0b4aaec | |
# cf. https://github.com/ArtemGr/gstuff.rs/blob/5bb23a4ead69b1b7370e657f9c19061e75b6dd4a/lines.rs#L211 | |
# The bytes in need of escaping
_esc = re.compile(rb'[\x01\x00\n\r",]')

# Special byte -> its two-byte escape (byte 1 followed by a code in 1..6)
_esc_codes = {
    b'\x01': b'\x01\x01',
    b'\x00': b'\x01\x02',
    b'\n': b'\x01\x03',
    b'\r': b'\x01\x04',
    b'"': b'\x01\x05',
    b',': b'\x01\x06',
}


def _bcb(ma):
    '''`re.sub` callback: the escape of the matched byte, or the match itself if it has none'''
    ch = ma[0]
    return _esc_codes.get(ch, ch)


def csesc(fr: bytes):
    '''escape 1 (the escape byte itself), 0, 10, 13, 34 (double quote) and 44 (comma)'''
    return _esc.sub(_bcb, fr)
# Escape code minus one -> the byte it stands for
_unesc = [1, 0, 10, 13, 34, 44]


def csunesc(fr: bytes):
    '''
    unescape for csesc

    Returns a `bytearray`. Raises `IndexError` when byte 1 is followed by a code outside of 1..6.
    A lone byte 1 at the very end of the input is dropped.
    '''
    result = bytearray()
    encoded = False
    for code in fr:
        if encoded:
            encoded = False
            # Explicit check, for `_unesc[-1]` would otherwise turn the code 0 into a comma
            if not 1 <= code <= len(_unesc):
                raise IndexError(f"csunesc: invalid escape code {code}")
            result.append(_unesc[code - 1])
        elif code == 1:
            encoded = True
        else:
            result.append(code)
    return result
if __name__ == '__main__':
    import timeit

    import numpy as np

    hello = "Привет, Юля!".encode('utf-8')
    hesc = "Привет\u0001\u0006 Юля!".encode('utf-8')

    def _bench_csesc():
        assert csesc(hello) == hesc

    # Average time of a single `csesc` invocation
    runs = 99
    t = timeit.timeit(_bench_csesc, number=runs) / runs
    log('csesc', np.format_float_positional(t, trim='-'), 'o/s')

    # Round trips: a plain text with a comma, then every special byte but the escape itself
    specials = "0:\x00,10:\x0a,13:\x0d,34:\"".encode('utf-8')
    specials_esc = (
        "0:\u0001\u0002\u0001\u000610:\u0001\u0003\u0001\u000613:\u0001\u0004\u0001\u000634:\u0001\u0005".encode('utf-8'))

    assert csunesc(hesc) == hello
    assert csunesc(specials_esc) == specials
    assert csesc(hello) == hesc
    assert csesc(specials) == specials_esc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from setuptools import setup | |
import os | |
import shutil | |
if __name__ == "__main__":
    # The modules live flat in the gist, the package directory is assembled here
    if not os.path.exists('llog'):
        os.mkdir('llog')
    for module in ('__init__.py', '__main__.py', 'state.py', 'control.py', 'lines.py'):
        shutil.copyfile(module, f"llog/{module}")

    setup(
        name='llog',
        version='2.0.3',
        description='Log with line number. Bedstead plots. State.',
        author='Artemciy',
        author_email='[email protected]',
        packages=['llog'],  # Same as name
        # NB: “state.py” requires “lmdb” and “cbor2”, but one doesn't have to use it,
        # they are *optional* so to speak.
        # ⌥ use square bracket dependencies hence
        install_requires=[],  # External packages as dependencies
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from datetime import timezone | |
import cbor2 | |
import lmdb | |
# Python module: https://lmdb.readthedocs.io/en/release/ | |
# Binary serialization for the fields: https://docs.python.org/3/library/struct.html | |
# https://docs.python.org/3/reference/compound_stmts.html#with | |
# https://docs.python.org/3/reference/datamodel.html#context-managers | |
# https://docs.python.org/3/reference/datamodel.html?emulating-container-types#emulating-container-types | |
# A short example at https://stackoverflow.com/a/23976949/257568 | |
# https://docs.python.org/3/library/stdtypes.html#bytes-and-bytearray-operations | |
def str2byt(sob):
    '''Encode a `str` into `bytes` (with the default encoding), pass anything else through.'''
    return sob.encode() if isinstance(sob, str) else sob
class TranIter:
    '''
    Iterator over the keys under a cursor, decoding them as UTF-8.
    The cursor is closed as soon as it is exhausted, or else when the iterator is collected.
    '''

    def __init__(self, cur):
        self.cur = cur

    def __iter__(self):
        # Iterators are expected to be iterable themselves, for `list(it)` and the like to work
        return self

    def __next__(self):
        if not self.cur:
            raise StopIteration
        if self.cur.next():
            return str(self.cur.key(), 'utf-8')
        self.cur.close()
        self.cur = None
        raise StopIteration

    def __del__(self):
        # `getattr` because `__del__` runs even when `__init__` did not get to set the `cur`
        cur = getattr(self, 'cur', None)
        if cur:
            cur.close()
class Tran:
    '''
    Context manager around an LMDB transaction on the database named `dbn`,
    with a `dict`-like access to the CBOR-encoded values.

    The transaction is committed on a clean exit from the `with` block and aborted on an exception.
    '''

    def __init__(self, env, dbn=b'default', write=True):
        self.env = env
        self.dbn = str2byt(dbn)
        self.write = write

    def __enter__(self):
        self.txn = self.env.begin(write=self.write, buffers=True)
        self.db = self.env.open_db(self.dbn, txn=self.txn)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.txn.commit()
        else:
            self.txn.abort()

    @staticmethod
    def _encode(value):
        '''the CBOR encoding used for every value stored'''
        return cbor2.dumps(value, timezone=timezone.utc, value_sharing=True, string_referencing=True)

    def clear(self, drop=False):
        '''remove every entry of the database, and with `drop` the database itself'''
        self.txn.drop(self.db, delete=drop)

    def __delitem__(self, key):
        if not self.txn.delete(str2byt(key), db=self.db):
            raise KeyError(f"No such key: “{key}”")

    def __setitem__(self, key, value):
        self.txn.put(str2byt(key), self._encode(value), db=self.db)

    async def amem(self, key, vf):
        '''
        memoize the key, invoking `vf` to generate a missing value
        ```
        with state.begin() as st:
            foo = await st.amem('foo', lambda: return_coroutine())
        ```
        '''
        cur = self.txn.cursor(self.db)
        # `finally` closes the cursor even when `vf` or the decoding fails
        try:
            key = str2byt(key)
            if cur.set_key(key):
                return cbor2.loads(cur.value())
            pv = await vf()
            cur.put(key, self._encode(pv))
            return pv
        finally:
            cur.close()

    def mem(self, key, vf):
        '''memoize the key, invoking `vf` to generate a missing value'''
        cur = self.txn.cursor(self.db)
        # `finally` closes the cursor even when `vf` or the decoding fails
        try:
            key = str2byt(key)
            if cur.set_key(key):
                return cbor2.loads(cur.value())
            pv = vf()
            cur.put(key, self._encode(pv))
            return pv
        finally:
            cur.close()

    def get(self, key, default=None):
        '''the decoded value stored under the `key`, or the `default` if there is none'''
        mv = self.txn.get(str2byt(key), db=self.db)
        if mv is None:
            return default
        # https://docs.python.org/3/library/stdtypes.html#memoryview
        try:
            return cbor2.loads(mv)
        finally:
            mv.release()

    def __getitem__(self, key):
        # NB: A missing key gives `None` here rather than a `KeyError`
        return self.get(key)

    def __contains__(self, key):
        cur = self.txn.cursor(self.db)
        try:
            return cur.set_key(str2byt(key))
        finally:
            cur.close()

    def __iter__(self):
        '''
        ```
        with state.begin('lemmings') as st:
            lemmings = {name: st[name] for name in st}
        ```
        '''
        return TranIter(self.txn.cursor(self.db))

    def items(self):
        '''Note that value is a copy, save it back with assignment to persist.'''
        cur = self.txn.cursor(self.db)
        # `finally` closes the cursor also when the generator is abandoned half way
        try:
            while cur.next():
                key = str(cur.key(), 'utf-8')
                val = cbor2.loads(cur.value())
                yield (key, val)
        finally:
            cur.close()

    def save(self, key: str, empty_collection):
        '''Context Manager persisting the referenced collection on a clean `__exit__`'''
        return Saviour(self, key, empty_collection)
class Saviour:
    '''
    Context Manager for a transaction-hosted collection (like `dict`),
    saving it back on a clean `__exit__`

    `st` is the transaction to load the collection from and to save it to,
    `key` is where the collection is stored,
    `empty_collection` is what to start with when there is nothing under the `key` yet.
    '''

    def __init__(self, st: 'Tran', key: str, empty_collection):
        self.st = st
        self.key = key
        self.empty_collection = empty_collection

    def __enter__(self):
        self.collection = self.st.get(self.key, self.empty_collection)
        return self.collection

    def __exit__(self, exc_type, exc_val, exc_tb):
        # A collection left half-modified by an exception is not persisted
        if exc_type is None:
            self.st[self.key] = self.collection
class StateSaviour:
    '''
    Context Manager for a state-hosted collection (like `dict`), saving it back on `__exit__`
    unless the `with` block was left with an exception.
    '''

    def __init__(self, state, key: str, empty_collection):
        self.state, self.key = state, key
        self.empty_collection = empty_collection

    def __enter__(self):
        loaded = self.state.get(self.key, self.empty_collection)
        self.collection = loaded
        return loaded

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            return
        self.state[self.key] = self.collection
class State:
    '''
    LMDB environment with a `dict`-like access to the "default" database.
    Every operation here runs in a transaction of its own, cf. `begin` for grouping several.
    '''

    def __init__(self, path, max_dbs=64):
        self.env = lmdb.Environment(
            path,
            metasync=False,
            sync=False,
            map_async=True,
            writemap=True,
            max_dbs=max_dbs,
        )

    def begin(self, dbn=b'default', write=True):
        '''a `Tran` on the database `dbn`, to be used in a `with` statement'''
        return Tran(self.env, dbn, write)

    def cur2(self, callback, dbn=b'default', write=True):
        '''run `callback` on a cursor'''
        with self.begin(dbn, write) as tran:
            cursor = tran.txn.cursor(tran.db)
            callback(tran, cursor)
            cursor.close()

    def __setitem__(self, key, value):
        with self.begin() as tran:
            tran[key] = value

    async def amem(self, key, vf):
        '''memoize the key, invoking coroutine-returning `vf` to generate a missing value'''
        with self.begin() as tran:
            return await tran.amem(key, vf)

    def mem(self, key, vf):
        '''memoize the key, invoking `vf` to generate a missing value'''
        with self.begin() as tran:
            return tran.mem(key, vf)

    def get(self, key, default=None):
        with self.begin() as tran:
            return tran.get(key, default)

    def __getitem__(self, key):
        with self.begin() as tran:
            return tran[key]

    def __contains__(self, key):
        with self.begin() as tran:
            return key in tran

    def save(self, key: str, empty_collection):
        '''Context Manager persisting the referenced collection on a clean `__exit__`'''
        return StateSaviour(self, key, empty_collection)

    def dump(self, size=False, cbor=False):
        '''
        Every database of the environment as a `dict` of `dict`s, database name -> key -> value.
        With `size` the values are replaced with the lengths of their encoded form,
        with `cbor` the values are left CBOR-encoded, otherwise they are decoded.

        python -c "import json; from llog.state import *; print(json.dumps(State('state.mdb').dump(), indent=2))"
        python -c "import yaml; from llog.state import *; print(yaml.dump(State('state.mdb').dump()))"
        python -mllog --dump=state.mdb
        atexit.register(lambda: open('state.yaml', 'w', encoding='utf-8').write(
            yaml.dump(State('perpetual.mdb').dump(), allow_unicode=True, width=99)))
        '''
        databases = {}
        with self.env.begin(write=False) as txn:
            # The keys of the unnamed database are the names of the databases
            names = [name.decode() for name, _ in txn.cursor().iternext()]
            for name in names:
                db = self.env.open_db(name.encode(), txn=txn)
                entries = {}
                for key, value in txn.cursor(db=db).iternext():
                    if size:
                        entries[key.decode()] = len(value)
                    elif cbor:
                        entries[key.decode()] = value
                    else:
                        entries[key.decode()] = cbor2.loads(value)
                databases[name] = entries
        return databases
if __name__ == '__main__':
    import os
    import shutil
    import tempfile
    import time

    from llog import log

    # clean the cache for library users
    shutil.rmtree('__pycache__', ignore_errors=True)

    tmp = tempfile.gettempdir()
    log(f"Going to “{tmp}”…")
    os.chdir(tmp)

    # to start with a clean slate
    #shutil.rmtree('test-state.mdb')

    # Changes every ten minutes
    instance = int(time.time() / 600)

    state = State('test-state.mdb')

    # Example of extending the database size
    # which helps with “MDB_MAP_FULL: Environment mapsize limit reached”
    # https://lmdb.readthedocs.io/en/release/#lmdb.Environment.set_mapsize
    state.env.set_mapsize(11 * 1024 * 1024)

    # Several operations sharing a transaction
    with state.begin() as st:
        if st['instance'] != instance:
            log(f"Instance {instance} is new, scrapping the state…")
            st.clear()
            st['instance'] = instance
        tim = st.mem('tim', lambda: int(time.time()))

    # One-off operations, a transaction each
    state['foo'] = 'bar'
    bar = state.mem('bar', lambda: 40 + 2)

    # A named database
    with state.begin('count') as ct:
        cnt = ct['cnt']
        ct['cnt'] = cnt + 1 if cnt is not None else 0
        assert 'cnt' in ct

    # A collection persisted from within a transaction
    with state.begin() as st:
        with st.save('dick', {}) as dicst:
            dicst['tim'] = int(time.time())
        assert 'dick' in st

    # A collection persisted in a transaction of its own
    with state.save('dick', {}) as dicst:
        dicst['sec'] = dicst['tim'] % 60

    log(state.dump())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment