Last active
July 22, 2025 05:00
-
-
Save adhadse/dafb2180ca85cf96c7d81f04795b88f3 to your computer and use it in GitHub Desktop.
Structured logging with Loguru
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import sys | |
import traceback | |
import os | |
import re | |
import asyncio | |
import inspect | |
import functools | |
import loguru | |
from loguru import logger | |
from loguru._better_exceptions import ExceptionFormatter | |
# Default value stamped into every record's "pipeline" field.
PIPELINE_NAME = "YOUR PROJECT"
# When False, get_contextualized_logger returns the stock loguru logger unchanged.
SERIALIZED_LOGGING = True
def colorize_json_string(json_str):
    """
    Apply ANSI colorization to an already-serialized JSON log line.

    The timestamp, path and exception fields get fixed colors; the level
    gets a per-level color, and the message is colored to match the level.

    Args:
        json_str: Flat JSON string (as produced by ``serialize``).

    Returns:
        The JSON string with ANSI escape sequences spliced into the
        field values.
    """
    # ANSI escape codes.
    reset = "\033[0m"
    green = "\033[32m"            # timestamp and SUCCESS level
    cyan = "\033[36m"             # DEBUG level and paths
    white = "\033[0m"             # INFO level (default)
    yellow = "\033[33m"           # WARNING level
    red = "\033[31m"              # ERROR level and exception fields
    white_on_red = "\033[37;41m"  # CRITICAL level

    # Single source of truth for the level -> color mapping, replacing the
    # previous if/elif chain plus six near-duplicate re.sub calls.
    level_colors = {
        "DEBUG": cyan,
        "INFO": white,
        "WARNING": yellow,
        "ERROR": red,
        "SUCCESS": green,
        "CRITICAL": white_on_red,
    }

    # Colorize the timestamp.
    json_str = re.sub(r'("time": ")([^"]+)(")',
                      rf'\1{green}\2{reset}\3', json_str)

    # Read the level *before* recoloring it so the message can reuse its color.
    level_match = re.search(r'"level": "([^"]+)"', json_str)
    level_color = white  # default when the level is absent or unknown
    if level_match:
        level_color = level_colors.get(level_match.group(1), white)

    # Colorize the log level itself.
    for name, color in level_colors.items():
        json_str = re.sub(rf'("level": "){name}(")',
                          rf'\1{color}{name}{reset}\2', json_str)

    # Colorize the message using the level's color.
    json_str = re.sub(r'("message": ")(.*?)(")',
                      rf'\1{level_color}\2{reset}\3', json_str)
    # Colorize the path.
    json_str = re.sub(r'("path": ")(.*?)(")', rf'\1{cyan}\2{reset}\3', json_str)
    # Colorize exception type and value.
    json_str = re.sub(r'("type": ")(.*?)(")', rf'\1{red}\2{reset}\3', json_str)
    json_str = re.sub(r'("value": ")(.*?)(")', rf'\1{red}\2{reset}\3', json_str)
    return json_str
def serialize(record):
    """Serialize a loguru record to a colorized JSON line.

    Builds a flat dict (time, level, path, message, pipeline, exception,
    plus any extra fields), dumps it to JSON and runs it through
    ``colorize_json_string``.

    Args:
        record: A loguru record dict (as passed to a ``patch`` callback).

    Returns:
        The colorized JSON string for this record.
    """
    # Extract datetime (millisecond precision, ISO-8601).
    timestamp = record["time"].isoformat(timespec='milliseconds')
    # Extract file path, module, function and line info.
    file_path = record["file"].path
    module_name = record["name"]
    function_name = record["function"]
    line_number = record["line"]
    # Special handling for Jupyter notebooks, where the module "name" is a
    # cell number and the file path contains "ipython-input".
    if module_name.isdigit() or "ipython-input" in str(file_path).lower():
        try:
            # Try to derive a stable name from the kernel's connection file.
            # NOTE(review): relies on IPython kernel internals
            # (kernel.session.config) — confirm this path on the IPython
            # versions in use.
            import IPython
            notebook_path = IPython.get_ipython().kernel.session.config.get('IPKernelApp', {}).get('connection_file', '')
            if notebook_path:
                notebook_name = os.path.basename(notebook_path).split('.', 1)[0]
                module_name = f"jupyter.{notebook_name}"
            else:
                module_name = "__main__"
        except (ImportError, AttributeError):
            module_name = "__main__"  # Fallback name for Jupyter environments
    path_info = f"{module_name}:{function_name}:{line_number}"
    # Get log level name (e.g. "INFO").
    level = record["level"].name
    # Exception attached by logger.exception()/opt(exception=...), if any.
    error: loguru.RecordException = record["exception"]
    # Ambient exception info — set whenever we're inside an except block,
    # even for plain logger.error() calls.
    error_by_default = sys.exc_info()
    # Per-record options carried via .bind()/.contextualize().
    pipeline: str | None = record["extra"].get("pipeline", None)
    show_exception_value: bool = record["extra"].get("show_exception_value", True)
    extra = record["extra"].copy()
    extra.update({"pipeline": pipeline})
    # Process exception info.
    if error:  # only set when an exception was explicitly captured
        exc_type, exc_value, exc_tb = error.type, error.value, error.traceback
        # Use ExceptionFormatter directly with the specific error components
        # to get loguru's pretty (backtrace/diagnose) rendering.
        formatter = ExceptionFormatter(backtrace=True, diagnose=True, colorize=True)
        formatted_traceback = formatter.format_exception(exc_type, exc_value, exc_tb)
        exception = {
            "type": exc_type.__name__,
            # .strip("'") removes quoting artifacts around the message;
            # suppressed entirely when show_exception_value is False.
            "value": str(exc_value).strip("'") if show_exception_value else None,
            "traceback": "".join(formatted_traceback),
        }
    elif error_by_default[0]:  # an exception is in flight but not attached
        _type, _value, _ = sys.exc_info()
        exception = {
            "type": _type.__name__,
            "value": str(_value).strip("'") if show_exception_value else None,
            "traceback": None,  # no formatted traceback in this path
        }
    else:
        exception = None
    # Prepare data for serialization.
    to_serialize = {
        "time": timestamp,
        "level": level,
        "path": path_info,
        "message": record["message"],
        "pipeline": pipeline,
        "exception": exception,
    }
    # Add remaining extra fields, skipping the internal/bookkeeping keys.
    for key, value in extra.items():
        if key not in ("pipeline", "serialized", "show_exception_value"):
            to_serialize[key] = value
    # Convert to JSON string.
    json_str = json.dumps(to_serialize)
    # Colorize the JSON string.
    return colorize_json_string(json_str)
def patching(record):
    """Attach the pre-rendered, colorized JSON line to the record's extras.

    Installed via ``logger.patch``; the sink then emits the line through
    the ``{extra[serialized]}`` format.
    """
    serialized_line = serialize(record)
    record["extra"]["serialized"] = serialized_line
def get_contextualized_logger(
    pipeline_name: str = PIPELINE_NAME, default_logger=logger
):
    """Generate a contextualized logger bound to *pipeline_name*.

    When ``SERIALIZED_LOGGING`` is disabled the given logger is returned
    untouched; otherwise its sinks are replaced with a single stdout sink
    that emits the custom colorized-JSON line produced by ``patching``.

    Args:
        pipeline_name: Value bound to the ``pipeline`` extra on every record.
        default_logger: Logger to configure (defaults to loguru's global one).

    Returns:
        A logger whose records carry ``pipeline=pipeline_name``.
    """
    if not SERIALIZED_LOGGING:  # Replace with your SERIALIZED_LOGGING variable
        return default_logger
    default_logger.remove()  # drop loguru's default stderr sink
    default_logger = default_logger.patch(patching)
    default_logger.add(
        sink=sys.stdout,
        colorize=True,
        serialize=False,  # custom serialization requires this to be False
        backtrace=True,
        diagnose=True,
        level="INFO",
        format="{extra[serialized]}",
    )
    # Bug fix: previously bound the hard-coded "P1", silently ignoring the
    # pipeline_name parameter this function was documented to use.
    return default_logger.bind(pipeline=pipeline_name)
def logger_with_context(*param_specs, **fixed_context):
    """Decorator that automatically extracts logging context from function parameters.

    Works with both synchronous and asynchronous functions; the wrapped call
    runs inside ``logger.contextualize(**context)``.

    Args:
        *param_specs: Parameter specifications, which can be:
            - Simple parameter names (e.g., "task")
            - Attribute paths (e.g., "task.id" to get task.id)
            - Dictionaries to include directly in context
        **fixed_context: Additional fixed context values.

    Returns:
        Decorated function with automatic context logging.
    """
    # Separate dict specs (merged straight into the context) from
    # string specs (parameter names / attribute paths).
    processed_param_specs = []
    additional_context = {}
    for spec in param_specs:
        if isinstance(spec, dict):
            additional_context.update(spec)
        else:
            processed_param_specs.append(spec)
    # fixed_context keys win over keys from positional dicts.
    merged_fixed_context = dict(additional_context)
    merged_fixed_context.update(fixed_context)

    def decorator(func):
        is_async = asyncio.iscoroutinefunction(func)
        # Perf fix: the signature never changes, so compute it once at
        # decoration time instead of on every wrapped call (it was
        # previously re-derived inside both wrappers).
        sig = inspect.signature(func)

        def get_context_from_args(bound_args):
            """Build the context dict from bound arguments plus fixed values."""
            context = dict(merged_fixed_context)
            for spec in processed_param_specs:
                if "." in spec:
                    # Handle object attribute access (e.g., "task.id").
                    obj_name, attr_name = spec.split(".", 1)
                    if obj_name not in bound_args.arguments:
                        continue
                    obj = bound_args.arguments[obj_name]
                    if obj is None:
                        continue
                    try:
                        # "oid" is surfaced as "id" in the context key —
                        # project convention (presumably Mongo-style object
                        # ids; confirm against callers).
                        if attr_name == "oid":
                            modified_attr_name = "id"
                        else:
                            modified_attr_name = attr_name
                        context[f"{obj_name}_{modified_attr_name}"] = getattr(
                            obj, attr_name
                        )
                    except AttributeError:
                        # Missing attributes are logged, not raised.
                        logger.warning(f"Object {obj_name} has no attribute {attr_name}")
                else:
                    # Handle direct parameter access.
                    if spec in bound_args.arguments:
                        param_value = bound_args.arguments[spec]
                        if isinstance(param_value, dict):
                            # Dict-valued parameters are merged into the
                            # context rather than nested under their name.
                            context.update(param_value)
                        else:
                            context[spec] = param_value
            return context

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            context = get_context_from_args(bound_args)
            with logger.contextualize(**context):
                result = await func(*args, **kwargs)
                return result

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            context = get_context_from_args(bound_args)
            with logger.contextualize(**context):
                result = func(*args, **kwargs)
                return result

        # Return the appropriate wrapper based on the function type.
        return async_wrapper if is_async else sync_wrapper

    return decorator
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment