Skip to content

Instantly share code, notes, and snippets.

@adhadse
Last active July 22, 2025 05:00
Show Gist options
  • Save adhadse/dafb2180ca85cf96c7d81f04795b88f3 to your computer and use it in GitHub Desktop.
Structured logging with Loguru
import json
import sys
import traceback
import os
import re
import asyncio
import inspect
import functools
import loguru
from loguru import logger
from loguru._better_exceptions import ExceptionFormatter
# Default pipeline name attached to every log record's "pipeline" field.
PIPELINE_NAME = "YOUR PROJECT"
# When False, get_contextualized_logger() returns the default loguru logger unchanged.
SERIALIZED_LOGGING = True
def colorize_json_string(json_str):
    """Apply ANSI colorization to an already-serialized JSON log line.

    Colorizes the timestamp, log level, source path, and exception
    type/value fields; the message is colorized with the same color as
    the record's level.

    Args:
        json_str: A JSON string produced by serialize().

    Returns:
        str: The JSON string with ANSI escape sequences inserted inside
        the quoted values (the result is for terminal display, not for
        re-parsing as strict JSON semantics).
    """
    # ANSI escape codes.
    reset = "\033[0m"
    green = "\033[32m"           # timestamp and SUCCESS level
    cyan = "\033[36m"            # DEBUG level and paths
    white = "\033[0m"            # INFO level (terminal default)
    yellow = "\033[33m"          # WARNING level
    red = "\033[31m"             # ERROR level and exception fields
    white_on_red = "\033[37;41m" # CRITICAL level

    # Single source of truth for level -> color, replacing the previous
    # if/elif chain and six separate re.sub calls.
    level_colors = {
        "DEBUG": cyan,
        "INFO": white,
        "WARNING": yellow,
        "ERROR": red,
        "SUCCESS": green,
        "CRITICAL": white_on_red,
    }

    # Colorize the timestamp.
    json_str = re.sub(r'("time": ")([^"]+)(")',
                      rf'\1{green}\2{reset}\3', json_str)

    # Extract the level before colorizing it, to pick the message color.
    level_match = re.search(r'"level": "([^"]+)"', json_str)
    level_color = white  # default for unknown/missing levels
    if level_match:
        level_color = level_colors.get(level_match.group(1), white)

    # Colorize the log level itself.
    for level_name, color in level_colors.items():
        json_str = re.sub(rf'("level": "){level_name}(")',
                          rf'\1{color}{level_name}{reset}\2', json_str)

    # Colorize the message using the level's color.
    json_str = re.sub(r'("message": ")(.*?)(")', rf'\1{level_color}\2{reset}\3', json_str)
    # Colorize the source path.
    json_str = re.sub(r'("path": ")(.*?)(")', rf'\1{cyan}\2{reset}\3', json_str)
    # Colorize exception type/value fields.
    json_str = re.sub(r'("type": ")(.*?)(")', rf'\1{red}\2{reset}\3', json_str)
    json_str = re.sub(r'("value": ")(.*?)(")', rf'\1{red}\2{reset}\3', json_str)
    return json_str
def serialize(record):
    """Serialize a loguru record to a colorized JSON string.

    Builds a flat dict with timestamp, level, source path, message,
    pipeline name, exception details and remaining extra fields, dumps
    it to JSON, then applies ANSI colorization via colorize_json_string().

    Args:
        record: The loguru record dict handed to the patcher.

    Returns:
        str: A colorized JSON string representing the record.
    """
    # ISO-8601 timestamp with millisecond precision.
    timestamp = record["time"].isoformat(timespec='milliseconds')
    # Extract file path, module, function and line info.
    file_path = record["file"].path
    module_name = record["name"]
    function_name = record["function"]
    line_number = record["line"]
    # Special handling for Jupyter notebooks: when running from a cell,
    # the module name is a bare digit string or the path contains
    # "ipython-input".
    if module_name.isdigit() or "ipython-input" in str(file_path).lower():
        # Check if we're in a Jupyter notebook
        try:
            # Try to derive a display name from the kernel connection file.
            # NOTE(review): this config lookup path may differ between
            # IPython versions — confirm against the target environment.
            import IPython
            notebook_path = IPython.get_ipython().kernel.session.config.get('IPKernelApp', {}).get('connection_file', '')
            if notebook_path:
                notebook_name = os.path.basename(notebook_path).split('.', 1)[0]
                module_name = f"jupyter.{notebook_name}"
            else:
                module_name = "__main__"
        except (ImportError, AttributeError):
            module_name = "__main__"  # Fallback name for Jupyter environments
    path_info = f"{module_name}:{function_name}:{line_number}"
    # Log level name, e.g. "INFO".
    level = record["level"].name
    # record["exception"] is only populated when loguru captured one
    # (e.g. logger.exception / opt(exception=...)).
    error: loguru.RecordException = record["exception"]
    # sys.exc_info() is non-empty whenever we are inside an except block,
    # even for plain logger.error calls.
    error_by_default = sys.exc_info()  # logger.error
    pipeline: str | None = record["extra"].get("pipeline", None)
    # Callers can suppress the exception value via extra (defaults to True).
    show_exception_value: bool = record["extra"].get("show_exception_value", True)
    extra = record["extra"].copy()
    extra.update({"pipeline": pipeline})
    # Process exception info.
    if error:  # only set when loguru captured the exception
        exc_type, exc_value, exc_tb = error.type, error.value, error.traceback
        # Use ExceptionFormatter directly with the specific error components
        # to get a full backtrace with diagnosis and color.
        formatter = ExceptionFormatter(backtrace=True, diagnose=True, colorize=True)
        formatted_traceback = formatter.format_exception(exc_type, exc_value, exc_tb)
        exception = {
            "type": exc_type.__name__,
            "value": str(exc_value).strip("'") if show_exception_value else None,
            "traceback": "".join(formatted_traceback),
        }
    elif error_by_default[0]:  # an exception is in flight but was not captured by loguru
        _type, _value, _ = sys.exc_info()
        exception = {
            "type": _type.__name__,
            "value": str(_value).strip("'") if show_exception_value else None,
            "traceback": None,  # no formatted traceback in this path
        }
    else:
        exception = None
    # Prepare data for serialization.
    to_serialize = {
        "time": timestamp,
        "level": level,
        "path": path_info,
        "message": record["message"],
        "pipeline": pipeline,
        "exception": exception,
    }
    # Add other extra fields, skipping internals consumed by this module.
    for key, value in extra.items():
        if key not in ("pipeline", "serialized", "show_exception_value"):
            to_serialize[key] = value
    # Convert to JSON string.
    json_str = json.dumps(to_serialize)
    # Colorize the JSON string.
    return colorize_json_string(json_str)
def patching(record):
    """Loguru patcher: pre-render the record as a colorized JSON string.

    Stores the serialized form under record["extra"]["serialized"] so a
    sink configured with format="{extra[serialized]}" emits it verbatim.
    """
    record["extra"]["serialized"] = serialize(record)
def get_contextualized_logger(
    pipeline_name: str = PIPELINE_NAME, default_logger=logger
):
    """Generate a contextualized logger bound to ``pipeline_name``.

    Removes any existing sinks, patches the logger so every record is
    pre-serialized to colorized JSON, and installs a stdout sink that
    emits the pre-serialized string.

    Args:
        pipeline_name: Value bound to the "pipeline" context field of
            every record emitted by the returned logger.
        default_logger: Logger instance to configure (defaults to the
            global loguru logger).

    Returns:
        The configured logger bound with pipeline=pipeline_name, or
        ``default_logger`` untouched when SERIALIZED_LOGGING is False.
    """
    if not SERIALIZED_LOGGING:
        return default_logger
    default_logger.remove()
    default_logger = default_logger.patch(patching)
    default_logger.add(
        sink=sys.stdout,
        colorize=True,
        serialize=False,  # custom serialization requires this to be False
        backtrace=True,
        diagnose=True,
        level="INFO",
        format="{extra[serialized]}",
    )
    # Bug fix: previously bound the hard-coded literal "P1", silently
    # ignoring the pipeline_name parameter.
    return default_logger.bind(pipeline=pipeline_name)
def logger_with_context(*param_specs, **fixed_context):
    """Decorator that automatically extracts context from specified function parameters.

    Works with both synchronous and asynchronous functions.

    Args:
        *param_specs: Parameter specifications, which can be:
            - Simple parameter names (e.g., "task")
            - Attribute paths (e.g., "task.id" to get task.id)
            - Dictionaries to include directly in context
        **fixed_context: Additional fixed context values.

    Returns:
        Decorated function that runs inside logger.contextualize() with
        the extracted context.
    """
    # Separate dict specs (merged directly into the context) from string
    # specs (parameter names / attribute paths).
    processed_param_specs = []
    additional_context = {}
    for spec in param_specs:
        if isinstance(spec, dict):
            additional_context.update(spec)
        else:
            processed_param_specs.append(spec)
    # fixed_context keyword arguments take precedence over dict specs.
    merged_fixed_context = {**additional_context, **fixed_context}

    def decorator(func):
        is_async = asyncio.iscoroutinefunction(func)
        # Perf: the signature is invariant across calls — compute it once
        # at decoration time instead of on every invocation.
        sig = inspect.signature(func)

        def get_context_from_args(bound_args):
            """Build the context dict from bound arguments and fixed values."""
            context = dict(merged_fixed_context)
            for spec in processed_param_specs:
                if "." in spec:
                    # Object attribute access (e.g., "task.id").
                    obj_name, attr_name = spec.split(".", 1)
                    obj = bound_args.arguments.get(obj_name)
                    if obj is None:
                        continue
                    try:
                        # "oid" is surfaced as "id" in the context key.
                        key_name = "id" if attr_name == "oid" else attr_name
                        context[f"{obj_name}_{key_name}"] = getattr(obj, attr_name)
                    except AttributeError:
                        # Gracefully skip missing attributes, but leave a trace.
                        logger.warning(f"Object {obj_name} has no attribute {attr_name}")
                elif spec in bound_args.arguments:
                    # Direct parameter access.
                    param_value = bound_args.arguments[spec]
                    if isinstance(param_value, dict):
                        # Dict parameters are merged into the context.
                        context.update(param_value)
                    else:
                        context[spec] = param_value
            return context

        def _bind_context(args, kwargs):
            """Bind the call arguments and extract the logging context.

            Shared by both wrappers to avoid duplicating the bind /
            apply_defaults / extraction sequence.
            """
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            return get_context_from_args(bound_args)

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            with logger.contextualize(**_bind_context(args, kwargs)):
                return await func(*args, **kwargs)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            with logger.contextualize(**_bind_context(args, kwargs)):
                return func(*args, **kwargs)

        # Return the wrapper matching the wrapped function's kind.
        return async_wrapper if is_async else sync_wrapper

    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment