Skip to content

Instantly share code, notes, and snippets.

@datavudeja
Forked from giladbarnea/python_logging.py
Created August 13, 2025 12:37
Show Gist options
  • Save datavudeja/4e26026b8ae49ed0413335683518c04f to your computer and use it in GitHub Desktop.
Save datavudeja/4e26026b8ae49ed0413335683518c04f to your computer and use it in GitHub Desktop.
python_logging.py: python logging snippets
from __future__ import annotations
import builtins
from datetime import datetime
import functools
import inspect
import logging
import os
import re
import sys
import time
from collections.abc import Callable
from contextlib import suppress, contextmanager, redirect_stdout
from functools import partialmethod
from io import StringIO
from logging.handlers import RotatingFileHandler
from pprint import pprint
from typing import TypeVar, Union
# good for pyspark sql cols and dfs
from varname import nameof
import builtins
import textwrap
printcount=1
def print(obj, reper=None, container=None, comment=None):
nonlocal printcount
nameof_var = nameof(obj, frame=2)
if reper:
description = f"{nameof_var} = {reper}"
else:
description = nameof_var
if comment:
description += "\n# " + '\n# '.join(textwrap.wrap(comment, width=60))
builtins.print(f'{printcount}|{description}')
if container:
container.select(obj).show()
elif callable(getattr(obj, 'show', None)):
obj.show()
else:
builtins.print(obj, end='\n\n')
printcount+=1
try:
from loguru import logger as loguru_logger
except ModuleNotFoundError:
pass
else:
loguru_logger.__class__.title = partialmethod(loguru_logger.__class__.log,
loguru_logger.level("title",
no=25,
color='<bold><white>').name)
TERMWIDTH = None
for fd in (sys.__stdin__.fileno(), sys.__stdout__.fileno(), sys.__stderr__.fileno()):
try:
TERMWIDTH = os.get_terminal_size(fd)[0]
except (AttributeError, ValueError, OSError):
pass
else:
break
if not TERMWIDTH or TERMWIDTH <= 80:
columns = os.environ.get("COLUMNS")
if columns and columns.isdigit() and int(columns) > 80:
TERMWIDTH = int(columns)
if not TERMWIDTH or TERMWIDTH <= 80:
TERMWIDTH = 170
OBJ_RE = re.compile(r'<(?:[\w\d<]+\.)*([\w\d>]+)[ object]* at (0x[\w\d]{12})>')
TYPE_RE = re.compile(r"<class '(?:[\w\d<>]+\.)*([\w\d]+)'>")
WHITESPACE_RE = re.compile(r'\s+')
COLOR_RE = re.compile(r'(\x1b\[(?:\d;?)*m)')
INTERACTIVE = hasattr(sys.stdin, "fileno") and os.isatty(sys.stdin.fileno())
RUN_BY_HUMAN = hasattr(sys.stderr, "isatty") and sys.stderr.isatty()
try:
import rich
except ModuleNotFoundError:
print('\n\x1b[97;1m[internal_log.py] Running: pip install rich\x1b[0m\n', file=sys.stderr)
failed = bool(os.system('pip install --retries=2 rich'))
if failed:
msg = f'\n\x1b[91;1m[internal_log.py] Failed: pip install rich\x1b[0m\n'
else:
msg = f'\n\x1b[92;1m[internal_log.py] Success: pip install rich\x1b[0m\n'
print(msg, file=sys.stderr)
finally:
try:
import rich
except ModuleNotFoundError:
pass
else:
sys.modules['rich'] = rich
if 'rich' in sys.modules:
import rich
from rich.console import Console
from rich.theme import Theme
PYCHARM_HOSTED = os.getenv('PYCHARM_HOSTED')
theme = {
'debug': 'dim',
'warn': 'yellow',
'warning': 'yellow',
'error': 'red',
'fatal': 'bright_red',
'success': 'green',
'prompt': 'b bright_cyan',
'title': 'b bright_white',
}
con = Console(force_terminal=False,
log_time_format='[%T %X]',
color_system='auto' if PYCHARM_HOSTED else 'truecolor',
width=TERMWIDTH,
tab_size=2,
file=sys.stdout if PYCHARM_HOSTED else sys.stderr,
theme=Theme({**theme, **{k.upper(): v for k, v in theme.items()}}))
else:
# Failed importing rich, probably network issue
class CaptureShim(StringIO):
def get(self):
return self.getvalue()
class ConsoleShim:
print = staticmethod(pprint)
print_json = staticmethod(pprint)
@contextmanager
def capture(self):
file = CaptureShim()
with redirect_stdout(file):
try:
yield file
except Exception:
pass
con = ConsoleShim()
builtins.con = con
def decolor(s):
return COLOR_RE.sub('', s)
def shorten(s, limit=TERMWIDTH):
if not s:
return s
if limit < 4:
logging.warning(f"shorten({shorten(repr(s), limit=20)}) was called with limit = %d, can handle limit >= 4", limit)
return s
length = len(s)
if length <= limit:
return s
half_the_limit = limit // 2
if '\x1b[' in s:
no_color = decolor(s)
real_length = len(no_color)
if real_length <= limit:
return s
color_matches: list[re.Match] = list(COLOR_RE.finditer(s))
if len(color_matches) == 2:
# Only 2 color tags (open and close)
color_a, color_b = color_matches
if color_a.start() == 0 and color_b.end() == length:
# Colors surround string from both ends
return f'{color_a.group()}{shorten(no_color, limit)}{color_b.group()}'
return shorten(no_color, limit)
# escape_seq_start_rindex = s.rindex('\x1b')
# left_cutoff = max(escape_seq_start_index + 4, half_the_limit)
# right_cutoff = min((real_length - escape_seq_start_rindex) + 4, half_the_limit)
# print(f'{limit = } | {length = } | {real_length = } | {left_cutoff = } | {right_cutoff = } | {half_the_limit = } | {escape_seq_start_index = } | {escape_seq_start_rindex = }')
left_cutoff = max(half_the_limit - 3, 1)
right_cutoff = max(half_the_limit - 4, 1)
# print(f'{limit = } | {length = } | {left_cutoff = } | {right_cutoff = } | {half_the_limit = }')
free_chars = limit - left_cutoff - right_cutoff
assert free_chars > 0, f'{free_chars = } not > 0'
beginning = s[:left_cutoff]
end = s[-right_cutoff:]
if free_chars >= 7:
separator = ' [...] '
elif free_chars >= 5:
separator = '[...]'
elif free_chars >= 4:
separator = ' .. '
else:
separator = '.' * free_chars
assert len(separator) <= free_chars, f'{len(separator) = } ! <= {free_chars = }'
return WHITESPACE_RE.sub(' ', f'{beginning}{separator}{end}')
class Reprer:
def __init__(self, printer: Callable = None):
if not printer:
printer = functools.partial(con.print, soft_wrap=True)
self._print = printer
self._repr_fns = {
lambda obj: hasattr(obj, 'url_root'): lambda obj: printer(obj, f' url_root = {obj.url_root}'),
# lambda obj: hasattr(obj, 'payload'): lambda obj: printer(obj, f' payload = {obj.payload}'),
}
def repr(self, obj):
for condition, print_fn in self._repr_fns.items():
if condition(obj):
print_fn(obj)
return
self._print(pretty_inst(obj))
reprer = Reprer()
def markup_repr(obj) -> str:
with con.capture() as captured:
if isinstance(obj, (dict, list)):
from rich.json import JSON
try:
pretty = JSON.from_data(obj, indent=4, sort_keys=True)
con.print(pretty)
except TypeError:
# con.print(obj) # use reprer?
reprer.repr(obj)
elif isinstance(obj, str) and obj.startswith("{\""):
con.print_json(obj, indent=4, sort_keys=True)
elif isinstance(obj, BaseException):
con.print(f'{obj.__class__.__qualname__}({", ".join(map(repr, obj.args))})') # use reprer?
else:
reprer.repr(obj)
return captured.get().rstrip()
T = TypeVar('T')
def pretty_inst(obj: T) -> Union[str, T]:
"""<foo.bar.Person object at 0x7f2137d027c0> -> Person (0x7f2137d027c0)"""
if isinstance(obj, str):
# In case obj doesn't match "<foo> at 0x123",
# just return as-is without additional str(...).
if not OBJ_RE.search(obj):
return obj
string = obj
else:
# In case str(obj) doesn't match "<foo> at 0x123",
# just return as-is without converting to string.
match = OBJ_RE.search(str(obj))
if not match:
return obj
string = str(obj)
return OBJ_RE.sub(lambda _match: f'{(groups := _match.groups())[0]} ({groups[1]})', string)
def pretty_type(obj) -> str: # unused?
"""pretty_type({"hi" : "bye"}) -> dict (1)"""
stringified_type: str
if isinstance(obj, str):
return 'str'
elif type(obj) is type:
stringified_type = str(obj)
else:
stringified_type = str(type(obj))
rv = TYPE_RE.sub(lambda match: match.groups()[0], stringified_type)
with suppress(TypeError):
rv += f' ({len(obj)})'
return rv
def pretty_signature(method, *args, **kwargs) -> str:
pretty_sig = "\x1b[97;48;2;30;30;30m" if RUN_BY_HUMAN else ""
if hasattr(method, '__name__'): # because __qualname__ can be e.g 'AccountServiceCallback.__init__'
method_name = method.__name__
elif hasattr(method, '__qualname__'):
method_name = method.__qualname__
else:
method_name = str(method)
left_offset = len(method_name)
instance_name = None
if args:
first_arg, *rest = args
# if 'AccountServiceCallback' in first_arg.__class__.__name__ and '__init__' in method_name:
# print(f'{args[0] = }', f'{first_arg = }', f'{method_name = }', f'{hasattr(first_arg, method_name) = }', sep='\n')
# If 'some_method(foo, *args, **kwargs)', or 'some_method(Foo, *args, **kwargs)',
# Display it as 'Foo.some_method(*args, **kwargs)'.
if hasattr(first_arg, method_name): # __qualname__ can include class, e.g 'AccountServiceCallback.__init__'
obj = first_arg
args = rest
# if some_method(Class, *args, **kwargs), then `Class` arg is a class
if type(obj) is type:
instance_name = pretty_inst(obj.__qualname__)
else:
instance_name = pretty_inst(obj.__class__.__qualname__)
pretty_sig += f'{instance_name}.'
left_offset += len(instance_name) + 1
if '.' not in method_name and not instance_name:
# 'put' -> 'platforms_conf.put'
pretty_sig += f'{method.__module__}.'
left_offset += len(method.__module__) + 1
# pretty_args = []
# args_len = len(args)
# for i, arg in enumerate(args):
# # Relies on pretty_inst returning as-is if obj doesn't match "<foo> at 0x123".
# pretty_arg = markup_repr(pretty_inst(repr(arg)))
# if i < args_len - 2:
# pretty_arg += '\x1b[33m,\x1b[0m'
# if len(pretty_arg) >= TERMWIDTH:
# pretty_arg += '\n' + ' ' * (len(method_name) + 4)
# else:
# pretty_arg += ' '
# pretty_args.append(pretty_arg)
# pretty_args_joined = ''.join(pretty_args)
left_offset = ' ' * (left_offset + 4)
pretty_args = list(map(markup_repr, map(pretty_inst, map(repr, args))))
if False and sum(map(len, pretty_args)) >= TERMWIDTH:
pretty_args_joined = ('\x1b[1;33m,\x1b[0m\n' + left_offset).join(pretty_args)
else:
pretty_args_joined = '\x1b[1;33m,\x1b[0m '.join(pretty_args)
# pretty_args_joined = ", ".join(map(markup_repr, args))
pretty_kwargs = []
for k, v in kwargs.items():
# pretty_kwargs.append(f'{k}={markup_repr(v)}')
pretty_kwargs.append(f'\x1b[33m{k}\x1b[0m={markup_repr(pretty_inst(repr(v)))}')
pretty_kwargs_joined = f",\n{left_offset}".join(pretty_kwargs)
pretty_sig += f'{method_name}\x1b[0m(' + pretty_args_joined + (', ' if args and kwargs else '') + pretty_kwargs_joined + ')'
return pretty_sig
global last_log_ts
last_log_ts = 0
def log_method_calls(maybe_class_or_fn: Callable | type = None,
*,
only: tuple[str, ...] | Callable[[str], bool] = (),
exclude: tuple[str, ...] | Callable[[str], bool] = (),
short=False,
) -> Callable:
"""
A class or function decorator, logs when a method is called, and when it returns (with args and return values).
Examples:
# Ex. 1
@log_method_calls
class Calculator:
def add(self, a, b): return a+b
# Ex. 2
@log_method_call(only=['add'])
class ProCalculator:
def add(self, a, b): return a + b
def divide(self, a, b): return a / b
# Ex. 3
@log_method_calls
def say_hello(name): print(f'Hello, {name}!')
Args:
only: Optionally specify `only=['some_method', 'other_method']` to only log specific methods,
or `only=lambda x: x.startswith('_')` to only log private methods.
exclude: Optionally specify `exclude=['dont_care']` to skip specific methods.
or `exclude=lambda x: x.startswith('_')` to only log public methods.
"""
def cyan(s):
return f'\x1b[0m\x1b[2;3;36m{s}\x1b[0m'
def decorator(cls_or_fn: Callable):
def wrap(_method: Callable, _isstatic=False) -> Callable:
@functools.wraps(_method)
def inner(*args, **kwargs):
global last_log_ts
now = int(time.time())
if now - last_log_ts > 60:
last_log_ts = now
print(f'\x1b[2m?? {datetime.fromtimestamp(now).isoformat(" ")} ??\x1b[0m', file=sys.stderr)
try:
pretty_sig = pretty_signature(_method, *args, **kwargs)
short_pretty_sig = shorten(pretty_sig, TERMWIDTH - 25)
fn_qualname, open_parenthesis, inside_parenthesis = short_pretty_sig.partition('(')
short_pretty_sig = f"\x1b[97;48;2;30;30;30m{fn_qualname}\x1b[0m({inside_parenthesis}"
if short:
calling = '\n' + cyan('Calling: ') + ('\n' + ' ' * 6).join(short_pretty_sig.splitlines()) + '\x1b[0m'
else:
calling = '\n' + cyan('Calling: ') + ('\n' + ' ' * 6).join(pretty_sig.splitlines()) + '\x1b[0m'
if calling.count('\n') > 15:
calling += f'\t\x1b[30m# {_method.__name__}(...)\x1b[0m'
calling += '\n'
print(calling, file=sys.stderr)
if _isstatic and hasattr(args[0], _method.__name__):
rv = _method(*args[1:], **kwargs)
else:
rv = _method(*args, **kwargs)
print(f"\n {cyan('� Returning from:')} " + ('\n' + ' ' * 17).join(short_pretty_sig.splitlines()) + "\n "
f"{markup_repr('[black]??[/black]')}{cyan('-> ')}{markup_repr(rv)}\n",
file=sys.stderr)
return rv
except Exception as e:
con.print_exception(max_frames=2,show_locals=True, extra_lines=3, width=TERMWIDTH)
print(markup_repr(e), file=sys.stderr)
return _method(*args, **kwargs)
finally:
last_log_ts = int(time.time())
return inner
if inspect.isfunction(cls_or_fn):
# todo: check if static method
wrap = functools.wraps(cls_or_fn)(wrap)
return wrap(cls_or_fn)
if only:
if inspect.isfunction(only):
condition = only
else:
condition = lambda x: x in only
elif exclude:
if inspect.isfunction(exclude):
condition = lambda x: not exclude(x)
else:
condition = lambda x: x not in exclude
else:
condition = lambda x: True
# inspect.ismethod is good only if the object is an instance, not a class.
# methods = {attrname: attr for attrname, attr in vars(cls_or_fn).items()
# if inspect.isfunction(attr) and condition(attrname)}
methods = {attrname: attr for attrname, attr in inspect.getmembers(cls_or_fn)
# if ((inspect.isroutine(attr) or isinstance(attr, types.MethodWrapperType))
# and '__getattribute__' not in attrname)
if inspect.isfunction(attr) # isroutine includes all magics including __getattr__, __new__ etc
and condition(attrname)}
for methodname, method in methods.items():
isstatic = isinstance(inspect.getattr_static(cls_or_fn, methodname), staticmethod)
wrapped = functools.wraps(method)(wrap)(method, isstatic)
setattr(cls_or_fn, methodname, wrapped)
return cls_or_fn
if maybe_class_or_fn:
return decorator(maybe_class_or_fn)
return decorator
from __future__ import annotations
import logging
from contextlib import contextmanager
from typing_extensions import Self
class ContextualLogger(logging.LoggerAdapter):
"""
Persists `extra` across logging calls.
`extra` can be specified as a kwarg to any logging call, e.g. `logger.info("message", chat_id=123)`
"""
extra: dict
def __init__(self, logger_or_name=logging.Logger | str, extra: dict = None):
if isinstance(logger_or_name, str):
logger = logging.getLogger(logger_or_name)
else:
logger = logger_or_name
extra = extra or {}
super().__init__(logger, extra)
def process(self, msg, kwargs):
"""Overwrites and persists new values to existing keys in self.extra."""
log_function_kwargs = {}
for log_function_kwarg in ("exc_info", "stack_info", "stacklevel"):
if log_function_kwarg in kwargs:
log_function_kwargs[log_function_kwarg] = kwargs.pop(log_function_kwarg)
explicit_extra = kwargs.pop('extra', {})
new_extra = kwargs | explicit_extra
for k, v in new_extra.items():
if k in self.extra:
self.extra[k] = v
return msg, {"extra": self.extra | new_extra, **log_function_kwargs}
def update_extra(self, **kwargs) -> Self:
"""
>>> logger = ContextualLogger("example")
>>> logger.update_extra(chat_id="...").info("hello")
"""
if self.extra is None:
self.extra = kwargs
else:
self.extra.update(kwargs)
return self
def delete_extra(self, *keys) -> Self:
for key in keys:
self.extra.pop(key, None)
return self
@contextmanager
def scope(self, **kwargs) -> Self:
"""
Persist `extra` only in a given scope.
>>> logger = ContextualLogger("example")
>>> with logger.scope(doing="something"):
... something()
... logger.info("done")
{ 'message': 'done', 'doing': 'something' }
"""
self.update_extra(**kwargs)
try:
yield self
finally:
self.delete_extra(*kwargs.keys())
__call__ = scope
import logging
from collections.abc import Callable
from time import perf_counter
from .contextual_logger import ContextualLogger
class PerformanceLogger(ContextualLogger):
def timeit(
self,
function: Callable = None,
*,
level: str | int = logging.INFO,
time_exceptions: bool = False,
description: str = None,
mimimum_seconds_threshold: float = 0.05,
):
if isinstance(level, str):
level = logging.getLevelName(level.upper())
def decorator(func: Callable):
def wrapper(*args, **kwargs):
start = perf_counter()
try:
result = func(*args, **kwargs)
elapsed_seconds = round(perf_counter() - start, 3)
if elapsed_seconds >= mimimum_seconds_threshold:
self._log_time(level, func, elapsed_seconds)
return result
except Exception as e:
elapsed_seconds = round(perf_counter() - start, 3)
if time_exceptions and elapsed_seconds >= mimimum_seconds_threshold:
description_with_error = f"{description or func.__qualname__} ({type(e).__name__}: {e})"
self._log_time(level, func, elapsed_seconds, description_with_error)
raise
return wrapper
if function:
return decorator(function)
return decorator
def _log_time(self, level: str | int, func: Callable, elapsed_seconds: float, description: str = None):
if not description:
description = func.__qualname__
self.log(
level,
f"{description} took {elapsed_seconds:.3f}s",
extra=dict(
performance={
'measured': func.__qualname__,
'elapsed_seconds': elapsed_seconds,
'description': description,
}
),
stacklevel=4,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment