#!/usr/bin/env python3
import fileinput
import json
import re
import sys
from argparse import ArgumentParser
from dataclasses import dataclass
from pprint import pprint
from typing import Any, Dict, List, Optional, Union

DEFAULT_JSON_INDENT = 4
FORMAT = "{asctime} {severity} {name} {module}:{funcName}:{lineno} {message}"
# Keys that FORMAT needs in every JSON log line (derived from FORMAT itself)
REQUIRED_KEYS = re.findall(r"{(\w+)}", FORMAT)
# Stack traces or exception tracebacks
KEYS_WITH_STACK_TRACES = [
    "exc_info",
    "stack_info",
]
KNOWN_KEYS = [
    "name",
    "asctime",
    "severity",
    "module",
    "funcName",
    "lineno",
    "message",
] + KEYS_WITH_STACK_TRACES
DEFAULT_IGNORED_KEYS = [
    "filename",
    "pathname",
]

StrOrBytes = Union[str, bytes]
ParsedLog = Dict[str, Any]
@dataclass
class Arguments:
    files: List[str]
    fail_if_non_json: bool
    missing_keys_warnings: bool
    ignored_keys: List[str]
    show_stack_traces: bool
    show_extra_keys: bool
    show_message: bool
    show_non_json: bool
    strip_stack_traces: bool
    replace_stack_trace_newlines: Optional[str]
    extra_keys_sep: str
    json_indent: Optional[int]


def dataclass_to_dict(dataclass_) -> Dict[str, Any]:
    return {
        key: getattr(dataclass_, key) for key in dataclass_.__dataclass_fields__.keys()
    }
def parse_plain_json(line):
    """Parse a line that is a bare JSON object; return None if it is not JSON."""
    try:
        return json.loads(line)
    except json.decoder.JSONDecodeError:
        return None


def parse_loki_json(line):
    """Parse a line whose JSON payload follows a tab-separated prefix; return None otherwise."""
    try:
        return parse_plain_json(line.split("\t", 1)[1])
    except IndexError:
        return None


JSON_PARSERS = [parse_plain_json, parse_loki_json]
def parse(
    line: StrOrBytes, fail_if_non_json: bool = False
) -> Union[ParsedLog, StrOrBytes]:
    for parser in JSON_PARSERS:
        log_msg = parser(line)
        if log_msg is not None:
            return log_msg
    if fail_if_non_json:
        print(
            "FATAL: could not parse JSON with parsers",
            ", ".join(func.__name__ for func in JSON_PARSERS),
        )
        print(line[:-1])
        sys.exit(1)
    return line
def format_extra_keys(log_msg: ParsedLog, args: Arguments) -> str:
    extra_items = {
        key: value
        for key, value in log_msg.items()
        if not (key in KNOWN_KEYS or key in args.ignored_keys)
    }
    output = ""
    if args.show_extra_keys and len(extra_items) > 0:
        # Add newlines between msg components
        if args.show_message:
            output += args.extra_keys_sep
        output += json.dumps(extra_items, indent=args.json_indent)
    return output
def strip_stack_trace(stack_trace: str) -> str:
    lines = stack_trace.split("\n")
    last_file_location_index = len(lines) - 1
    for index, line in enumerate(lines):
        if re.match(r'\s*File ".*", line .*', line):
            last_file_location_index = index
    return "\n".join(lines[:2] + lines[last_file_location_index:])
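# Illustrative example (hypothetical traceback, not from the source): given
#   Traceback (most recent call last):
#     File "app.py", line 10, in main
#       run()
#     File "worker.py", line 42, in run
#       raise ValueError("boom")
#   ValueError: boom
# strip_stack_trace keeps the first two lines plus everything from the last
# 'File "...", line ...' match onward:
#   Traceback (most recent call last):
#     File "app.py", line 10, in main
#     File "worker.py", line 42, in run
#       raise ValueError("boom")
#   ValueError: boom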
def format_stack_traces(
    log_msg: ParsedLog,
    newline_at_start: bool,
    strip: bool,
    replace_newlines: Optional[str] = None,
) -> str:
    output = ""
    for i, field in enumerate(KEYS_WITH_STACK_TRACES):
        if field in log_msg:
            if log_msg[field] == "NoneType: None":
                continue
            # Add newlines between msg components
            if i == 0 and not newline_at_start:
                exc_sep = ""
            else:
                exc_sep = replace_newlines if replace_newlines is not None else "\n"
            stack_trace = log_msg[field]
            if strip:
                stack_trace = strip_stack_trace(stack_trace)
            if replace_newlines:
                stack_trace = stack_trace.replace("\n", replace_newlines)
            output += exc_sep + stack_trace
    return output
def format_log_msg(
    log_msg: Union[ParsedLog, StrOrBytes], original_line: StrOrBytes, args: Arguments
) -> Optional[StrOrBytes]:
    try:
        if not isinstance(log_msg, dict):
            raise ValueError(f"expected dict, received {type(log_msg)}")
        msg = ""
        if args.show_message:
            # Default the logger name to "|" if the log line does not include one
            msg += FORMAT.format(**{"name": "|"} | log_msg)
        msg += format_extra_keys(log_msg, args)
        if args.show_stack_traces:
            msg += format_stack_traces(
                log_msg,
                newline_at_start=args.show_extra_keys or args.show_message,
                strip=args.strip_stack_traces,
                replace_newlines=args.replace_stack_trace_newlines,
            )
        if len(msg) == 0:
            return None
        return msg
    except (ValueError, KeyError) as exc:
        if isinstance(exc, KeyError) and args.missing_keys_warnings:
            print(f"WARNING: missing key {exc}", file=sys.stderr)
        if args.show_non_json:
            # Exclude trailing newline
            return original_line[:-1]
def parse_args() -> Arguments:
    parser = ArgumentParser(
        description="""Convert JSON logs from stdin or files into readable output.
        The JSON keys are assumed to come from Python.
        Stack traces and exception tracebacks will be searched for in the keys: """
        + (", ".join(KEYS_WITH_STACK_TRACES))
    )
    parser.add_argument(
        "files", help="Log files to read. Reads from stdin if no files are given.", nargs="*"
    )
    parser.add_argument(
        "--missing-keys-warnings",
        help="Print a warning to stderr if a JSON log line is missing required keys.",
        action="store_true",
    )
    parser.add_argument(
        "--no-extra-keys",
        help="Do not print extra keys found in a JSON log line.",
        action="store_true",
    )
    parser.add_argument(
        "--no-message",
        help="Do not print the main message; only print stack traces or extra keys.",
        action="store_true",
    )
    non_json_group = parser.add_mutually_exclusive_group()
    non_json_group.add_argument(
        "--no-non-json",
        help="Hide non-JSON input instead of printing it as-is.",
        action="store_true",
    )
    non_json_group.add_argument(
        "--only-non-json",
        help="Only show non-JSON input.",
        action="store_true",
    )
    non_json_group.add_argument(
        "--fail-if-non-json",
        help="Exit immediately if a line cannot be parsed as JSON.",
        action="store_true",
    )
    stack_trace_group = parser.add_mutually_exclusive_group()
    stack_trace_group.add_argument(
        "--no-stack-traces",
        help="Do not print Python stack traces or exception tracebacks.",
        action="store_true",
    )
    stack_trace_group.add_argument(
        "--only-stack-traces",
        help="Only print JSON log lines that contain Python stack traces or exception tracebacks.",
        action="store_true",
    )
    parser.add_argument(
        "--strip-stack-traces",
        help="Print only the first two lines of Python stack traces or exception tracebacks, plus the lines from the last file location onward.",
        action="store_true",
    )
    parser.add_argument(
        "--replace-stack-trace-newlines",
        help="Print Python stack traces or exception tracebacks, but replace newlines with the given string.",
        action="store",
        default=None,
    )
    parser.add_argument(
        "--extra-keys-on-same-line",
        help="Print extra keys on the same line as the formatted log message.",
        action="store_true",
    )
    parser.add_argument(
        "--stack-traces-one-line",
        help="Print stack traces and exception tracebacks on a single line. Same as \"--replace-stack-trace-newlines ' '\".",
        action="store_true",
    )
    parser.add_argument(
        "--one-line",
        help="Print the formatted log message and extra keys on a single line. Stack traces and exception tracebacks keep their own formatting.",
        action="store_true",
    )
    parser.add_argument(
        "--ignore",
        help="Specify a key to ignore. Can be given multiple times.",
        action="append",
        default=None,
    )
    # TODO
    # parser.add_argument("--message-format")
    # parser.add_argument("--known-keys")
    json_formatting = parser.add_mutually_exclusive_group()
    json_formatting.add_argument(
        "--json-indent",
        help="How much to indent extra keys when pretty-printing them as JSON.",
        type=int,
        default=DEFAULT_JSON_INDENT,
    )
    json_formatting.add_argument(
        "--compact-json",
        help="Print extra keys in a compact format.",
        action="store_true",
    )
    parser.add_argument("--print-arguments", action="store_true")
    args = parser.parse_args()

    # convenience for --no-extra-keys --no-message
    if args.only_stack_traces:
        args.no_extra_keys = True
        args.no_message = True
    if args.stack_traces_one_line:
        args.replace_stack_trace_newlines = " "

    application_args = Arguments(
        files=args.files,
        missing_keys_warnings=args.missing_keys_warnings,
        show_stack_traces=not args.no_stack_traces and not args.only_non_json,
        show_extra_keys=not args.no_extra_keys and not args.only_non_json,
        show_message=not args.no_message and not args.only_non_json,
        show_non_json=args.only_non_json or not args.no_non_json,
        fail_if_non_json=args.fail_if_non_json,
        strip_stack_traces=args.strip_stack_traces,
        replace_stack_trace_newlines=args.replace_stack_trace_newlines,
        extra_keys_sep=" " if args.one_line or args.extra_keys_on_same_line else "\n",
        json_indent=None if args.one_line or args.compact_json else args.json_indent,
        ignored_keys=DEFAULT_IGNORED_KEYS if args.ignore is None else args.ignore,
    )
    if args.print_arguments:
        print("Arguments:", file=sys.stderr)
        pprint(dataclass_to_dict(application_args), stream=sys.stderr)
    return application_args
def main():
    args = parse_args()
    with fileinput.input(files=args.files, mode="r") as input_lines:
        try:
            for line in input_lines:
                parsed_json = parse(line, args.fail_if_non_json)
                readable_message = format_log_msg(parsed_json, line, args)
                if readable_message is not None:
                    print(readable_message)
        except KeyboardInterrupt:
            return


if __name__ == "__main__":
    main()
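# Example usage (a sketch; the filename jsonlogs.py and the piped command are
# hypothetical, the flags are the ones defined above):
#
#   # Pretty-print JSON logs piped in from another program
#   kubectl logs my-pod | python3 jsonlogs.py
#
#   # Read files, keep everything on one line, and show only lines with tracebacks
#   python3 jsonlogs.py app.log worker.log --one-line --only-stack-traces
#
# Given a JSON line such as
#   {"asctime": "2021-11-30 18:13:00", "severity": "INFO", "name": "app",
#    "module": "server", "funcName": "start", "lineno": 10,
#    "message": "listening", "port": 8080}
# the default output is the FORMAT line followed by the extra keys as JSON:
#   2021-11-30 18:13:00 INFO app server:start:10 listening
#   {
#       "port": 8080
#   }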