Skip to content

Instantly share code, notes, and snippets.

@courville
Created October 20, 2025 12:04
Show Gist options
  • Select an option

  • Save courville/43919b2072f5055d30bec39289e96a3d to your computer and use it in GitHub Desktop.

Select an option

Save courville/43919b2072f5055d30bec39289e96a3d to your computer and use it in GitHub Desktop.
Rewrites log.(debug|trace|warn|error) calls, replacing "+" string concatenation with "{}" placeholder parameters to improve performance
#!/usr/bin/env python3
"""
Interactive refactoring tool to replace SLF4J log string concatenations with
parameterized placeholders.

Usage:
    python rewrite-logs-interactive.py path/to/File.java [...]

For each candidate line it shows a unified diff and prompts:
    y - apply this change
    n - skip this change
    a - apply this and all remaining changes without prompting
    q - quit immediately (already-applied changes are kept)
"""
import argparse
import ast
import difflib
import re
import sys
def split_first_argument(body):
    """Split a call's argument text at its first top-level comma.

    Commas inside string literals, char literals, or any kind of bracket are
    ignored.

    Args:
        body: Text between the parentheses of a call, e.g. ``"a" + b, c``.

    Returns:
        A ``(first_argument, remainder)`` tuple. ``remainder`` is everything
        after the separating comma (the comma itself is dropped) or ``""``
        when there is only one argument. Whitespace is preserved in both.
    """
    depth = 0
    quote = None  # the quote character of the literal we are inside, if any
    escaped = False
    for idx, ch in enumerate(body):
        if quote is not None:
            if escaped:
                escaped = False
            elif ch == "\\":  # the next character is escaped
                escaped = True
            elif ch == quote:
                quote = None
            continue
        if ch in "\"'":
            # Track char literals too: ',' or '"' must not be parsed as code.
            quote = ch
        elif ch in "([{":
            depth += 1
        elif ch in ")]}":
            # Clamp at zero so a stray closer cannot hide later commas.
            depth = max(depth - 1, 0)
        elif ch == "," and depth == 0:
            return body[:idx], body[idx + 1:]
    return body, ""
def split_plus(expr):
    """Split an expression on top-level binary ``+`` operators.

    A ``+`` is a separator only when it is outside string literals, char
    literals and brackets. A ``++`` pair is kept as part of its operand so
    that increments such as ``i++`` are not torn apart.

    Args:
        expr: Source text of the expression.

    Returns:
        List of operand substrings with their surrounding whitespace intact.
        There is always at least one element (``[""]`` for empty input).
    """
    parts = []
    current = []
    quote = None  # the quote character of the literal we are inside, if any
    escaped = False
    depth = 0
    idx = 0
    length = len(expr)
    while idx < length:
        ch = expr[idx]
        idx += 1
        if quote is not None:
            current.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == quote:
                quote = None
            continue
        if ch in "\"'":
            # Track char literals too: '+' or '"' must not be parsed as code.
            quote = ch
        elif ch in "([{":
            depth += 1
        elif ch in ")]}":
            depth = max(depth - 1, 0)
        elif ch == "+" and depth == 0:
            if expr.startswith("+", idx):
                # "++" is an increment operator, not two concatenations.
                current.append("++")
                idx += 1
                continue
            parts.append("".join(current))
            current = []
            continue
        current.append(ch)
    parts.append("".join(current))
    return parts
# Exactly one double-quoted literal: no prefixes, no adjacent literals.
_JAVA_STRING_LITERAL = re.compile(r'"(?:[^"\\]|\\.)*"')


def decode_string(token):
    """Decode a Java string literal token into its text value.

    Only a single double-quoted literal is accepted. Forms that merely happen
    to be valid Python literals (single-quoted text, ``b"..."``/``r"..."``
    prefixes, adjacent literals) are rejected so that they are never folded
    into the format string by mistake.

    Args:
        token: Candidate source token; surrounding whitespace is ignored.

    Returns:
        The decoded text, or ``None`` when ``token`` is not a string literal.
    """
    if not isinstance(token, str):
        return None
    candidate = token.strip()
    if not _JAVA_STRING_LITERAL.fullmatch(candidate):
        return None
    # Java's "\s" escape means a space, but Python would keep the backslash.
    # Matching whole escape pairs keeps "\\s" (escaped backslash + s) intact.
    python_source = re.sub(
        r"\\(.)",
        lambda match: " " if match.group(1) == "s" else match.group(0),
        candidate,
    )
    try:
        value = ast.literal_eval(python_source)
    except (ValueError, SyntaxError):
        return None
    return value if isinstance(value, str) else None
_LOG_CALL_RE = re.compile(r'log\.(debug|trace|warn|error)\(')
_CONCAT_RE = re.compile(r'"\s*\+')

# Characters that must be written as escape sequences inside a Java literal.
_JAVA_ESCAPES = {
    "\\": "\\\\",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
    "\t": "\\t",
    "\b": "\\b",
    "\f": "\\f",
}


def _escape_java_string(text):
    """Return ``text`` escaped for use between double quotes in Java source."""
    pieces = []
    for ch in text:
        if ch in _JAVA_ESCAPES:
            pieces.append(_JAVA_ESCAPES[ch])
        elif ord(ch) < 0x20 or ord(ch) == 0x7F:
            # Remaining control characters have no short escape.
            pieces.append(f"\\u{ord(ch):04x}")
        else:
            pieces.append(ch)
    return "".join(pieces)


def _find_closing_paren(text):
    """Return the index of the ``)`` closing a call whose ``(`` precedes ``text``.

    Brackets inside string and char literals are ignored. Returns ``-1`` when
    the call is not closed within ``text`` or the brackets are mismatched.
    """
    depth = 0
    quote = None
    escaped = False
    for idx, ch in enumerate(text):
        if quote is not None:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == quote:
                quote = None
            continue
        if ch in "\"'":
            quote = ch
        elif ch in "([{":
            depth += 1
        elif ch in ")]}":
            if depth == 0:
                return idx if ch == ")" else -1
            depth -= 1
    return -1


def transform_line(line):
    """Rewrite one logging statement from concatenation to ``{}`` placeholders.

    The first argument of the first ``log.debug/trace/warn/error(`` call in
    ``line`` is split on ``+``; string literals are merged into the format
    string and every other operand becomes a ``{}`` placeholder plus a
    trailing argument. Remaining arguments of the call are kept after the new
    ones.

    The rewrite is refused (``None``) whenever it could change the message:

    * a literal already contains ``{}``, or a backslash would end up directly
      before a new placeholder (which would escape it);
    * two or more operands precede the first string literal, because those
      ``+`` operators may be arithmetic rather than concatenation.

    Args:
        line: A single statement, already flattened onto one line.

    Returns:
        The rewritten line without a trailing newline, or ``None`` when the
        line is not a candidate or would not change.
    """
    line = line.rstrip("\n")
    call = _LOG_CALL_RE.search(line)
    if call is None or not _CONCAT_RE.search(line):
        return None

    # The regex match ends on the opening parenthesis of the call.
    call_start = call.end() - 1
    head = line[: call_start + 1]
    body_tail = line[call_start + 1:]
    end = _find_closing_paren(body_tail)
    if end == -1:
        return None
    body = body_tail[:end]
    tail = body_tail[end:]

    first_arg, rest = split_first_argument(body)
    stripped = first_arg.strip()
    if not _CONCAT_RE.search(stripped):
        return None

    operands = [tok for tok in (raw.strip() for raw in split_plus(stripped)) if tok]
    format_parts = []
    args = []
    first_literal = None
    for position, token in enumerate(operands):
        decoded = decode_string(token)
        if decoded is None:
            format_parts.append("{}")
            args.append(token)
            continue
        if first_literal is None and token.startswith('"'):
            first_literal = position
        format_parts.append(decoded)

    if not args or first_literal is None or first_literal > 1:
        return None

    format_str = "".join(format_parts)
    # Every "{}" must be one we inserted, and none may be backslash-escaped.
    if format_str.count("{}") != len(args) or "\\{}" in format_str:
        return None

    # Keep the whitespace that surrounded the original first argument.
    leading = first_arg[: len(first_arg) - len(first_arg.lstrip())]
    trailing = first_arg[len(first_arg.rstrip()):]
    new_core = '"' + _escape_java_string(format_str) + '", ' + ", ".join(args)
    new_body = leading + new_core + trailing
    if rest:
        new_body += "," + rest

    new_line = head + new_body + tail
    return new_line if new_line != line else None
_LOG_BLOCK_START_RE = re.compile(r'log\.(debug|trace|warn|error)\(')


def _paren_delta(text):
    """Return ``(`` count minus ``)`` count in ``text``, ignoring literals."""
    delta = 0
    quote = None
    escaped = False
    for ch in text:
        if quote is not None:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == quote:
                quote = None
            continue
        if ch in "\"'":
            quote = ch
        elif ch == "(":
            delta += 1
        elif ch == ")":
            delta -= 1
    return delta


def collect_log_block(lines, start_index):
    """Collect the source lines making up the logging call at ``start_index``.

    A block starts on a line containing ``log.debug(``, ``log.trace(``,
    ``log.warn(`` or ``log.error(`` and extends to the first line that
    contains ``);`` once the parentheses seen so far are balanced.
    Parentheses inside string or char literals are not counted.

    Args:
        lines: All lines of the file.
        start_index: Index of the line to inspect.

    Returns:
        ``(block, end_index)`` where ``block`` is a new list of the lines
        ``start_index..end_index``. ``(None, start_index)`` is returned when
        the line does not start a logging call or the call is never
        terminated, so that an unbalanced statement cannot swallow the rest
        of the file.
    """
    if not _LOG_BLOCK_START_RE.search(lines[start_index]):
        return None, start_index

    balance = 0
    for idx in range(start_index, len(lines)):
        part = lines[idx]
        balance += _paren_delta(part)
        if ");" in part and balance <= 0:
            return lines[start_index: idx + 1], idx
    return None, start_index
def prompt_user(diff_lines):
    """Show a diff and ask what to do with it.

    Args:
        diff_lines: Newline-terminated diff lines, written to stdout as is.

    Returns:
        One of ``"y"``, ``"n"``, ``"a"`` or ``"q"``. The prompt repeats until
        a valid answer is given. End of input is treated as ``"q"`` so the
        caller can finish cleanly instead of failing with ``EOFError``.
    """
    sys.stdout.writelines(diff_lines)
    while True:
        try:
            answer = input("Apply change? [y]es/[n]o/[a]ll/[q]uit: ")
        except EOFError:
            print()  # end the prompt line
            return "q"
        choice = answer.strip().lower()
        if choice in {"y", "n", "a", "q"}:
            return choice
        print("Please respond with y, n, a, or q.")
def process_file(path, apply_all):
    """Interactively rewrite the logging calls of one file in place.

    Each candidate block is shown as a unified diff. Depending on the answer
    it is applied, skipped, or applied together with all later candidates.
    The file is rewritten only if at least one change was accepted, and its
    existing line endings are preserved.

    Args:
        path: Path of the Java source file.
        apply_all: When true, apply every change without prompting.

    Returns:
        The apply-all state to use for the next file.

    Raises:
        SystemExit: After saving accepted changes, when the user chose to
            quit, so that no further files are processed.
    """
    # newline="" keeps "\r\n" endings instead of normalising them.
    with open(path, "r", encoding="utf-8", newline="") as fh:
        lines = fh.readlines()

    changed = False
    quit_requested = False
    idx = 0
    while idx < len(lines):
        block, end_idx = collect_log_block(lines, idx)
        if block is None:
            idx += 1
            continue

        flat_block = "".join(part.rstrip("\r\n") for part in block)
        new_line = transform_line(flat_block)
        if new_line is None:
            idx = end_idx + 1
            continue

        # Reuse the line ending of the block's last line.
        last_line = block[-1]
        line_ending = last_line[len(last_line.rstrip("\r\n")):] or "\n"
        replacement = new_line + line_ending

        diff = difflib.unified_diff(
            block,
            [replacement],
            fromfile=f"{path} (original)",
            tofile=f"{path} (modified)",
            lineterm="",
        )
        # Header lines come without a newline; content lines already have one.
        diff_lines = [d if d.endswith("\n") else d + "\n" for d in diff]

        if apply_all:
            sys.stdout.writelines(diff_lines)
            sys.stdout.write("Applied automatically (apply-all mode).\n")
            choice = "a"
        else:
            choice = prompt_user(diff_lines)

        if choice == "q":
            quit_requested = True
            break
        if choice == "n":
            idx = end_idx + 1
            continue
        if choice == "a":
            apply_all = True

        # The whole block collapses into one line, so continue right after it.
        lines[idx: end_idx + 1] = [replacement]
        changed = True
        idx += 1

    if changed:
        with open(path, "w", encoding="utf-8", newline="") as fh:
            fh.writelines(lines)
        print(f"Updated {path}")
    else:
        print(f"No changes in {path}")

    if quit_requested:
        # "q" means stop the whole run, not only the current file.
        sys.exit(0)
    return apply_all
def main():
    """Parse the command line and process every listed file in order.

    The apply-all state returned for one file is passed on to the next, so
    answering "a" once covers all remaining files.
    """
    cli = argparse.ArgumentParser(
        description="Interactively convert SLF4J concatenations to parameterized logs."
    )
    cli.add_argument("files", nargs="+", help="Java source files to process")
    targets = cli.parse_args().files

    auto_apply = False
    for target in targets:
        auto_apply = process_file(target, auto_apply)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nAborted by user.")
        # Report the interruption to the caller (128 + SIGINT) rather than
        # exiting with a success status.
        sys.exit(130)
#!/usr/bin/env bash
# Find Java files that contain log.(debug|trace|warn|error) calls built with
# string concatenation and pass them to rewrite-logs-interactive.py.
#
# Arguments: optional search paths (default: the current directory).
set -euo pipefail

# A log.<level> call followed on the same line by a '"' and then a '+'.
PATTERN='\s*log\.(debug|trace|warn|error).*"[[:space:]]*\+.*'
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PY_SCRIPT="${SCRIPT_DIR}/rewrite-logs-interactive.py"

# The script is started through python3 below, so it only has to be readable.
if [[ ! -r "${PY_SCRIPT}" ]]; then
  echo "Cannot read ${PY_SCRIPT}" >&2
  exit 1
fi

if ! command -v ag >/dev/null 2>&1; then
  echo "The Silver Searcher (ag) is required but not installed or not in PATH." >&2
  exit 1
fi

if [[ $# -eq 0 ]]; then
  search_targets=(.)
else
  search_targets=("$@")
fi

# NUL-separated file names keep paths with spaces or newlines intact.
mapfile -d '' files < <(ag --java -0 -l "${PATTERN}" "${search_targets[@]}")

if [[ ${#files[@]} -eq 0 ]]; then
  echo "No files matched pattern ${PATTERN}"
  exit 0
fi

python3 "${PY_SCRIPT}" "${files[@]}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment