-
-
Save datavudeja/4e26026b8ae49ed0413335683518c04f to your computer and use it in GitHub Desktop.
python_logging.py: python logging snippets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| import builtins | |
| from datetime import datetime | |
| import functools | |
| import inspect | |
| import logging | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from collections.abc import Callable | |
| from contextlib import suppress, contextmanager, redirect_stdout | |
| from functools import partialmethod | |
| from io import StringIO | |
| from logging.handlers import RotatingFileHandler | |
| from pprint import pprint | |
| from typing import TypeVar, Union | |
| # good for pyspark sql cols and dfs | |
| from varname import nameof | |
| import builtins | |
| import textwrap | |
# Running counter prefixed to every debug print so successive outputs can be told apart.
printcount = 1


def print(obj, reper=None, container=None, comment=None, **print_kwargs):
    """Debug-print ``obj`` under a numbered header that shows its variable name.

    NOTE: this intentionally shadows the builtin ``print`` for the whole module.
    Because other code in this module still calls ``print(..., file=sys.stderr)``,
    any call that passes builtin ``print`` keywords is forwarded untouched to
    ``builtins.print`` and does not advance the counter.

    Args:
        obj: Object to display. Its name is resolved with ``nameof(obj, frame=2)``.
        reper: Optional text; when truthy the header reads ``<name> = <reper>``.
        container: Optional object; when truthy, ``container.select(obj).show()``
            is called instead of printing ``obj`` (presumably a PySpark DataFrame,
            going by the import comment -- confirm against callers).
        comment: Optional free text, wrapped at 60 columns and appended to the
            header as ``# ``-prefixed lines.
        **print_kwargs: Builtin ``print`` keywords such as ``file=`` or ``end=``.
    """
    # `global`, not `nonlocal`: the counter lives at module level, and `nonlocal`
    # without an enclosing function scope is a SyntaxError.
    global printcount
    if print_kwargs:
        builtins.print(obj, **print_kwargs)
        return
    nameof_var = nameof(obj, frame=2)
    if reper:
        description = f"{nameof_var} = {reper}"
    else:
        description = nameof_var
    if comment:
        description += "\n# " + '\n# '.join(textwrap.wrap(comment, width=60))
    builtins.print(f'{printcount}|{description}')
    if container:
        container.select(obj).show()
    elif callable(getattr(obj, 'show', None)):
        obj.show()
    else:
        builtins.print(obj, end='\n\n')
    printcount += 1
# Optional loguru integration. When loguru is importable, register a custom
# "title" level (no=25, bold white) and attach a `title(...)` method to the
# logger's class that logs at that level. Without loguru this is a no-op.
try:
    from loguru import logger as loguru_logger
except ModuleNotFoundError:
    pass
else:
    setattr(
        loguru_logger.__class__,
        'title',
        partialmethod(
            loguru_logger.__class__.log,
            loguru_logger.level("title", no=25, color='<bold><white>').name,
        ),
    )
# Width, in columns, used when wrapping and shortening log output.
# Resolution order: the first original std stream that reports a terminal size,
# then $COLUMNS (only if > 80), then a fixed fallback of 170.
TERMWIDTH = None
for stream in (sys.__stdin__, sys.__stdout__, sys.__stderr__):
    try:
        # fileno() must be inside the try: the original std streams may be None
        # (AttributeError) or closed/detached (ValueError / OSError).
        fd = stream.fileno()
        TERMWIDTH = os.get_terminal_size(fd)[0]
    except (AttributeError, ValueError, OSError):
        continue
    else:
        break
if not TERMWIDTH or TERMWIDTH <= 80:
    columns = os.environ.get("COLUMNS")
    if columns and columns.isdigit() and int(columns) > 80:
        TERMWIDTH = int(columns)
if not TERMWIDTH or TERMWIDTH <= 80:
    # Anything at or below 80 columns is treated as "unknown / too narrow".
    TERMWIDTH = 170
# Matches default object reprs such as "<foo.bar.Person object at 0x7f2137d027c0>";
# group 1 is the last dotted name component, group 2 the 12-character address.
OBJ_RE = re.compile(r'<(?:[\w\d<]+\.)*([\w\d>]+)[ object]* at (0x[\w\d]{12})>')
# Matches "<class 'pkg.mod.Name'>"; group 1 is the bare class name.
TYPE_RE = re.compile(r"<class '(?:[\w\d<>]+\.)*([\w\d]+)'>")
WHITESPACE_RE = re.compile(r'\s+')
# Matches one ANSI SGR (color/style) escape sequence, e.g. "\x1b[1;33m".
COLOR_RE = re.compile(r'(\x1b\[(?:\d;?)*m)')
try:
    # sys.stdin may be None, closed, or a pseudo-file whose fileno() raises
    # io.UnsupportedOperation (a subclass of both OSError and ValueError).
    INTERACTIVE = os.isatty(sys.stdin.fileno())
except (AttributeError, ValueError, OSError):
    INTERACTIVE = False
RUN_BY_HUMAN = hasattr(sys.stderr, "isatty") and sys.stderr.isatty()
# Make sure `rich` is importable, installing it on the fly when it is missing.
# After this block, `'rich' in sys.modules` is true iff rich could be imported.
try:
    import rich
except ModuleNotFoundError:
    import importlib
    import subprocess

    # builtins.print is used explicitly because this module rebinds `print`.
    builtins.print('\n\x1b[97;1m[internal_log.py] Running: pip install rich\x1b[0m\n', file=sys.stderr)
    try:
        # Run pip through the *current* interpreter so the package lands in the
        # environment this process is actually using (a bare `pip` on PATH may
        # belong to a different Python).
        failed = subprocess.run(
            [sys.executable or 'python', '-m', 'pip', 'install', '--retries=2', 'rich'],
            check=False,
        ).returncode != 0
    except OSError:
        # Interpreter executable could not be launched at all.
        failed = True
    if failed:
        msg = '\n\x1b[91;1m[internal_log.py] Failed: pip install rich\x1b[0m\n'
    else:
        msg = '\n\x1b[92;1m[internal_log.py] Success: pip install rich\x1b[0m\n'
    builtins.print(msg, file=sys.stderr)
    # Let the import system notice packages installed after interpreter start-up.
    importlib.invalidate_caches()
    try:
        import rich
    except ModuleNotFoundError:
        # Still unavailable (e.g. no network); the console shim is used instead.
        pass
# Build the module-wide console object `con`: a real rich Console when rich was
# imported, otherwise a small stand-in exposing the subset of the Console API
# this module calls (print, print_json, print_exception, capture).
if 'rich' in sys.modules:
    import rich
    from rich.console import Console
    from rich.theme import Theme

    PYCHARM_HOSTED = os.getenv('PYCHARM_HOSTED')
    theme = {
        'debug': 'dim',
        'warn': 'yellow',
        'warning': 'yellow',
        'error': 'red',
        'fatal': 'bright_red',
        'success': 'green',
        'prompt': 'b bright_cyan',
        'title': 'b bright_white',
    }
    con = Console(force_terminal=False,
                  log_time_format='[%T %X]',
                  color_system='auto' if PYCHARM_HOSTED else 'truecolor',
                  width=TERMWIDTH,
                  tab_size=2,
                  file=sys.stdout if PYCHARM_HOSTED else sys.stderr,
                  # Every style is registered under both its lower- and upper-case name.
                  theme=Theme({**theme, **{k.upper(): v for k, v in theme.items()}}))
else:
    # Failed importing rich, probably network issue
    class CaptureShim(StringIO):
        """In-memory buffer offering the `.get()` accessor used on rich captures."""

        def get(self):
            return self.getvalue()

    class ConsoleShim:
        """Plain-text fallback for the parts of rich's Console used in this module.

        Rich-only keyword options (e.g. ``soft_wrap``) are accepted and ignored
        so call sites written against the real Console keep working.
        """

        @staticmethod
        def print(*objects, **_rich_options):
            """Write ``objects`` to stdout on one line; non-strings are pretty-formatted."""
            from pprint import pformat

            rendered = [o if isinstance(o, str) else pformat(o) for o in objects]
            # builtins.print is used explicitly: this module rebinds `print`.
            builtins.print(*rendered)

        @staticmethod
        def print_json(json=None, *, data=None, indent=2, sort_keys=False, **_rich_options):
            """Pretty-print a JSON string (``json``) or a Python object (``data``).

            Raises:
                ValueError: ``json`` is not valid JSON (json.JSONDecodeError).
                TypeError: ``data`` is not JSON-serializable.
            """
            import json as json_module

            if json is not None:
                data = json_module.loads(json)
            builtins.print(json_module.dumps(data, indent=indent, sort_keys=sort_keys))

        @staticmethod
        def print_exception(**_rich_options):
            """Print the traceback of the exception currently being handled."""
            import traceback

            traceback.print_exc()

        @contextmanager
        def capture(self):
            """Redirect stdout into a buffer for the duration of the ``with`` body.

            Exceptions raised inside the body are suppressed (best-effort
            rendering); they are recorded at DEBUG level rather than vanishing.
            """
            file = CaptureShim()
            with redirect_stdout(file):
                try:
                    yield file
                except Exception:
                    logging.getLogger(__name__).debug('ConsoleShim.capture suppressed an error', exc_info=True)

    con = ConsoleShim()
# Expose `con` as a builtin so it is reachable from any module without importing.
builtins.con = con
def decolor(s):
    """Return ``s`` with every substring matched by ``COLOR_RE`` removed."""
    return re.sub(COLOR_RE, '', s)
def shorten(s, limit=TERMWIDTH):
    """Shorten ``s`` to roughly ``limit`` visible characters by eliding its middle.

    The head and tail of the string are kept and joined by an ellipsis marker
    (``' [...] '``, ``'[...]'``, ``' .. '`` or dots, depending on available room).
    Whitespace runs in a shortened result are collapsed to single spaces.

    Colored input (containing ``\\x1b[``) is measured without its escape codes.
    If it is still too long, colors are stripped before shortening -- except when
    exactly one opening and one closing code wrap the whole string, in which
    case they are re-applied around the shortened text.

    Args:
        s: The string to shorten. Falsy values are returned unchanged.
        limit: Maximum visible length. Values below 4 cannot be honoured: a
            warning is logged and ``s`` is returned unchanged.

    Returns:
        ``s`` itself when it already fits, otherwise the shortened string.
    """
    if not s:
        return s
    if limit < 4:
        # All values go through lazy %-args: embedding repr(s) in the format
        # string itself would break formatting whenever it contains a '%'.
        logging.warning("shorten(%s) was called with limit = %d, can handle limit >= 4",
                        shorten(repr(s), limit=20), limit)
        return s
    length = len(s)
    if length <= limit:
        return s
    if '\x1b[' in s:
        no_color = decolor(s)
        real_length = len(no_color)
        if real_length <= limit:
            # Only the invisible escape codes pushed it over the limit.
            return s
        color_matches: list[re.Match] = list(COLOR_RE.finditer(s))
        if len(color_matches) == 2:
            # Only 2 color tags (open and close)
            color_a, color_b = color_matches
            if color_a.start() == 0 and color_b.end() == length:
                # Colors surround string from both ends
                return f'{color_a.group()}{shorten(no_color, limit)}{color_b.group()}'
        # Cutting through the middle of colored text could split an escape
        # sequence, so fall back to shortening the plain text.
        return shorten(no_color, limit)
    half_the_limit = limit // 2
    left_cutoff = max(half_the_limit - 3, 1)
    right_cutoff = max(half_the_limit - 4, 1)
    # Room left for the ellipsis marker between head and tail.
    free_chars = limit - left_cutoff - right_cutoff
    assert free_chars > 0, f'{free_chars = } not > 0'
    beginning = s[:left_cutoff]
    end = s[-right_cutoff:]
    if free_chars >= 7:
        separator = ' [...] '
    elif free_chars >= 5:
        separator = '[...]'
    elif free_chars >= 4:
        separator = ' .. '
    else:
        separator = '.' * free_chars
    assert len(separator) <= free_chars, f'{len(separator) = } ! <= {free_chars = }'
    return WHITESPACE_RE.sub(' ', f'{beginning}{separator}{end}')
class Reprer:
    """Prints objects through a pluggable printer, with special cases per object shape.

    ``_repr_fns`` maps a predicate to the print routine used for objects that
    satisfy it; the first matching predicate wins. Objects matching none are
    printed as ``pretty_inst(obj)``.
    """

    def __init__(self, printer: Callable = None):
        # Default printer: the module console's print, with soft wrapping on.
        if not printer:
            printer = functools.partial(con.print, soft_wrap=True)
        self._print = printer

        def has_url_root(obj):
            return hasattr(obj, 'url_root')

        def print_with_url_root(obj):
            return printer(obj, f' url_root = {obj.url_root}')

        # A `payload` special case existed here once and is currently disabled.
        self._repr_fns = {has_url_root: print_with_url_root}

    def repr(self, obj):
        """Print ``obj`` using the first matching special case, else the default."""
        special_case = next(
            (print_fn for condition, print_fn in self._repr_fns.items() if condition(obj)),
            None,
        )
        if special_case is None:
            self._print(pretty_inst(obj))
        else:
            special_case(obj)
# Module-wide Reprer bound to the default console printer.
reprer = Reprer()


def markup_repr(obj) -> str:
    """Render ``obj`` through the console and return the captured text.

    Rendering strategy:
      * dict / list: pretty JSON (indent 4, sorted keys); falls back to the
        generic reprer when the data is not JSON-serializable or rich's JSON
        renderer cannot be imported.
      * str starting with ``{"``: treated as a JSON document; falls back to the
        generic reprer when it does not actually parse as JSON.
      * exception instance: ``ClassName(arg1, arg2, ...)``.
      * anything else: the generic reprer.

    Returns:
        The captured console output with trailing whitespace stripped.
    """
    with con.capture() as captured:
        if isinstance(obj, (dict, list)):
            try:
                # Imported inside the try so a missing rich degrades to the
                # generic path instead of failing the whole call.
                from rich.json import JSON

                pretty = JSON.from_data(obj, indent=4, sort_keys=True)
                con.print(pretty)
            except (TypeError, ImportError):
                reprer.repr(obj)
        elif isinstance(obj, str) and obj.startswith("{\""):
            try:
                con.print_json(obj, indent=4, sort_keys=True)
            except ValueError:
                # Looked like JSON but is not (json.JSONDecodeError is a ValueError).
                reprer.repr(obj)
        elif isinstance(obj, BaseException):
            con.print(f'{obj.__class__.__qualname__}({", ".join(map(repr, obj.args))})')  # use reprer?
        else:
            reprer.repr(obj)
    # Read the capture only after the `with` block has exited.
    return captured.get().rstrip()
T = TypeVar('T')


def pretty_inst(obj: T) -> Union[str, T]:
    """<foo.bar.Person object at 0x7f2137d027c0> -> Person (0x7f2137d027c0)

    Rewrites every default-style object repr found in ``obj`` (or in
    ``str(obj)`` when ``obj`` is not a string) into ``Name (address)``.

    Returns:
        The rewritten string when at least one ``OBJ_RE`` match exists;
        otherwise ``obj`` itself, unchanged and unconverted.
    """
    # Stringify once: the same text must feed both the match test and the
    # substitution, and str() on arbitrary objects may be costly.
    string = obj if isinstance(obj, str) else str(obj)
    if not OBJ_RE.search(string):
        return obj
    return OBJ_RE.sub(lambda match: f'{match.group(1)} ({match.group(2)})', string)
def pretty_type(obj) -> str:  # unused?
    """pretty_type({"hi" : "bye"}) -> dict (1)

    Returns the bare class name of ``obj`` (or of ``obj`` itself when it is a
    class), followed by `` (<len>)`` when ``len(obj)`` is defined.
    Strings always yield ``'str'`` with no length suffix.
    """
    stringified_type: str
    if isinstance(obj, str):
        return 'str'
    # isinstance (not `type(obj) is type`) so classes built by a custom
    # metaclass are still recognised as classes.
    if isinstance(obj, type):
        stringified_type = str(obj)
    else:
        stringified_type = str(type(obj))
    rv = TYPE_RE.sub(lambda match: match.groups()[0], stringified_type)
    with suppress(TypeError):
        # Unsized objects raise TypeError from len(); they get no suffix.
        rv += f' ({len(obj)})'
    return rv
def pretty_signature(method, *args, **kwargs) -> str:
    """Build a colored, human-readable ``owner.method(args, kwargs)`` call string.

    Args:
        method: The callable being described.
        *args: Positional arguments of the call. If the first one has an
            attribute named like the method, it is treated as the bound
            instance/class and shown as the ``Owner.`` prefix instead of as an
            argument.
        **kwargs: Keyword arguments of the call; rendered one per line, aligned
            under the opening parenthesis.

    Returns:
        The formatted signature, including ANSI escape codes.
    """
    pretty_sig = "\x1b[97;48;2;30;30;30m" if RUN_BY_HUMAN else ""
    if hasattr(method, '__name__'):  # because __qualname__ can be e.g 'AccountServiceCallback.__init__'
        method_name = method.__name__
    elif hasattr(method, '__qualname__'):
        method_name = method.__qualname__
    else:
        method_name = str(method)
    # Width of everything printed before the opening parenthesis.
    offset_width = len(method_name)
    instance_name = None
    if args:
        first_arg, *rest = args
        # If 'some_method(foo, *args, **kwargs)', or 'some_method(Foo, *args, **kwargs)',
        # Display it as 'Foo.some_method(*args, **kwargs)'.
        if hasattr(first_arg, method_name):
            obj = first_arg
            args = rest
            # isinstance (not `type(obj) is type`) so classes with a custom
            # metaclass are shown by their own name, not the metaclass's.
            if isinstance(obj, type):
                instance_name = pretty_inst(obj.__qualname__)
            else:
                instance_name = pretty_inst(obj.__class__.__qualname__)
            pretty_sig += f'{instance_name}.'
            offset_width += len(instance_name) + 1
    if '.' not in method_name and not instance_name:
        # 'put' -> 'platforms_conf.put'
        # __module__ can be missing or None; skip the prefix rather than
        # printing "None." or crashing on len(None).
        module_name = getattr(method, '__module__', None)
        if module_name:
            pretty_sig += f'{module_name}.'
            offset_width += len(module_name) + 1
    left_offset = ' ' * (offset_width + 4)
    # Relies on pretty_inst returning as-is if the repr doesn't match "<foo> at 0x123".
    pretty_args = [markup_repr(pretty_inst(repr(arg))) for arg in args]
    pretty_args_joined = '\x1b[1;33m,\x1b[0m '.join(pretty_args)
    pretty_kwargs = [
        f'\x1b[33m{k}\x1b[0m={markup_repr(pretty_inst(repr(v)))}'
        for k, v in kwargs.items()
    ]
    pretty_kwargs_joined = f",\n{left_offset}".join(pretty_kwargs)
    pretty_sig += f'{method_name}\x1b[0m(' + pretty_args_joined + (', ' if args and kwargs else '') + pretty_kwargs_joined + ')'
    return pretty_sig
# Unix timestamp (whole seconds) of the last logged call. When more than 60s
# passed since then, the next logged call is preceded by a wall-clock header.
last_log_ts = 0


def log_method_calls(maybe_class_or_fn: Callable | type = None,
                     *,
                     only: tuple[str, ...] | Callable[[str], bool] = (),
                     exclude: tuple[str, ...] | Callable[[str], bool] = (),
                     short=False,
                     ) -> Callable:
    """
    A class or function decorator, logs when a method is called, and when it returns (with args and return values).

    Logging is best-effort: a failure while formatting or printing is reported
    on stderr and never prevents (or repeats) the wrapped call. An exception
    raised by the wrapped callable is reported and then re-raised unchanged.

    Examples:
        # Ex. 1
        @log_method_calls
        class Calculator:
            def add(self, a, b): return a+b

        # Ex. 2
        @log_method_calls(only=['add'])
        class ProCalculator:
            def add(self, a, b): return a + b
            def divide(self, a, b): return a / b

        # Ex. 3
        @log_method_calls
        def say_hello(name): print(f'Hello, {name}!')

    Args:
        maybe_class_or_fn: The decorated class or function when used bare
            (``@log_method_calls``); omitted when used with keyword options.
        only: Optionally specify `only=['some_method', 'other_method']` to only log specific methods,
            or `only=lambda x: x.startswith('_')` to only log private methods.
        exclude: Optionally specify `exclude=['dont_care']` to skip specific methods.
            or `exclude=lambda x: x.startswith('_')` to only log public methods.
            Ignored when `only` is given.
        short: When true, the "Calling:" line shows the shortened signature
            instead of the full one.

    Returns:
        The wrapped function, the same class with its selected methods wrapped
        in place, or (when called with options only) the decorator itself.
    """

    def cyan(s):
        return f'\x1b[0m\x1b[2;3;36m{s}\x1b[0m'

    def report_failure(exc):
        """Dump the exception being handled to stderr; must never raise itself."""
        try:
            con.print_exception(max_frames=2, show_locals=True, extra_lines=3, width=TERMWIDTH)
            builtins.print(markup_repr(exc), file=sys.stderr)
        except Exception:
            # The pretty path itself is broken; fall back to a plain repr.
            builtins.print(repr(exc), file=sys.stderr)

    def announce_call(method, args, kwargs):
        """Print the 'Calling:' banner; return the shortened signature for reuse."""
        pretty_sig = pretty_signature(method, *args, **kwargs)
        short_pretty_sig = shorten(pretty_sig, TERMWIDTH - 25)
        fn_qualname, open_parenthesis, inside_parenthesis = short_pretty_sig.partition('(')
        short_pretty_sig = f"\x1b[97;48;2;30;30;30m{fn_qualname}\x1b[0m({inside_parenthesis}"
        shown_sig = short_pretty_sig if short else pretty_sig
        calling = '\n' + cyan('Calling: ') + ('\n' + ' ' * 6).join(shown_sig.splitlines()) + '\x1b[0m'
        if calling.count('\n') > 15:
            # Very tall signature: repeat the method name at the bottom as a reminder.
            calling += f'\t\x1b[30m# {method.__name__}(...)\x1b[0m'
        calling += '\n'
        builtins.print(calling, file=sys.stderr)
        return short_pretty_sig

    def announce_return(short_pretty_sig, rv):
        """Print the 'Returning from:' banner together with the return value."""
        builtins.print(f"\n {cyan('� Returning from:')} " + ('\n' + ' ' * 17).join(short_pretty_sig.splitlines()) + "\n "
                       f"{markup_repr('[black]??[/black]')}{cyan('-> ')}{markup_repr(rv)}\n",
                       file=sys.stderr)

    def decorator(cls_or_fn: Callable):
        def wrap(_method: Callable, _isstatic=False) -> Callable:
            @functools.wraps(_method)
            def inner(*args, **kwargs):
                global last_log_ts
                now = int(time.time())
                if now - last_log_ts > 60:
                    last_log_ts = now
                    # builtins.print is used explicitly: this module rebinds `print`.
                    builtins.print(f'\x1b[2m?? {datetime.fromtimestamp(now).isoformat(" ")} ??\x1b[0m', file=sys.stderr)
                # A wrapped staticmethod is stored on the class as a plain
                # function, so calling it through an instance passes the
                # instance as a spurious first argument; drop it.
                call_args = args
                if _isstatic and args and hasattr(args[0], _method.__name__):
                    call_args = args[1:]
                try:
                    short_pretty_sig = None
                    try:
                        short_pretty_sig = announce_call(_method, args, kwargs)
                    except Exception as e:
                        report_failure(e)
                    # The wrapped callable runs exactly once, outside any
                    # logging error handling.
                    try:
                        rv = _method(*call_args, **kwargs)
                    except Exception as e:
                        report_failure(e)
                        raise
                    if short_pretty_sig is not None:
                        try:
                            announce_return(short_pretty_sig, rv)
                        except Exception as e:
                            report_failure(e)
                    return rv
                finally:
                    last_log_ts = int(time.time())

            return inner

        if inspect.isfunction(cls_or_fn):
            # todo: check if static method
            return wrap(cls_or_fn)

        # Build the method-name filter; `callable()` also admits partials,
        # bound methods and builtins, not just plain functions/lambdas.
        if only:
            condition = only if callable(only) else (lambda x: x in only)
        elif exclude:
            if callable(exclude):
                condition = lambda x: not exclude(x)
            else:
                condition = lambda x: x not in exclude
        else:
            condition = lambda x: True

        # inspect.ismethod is good only if the object is an instance, not a class.
        # isfunction (rather than isroutine) leaves out slot wrappers such as
        # __getattribute__ / __new__ inherited from object.
        methods = {attrname: attr for attrname, attr in inspect.getmembers(cls_or_fn)
                   if inspect.isfunction(attr) and condition(attrname)}
        for methodname, method in methods.items():
            isstatic = isinstance(inspect.getattr_static(cls_or_fn, methodname), staticmethod)
            setattr(cls_or_fn, methodname, wrap(method, isstatic))
        return cls_or_fn

    if maybe_class_or_fn:
        return decorator(maybe_class_or_fn)
    return decorator
| from __future__ import annotations | |
| import logging | |
| from contextlib import contextmanager | |
| from typing_extensions import Self | |
class ContextualLogger(logging.LoggerAdapter):
    """
    Persists `extra` across logging calls.
    `extra` can be specified as a kwarg to any logging call, e.g. `logger.info("message", chat_id=123)`

    Per-call keys are attached to that record only, unless the key already
    exists in the persisted `extra`, in which case the persisted value is
    overwritten as well.
    """
    extra: dict

    def __init__(self, logger_or_name: logging.Logger | str | None = None, extra: dict = None):
        """
        Args:
            logger_or_name: An existing logger to wrap, or a logger name to look
                up with `logging.getLogger`. `None` selects the root logger.
            extra: Initial persisted context; defaults to an empty dict.
        """
        if logger_or_name is None or isinstance(logger_or_name, str):
            logger = logging.getLogger(logger_or_name)
        else:
            logger = logger_or_name
        extra = extra or {}
        super().__init__(logger, extra)

    def process(self, msg, kwargs):
        """Overwrites and persists new values to existing keys in self.extra.

        Keyword arguments understood by the logging call itself (`exc_info`,
        `stack_info`, `stacklevel`) are passed through; every other keyword,
        plus the contents of an explicit `extra=` dict, becomes record context.
        """
        log_function_kwargs = {
            name: kwargs.pop(name)
            for name in ("exc_info", "stack_info", "stacklevel")
            if name in kwargs
        }
        # `extra=None` is what plain logging calls pass by default; treat as empty.
        explicit_extra = kwargs.pop('extra', None) or {}
        new_extra = kwargs | explicit_extra
        if self.extra is None:
            self.extra = {}
        for k, v in new_extra.items():
            if k in self.extra:
                self.extra[k] = v
        return msg, {"extra": self.extra | new_extra, **log_function_kwargs}

    def update_extra(self, **kwargs) -> Self:
        """
        Add or overwrite persisted context keys; returns self for chaining.

        >>> logger = ContextualLogger("example")
        >>> logger.update_extra(chat_id="...").info("hello")
        """
        if self.extra is None:
            self.extra = kwargs
        else:
            self.extra.update(kwargs)
        return self

    def delete_extra(self, *keys) -> Self:
        """Remove persisted context keys (missing keys are ignored); returns self."""
        for key in keys:
            self.extra.pop(key, None)
        return self

    @contextmanager
    def scope(self, **kwargs) -> Self:
        """
        Persist `extra` only in a given scope.

        On exit, keys that did not exist before the scope are removed and keys
        that did exist get their previous value back, so scopes can be nested
        and can temporarily override outer context.

        >>> logger = ContextualLogger("example")
        >>> with logger.scope(doing="something"):
        ...     something()
        ...     logger.info("done")
        { 'message': 'done', 'doing': 'something' }
        """
        missing = object()
        current = self.extra or {}
        previous = {key: current.get(key, missing) for key in kwargs}
        self.update_extra(**kwargs)
        try:
            yield self
        finally:
            for key, old_value in previous.items():
                if old_value is missing:
                    self.extra.pop(key, None)
                else:
                    self.extra[key] = old_value

    # Allows `with logger(doing="something"): ...` as shorthand for `scope`.
    __call__ = scope
| import logging | |
| from collections.abc import Callable | |
| from time import perf_counter | |
| from .contextual_logger import ContextualLogger | |
class PerformanceLogger(ContextualLogger):
    """ContextualLogger that can time callables and log how long they took."""

    def timeit(
        self,
        function: Callable = None,
        *,
        level: str | int = logging.INFO,
        time_exceptions: bool = False,
        description: str = None,
        mimimum_seconds_threshold: float = 0.05,
    ):
        """Decorator that logs the wall-clock duration of each call.

        Usable bare (`@logger.timeit`) or with options (`@logger.timeit(level="debug")`).

        Args:
            function: The callable to wrap when used bare.
            level: Log level, as an int or a level name (case-insensitive).
            time_exceptions: Also log the duration of calls that raise; the
                exception type and message are appended to the description.
                The exception is always re-raised.
            description: Label used in the log message; defaults to the
                callable's `__qualname__`.
            mimimum_seconds_threshold: Calls faster than this (after rounding to
                milliseconds) are not logged. (Parameter name kept as-is,
                typo included, for backward compatibility.)

        Raises:
            ValueError: `level` is a string that is not a registered level name.
        """
        # Local import: this module's import block does not bring in functools.
        import functools

        if isinstance(level, str):
            level_name = level
            level = logging.getLevelName(level_name.upper())
            # getLevelName returns a "Level X" string for unknown names; fail
            # at decoration time rather than after the timed call has run.
            if not isinstance(level, int):
                raise ValueError(f"Unknown log level name: {level_name!r}")

        def decorator(func: Callable):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start = perf_counter()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    elapsed_seconds = round(perf_counter() - start, 3)
                    if time_exceptions and elapsed_seconds >= mimimum_seconds_threshold:
                        description_with_error = f"{description or func.__qualname__} ({type(e).__name__}: {e})"
                        self._log_time(level, func, elapsed_seconds, description_with_error)
                    raise
                elapsed_seconds = round(perf_counter() - start, 3)
                if elapsed_seconds >= mimimum_seconds_threshold:
                    # Pass the caller-supplied description on the success path too.
                    self._log_time(level, func, elapsed_seconds, description)
                return result

            return wrapper

        if function:
            return decorator(function)
        return decorator

    def _log_time(self, level: str | int, func: Callable, elapsed_seconds: float, description: str = None):
        """Emit one timing record; structured details go under `extra['performance']`."""
        if not description:
            description = func.__qualname__
        self.log(
            level,
            f"{description} took {elapsed_seconds:.3f}s",
            extra=dict(
                performance={
                    'measured': func.__qualname__,
                    'elapsed_seconds': elapsed_seconds,
                    'description': description,
                }
            ),
            # Skips the logging plumbing frames so the record is attributed
            # further up the stack than this helper -- TODO confirm the exact
            # frame on the targeted Python version.
            stacklevel=4,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment