Skip to content

Instantly share code, notes, and snippets.

@fcavalcantirj
Created February 6, 2026 19:45
Show Gist options
  • Select an option

  • Save fcavalcantirj/d79b904f07b86df15c1a7da2c5d47c2a to your computer and use it in GitHub Desktop.

Select an option

Save fcavalcantirj/d79b904f07b86df15c1a7da2c5d47c2a to your computer and use it in GitHub Desktop.
Fix OpenClaw session corruption: unexpected tool_use_id found in tool_result blocks
#!/usr/bin/env python3
"""
fix-openclaw-session.py — Repair corrupted OpenClaw session transcripts
PROBLEM
-------
When an OpenClaw assistant response is terminated mid-stream while streaming a
tool_use block, the JSONL transcript ends up with a malformed entry: the
tool_call block contains a `partialJson` field but no valid `arguments`, and
the `stopReason` is "error"/"terminated".
OpenClaw's built-in transcript repair (`session-transcript-repair.ts`) notices
the orphaned tool_use_id and inserts a synthetic toolResult for it. However,
because the original tool_use block is *structurally malformed* (partial JSON,
no proper arguments), the Anthropic API does not recognize it as a valid
tool_use. When the full transcript is sent on the next turn, the API rejects it:
messages.N.content.M: unexpected `tool_use_id` found in `tool_result`
blocks: toolu_XXXX. Each `tool_result` block must have a corresponding
`tool_use` block in the previous message.
This is a permanent error — every subsequent message to the session fails with
the same 400, because the corrupted lines are baked into the transcript.
WHAT THIS SCRIPT DOES
---------------------
1. Scans all .jsonl session files (or a specific one) for the corruption pattern
2. Identifies:
a) Assistant messages with stopReason "error" whose errorMessage contains
"terminated" and that contain partialJson tool_call blocks (the root cause)
b) Synthetic toolResult messages that reference tool_use_ids only found in
those broken assistant messages (the orphan repairs)
c) Empty assistant error responses caused by the cascade (content: [])
3. Removes the corrupted lines
4. Fixes parentId references in the DAG to maintain transcript integrity
5. Creates a timestamped backup before any modifications
USAGE
-----
# Dry-run scan of all sessions (shows what would be fixed, changes nothing)
python3 fix-openclaw-session.py
# Dry-run scan of a specific session file
python3 fix-openclaw-session.py /path/to/session.jsonl
# Actually apply the fix
python3 fix-openclaw-session.py --fix
# Fix a specific file
python3 fix-openclaw-session.py --fix /path/to/session.jsonl
"""
import json
import glob
import shutil
import sys
import os
from datetime import datetime
from pathlib import Path
from collections import defaultdict
# ── Defaults ──────────────────────────────────────────────────────────────────
# Directory whose *.jsonl files are scanned when no path is given on the
# command line ("~" is expanded once, at import time).
DEFAULT_SESSIONS_DIR = os.path.expanduser("~/.openclaw/agents/main/sessions")
# Content-block "type" values that are treated as tool calls.
TOOL_CALL_TYPES = {
    "toolCall",
    "toolUse",
    "functionCall",
}
# ── Helpers ───────────────────────────────────────────────────────────────────
def parse_jsonl(filepath: str) -> list[tuple[int, dict | None, str]]:
"""Parse a JSONL file. Returns list of (line_number, parsed_obj, raw_line)."""
entries = []
with open(filepath, "r", encoding="utf-8") as f:
for i, raw in enumerate(f, start=1):
raw = raw.rstrip("\n")
if not raw:
continue
try:
obj = json.loads(raw)
except json.JSONDecodeError:
obj = None
entries.append((i, obj, raw))
return entries
def get_role(obj: dict) -> str | None:
msg = obj.get("message")
if isinstance(msg, dict):
return msg.get("role")
return None
def get_id(obj: dict) -> str:
    """Return the entry's ``id`` value, or an empty string when the key is absent."""
    entry_id = obj.get("id", "")
    return entry_id
def get_parent_id(obj: dict) -> str:
    """Return the entry's ``parentId`` value, or an empty string when the key is absent."""
    parent_id = obj.get("parentId", "")
    return parent_id
def extract_tool_call_ids(obj: dict) -> set[str]:
    """Collect the non-empty ``id`` of every tool-call block in ``message.content``.

    A block counts as a tool call when it is a dict whose ``type`` is one of
    TOOL_CALL_TYPES. An empty set is returned when ``content`` is not a list.
    """
    blocks = obj.get("message", {}).get("content", [])
    if not isinstance(blocks, list):
        return set()
    return {
        blk["id"]
        for blk in blocks
        if isinstance(blk, dict)
        and blk.get("type") in TOOL_CALL_TYPES
        and blk.get("id", "")
    }
def has_partial_json(obj: dict) -> bool:
    """Tell whether any tool-call block in ``message.content`` has a ``partialJson`` key.

    Only dict blocks whose ``type`` is in TOOL_CALL_TYPES are considered;
    a ``content`` value that is not a list yields False.
    """
    blocks = obj.get("message", {}).get("content", [])
    if not isinstance(blocks, list):
        return False
    return any(
        isinstance(blk, dict)
        and blk.get("type") in TOOL_CALL_TYPES
        and "partialJson" in blk
        for blk in blocks
    )
def is_terminated_error(obj: dict) -> bool:
    """Check whether this message was terminated mid-stream.

    Args:
        obj: A parsed transcript entry.

    Returns:
        True when ``message.stopReason`` equals "error" and
        ``message.errorMessage`` is a string containing "terminated"
        (case-insensitive). False otherwise, including when ``message`` is
        not a dict or ``errorMessage`` is missing, null, or not a string.
    """
    msg = obj.get("message")
    if not isinstance(msg, dict) or msg.get("stopReason") != "error":
        return False
    err = msg.get("errorMessage")
    # errorMessage may be an explicit JSON null; only a string can be searched.
    return isinstance(err, str) and "terminated" in err.lower()
def is_empty_error_assistant(obj: dict) -> bool:
    """Check if this is an empty assistant response that recorded an API error.

    Args:
        obj: A parsed transcript entry.

    Returns:
        True when the entry's message has role "assistant", its ``content``
        is missing, null or an empty list, and its ``errorMessage`` is a
        string mentioning "tool_use_id". False otherwise, including when
        ``message`` is not a dict or ``errorMessage`` is null.
    """
    msg = obj.get("message")
    if not isinstance(msg, dict) or msg.get("role") != "assistant":
        return False
    content = msg.get("content", [])
    if content != [] and content is not None:
        return False
    err = msg.get("errorMessage")
    # A null or non-string errorMessage cannot mention a tool_use_id.
    return isinstance(err, str) and "tool_use_id" in err
def get_tool_result_id(obj: dict) -> str | None:
"""Extract the toolCallId from a toolResult message."""
msg = obj.get("message", {})
if msg.get("role") != "toolResult":
return None
return msg.get("toolCallId") or msg.get("toolUseId") or None
# ── Core Analysis ─────────────────────────────────────────────────────────────
def analyze_session(filepath: str) -> dict:
    """
    Analyze a session file for the terminated-partial-toolcall corruption.

    Args:
        filepath: Path of the session .jsonl transcript.

    Returns:
        A report dict. It always has "file", "lines" (number of non-empty
        lines) and "corrupted". When "corrupted" is True it also has:
        "broken_assistants", "orphan_results", "cascade_errors" and
        "broken_tool_call_ids" (sorted id lists), "remove_ids" (set of every
        entry id to delete), "remove_count", "parent_fixes" (removed id ->
        nearest surviving ancestor id) and "line_map" (removed id -> line
        number).

    Entries without an id are never marked for removal: every id-less entry
    shares the same empty id, so removing "by id" would delete all of them.
    """
    entries = parse_jsonl(filepath)
    if not entries:
        return {"file": filepath, "lines": 0, "corrupted": False}

    # Work only on lines that decoded to a JSON object.
    parsed = [(lineno, obj) for lineno, obj, _ in entries if isinstance(obj, dict)]

    # Pass 1: Find broken assistant messages (terminated + partialJson)
    broken_assistant_ids = set()   # IDs of broken assistant messages
    broken_tool_call_ids = set()   # tool_call IDs inside those broken messages
    broken_line_map = {}           # id -> line number
    for lineno, obj in parsed:
        if get_role(obj) != "assistant":
            continue
        if not (is_terminated_error(obj) and has_partial_json(obj)):
            continue
        oid = get_id(obj)
        if not oid:
            # Cannot be targeted safely without an id; leave it in place.
            continue
        broken_assistant_ids.add(oid)
        broken_line_map[oid] = lineno
        broken_tool_call_ids |= extract_tool_call_ids(obj)

    if not broken_assistant_ids:
        return {
            "file": filepath,
            "lines": len(entries),
            "corrupted": False,
        }

    # Pass 2: Find orphan synthetic toolResults referencing broken tool_call IDs
    orphan_result_ids = set()
    for lineno, obj in parsed:
        if get_role(obj) != "toolResult":
            continue
        tr_id = get_tool_result_id(obj)
        oid = get_id(obj)
        if oid and tr_id and tr_id in broken_tool_call_ids:
            orphan_result_ids.add(oid)
            broken_line_map[oid] = lineno

    # Pass 3: Find empty error assistant responses caused by the cascade
    cascade_error_ids = set()
    for lineno, obj in parsed:
        if get_role(obj) != "assistant" or not is_empty_error_assistant(obj):
            continue
        oid = get_id(obj)
        if not oid:
            continue
        err = obj.get("message", {}).get("errorMessage", "")
        # Only errors that name one of our broken tool_call IDs belong to
        # this corruption.
        if any(tc_id in err for tc_id in broken_tool_call_ids):
            cascade_error_ids.add(oid)
            broken_line_map[oid] = lineno

    remove_ids = broken_assistant_ids | orphan_result_ids | cascade_error_ids

    # Build parent remap: for each removed node, find what should replace it
    # in the parentId chain
    id_to_parent = {}
    for _, obj in parsed:
        oid = get_id(obj)
        if oid:
            id_to_parent[oid] = get_parent_id(obj)

    parent_fixes = {}
    for rid in remove_ids:
        # Walk up until we find a non-removed ancestor. `visited` stops the
        # walk if the removed entries' parentId links form a cycle, in which
        # case no surviving ancestor exists and the empty id is used.
        visited = {rid}
        ancestor = id_to_parent.get(rid, "")
        while ancestor in remove_ids:
            if ancestor in visited:
                ancestor = ""
                break
            visited.add(ancestor)
            ancestor = id_to_parent.get(ancestor, "")
        parent_fixes[rid] = ancestor

    return {
        "file": filepath,
        "lines": len(entries),
        "corrupted": True,
        "broken_assistants": sorted(broken_assistant_ids),
        "orphan_results": sorted(orphan_result_ids),
        "cascade_errors": sorted(cascade_error_ids),
        "remove_ids": remove_ids,
        "remove_count": len(remove_ids),
        "parent_fixes": parent_fixes,
        "line_map": broken_line_map,
        "broken_tool_call_ids": sorted(broken_tool_call_ids),
    }
# ── Fix ───────────────────────────────────────────────────────────────────────
def fix_session(filepath: str, report: dict, dry_run: bool = True) -> str:
    """Repair one session file according to ``report`` and describe the result.

    Args:
        filepath: Path of the session .jsonl file.
        report: Report dict for that file as produced by analyze_session().
        dry_run: When True (the default) nothing is written; the returned
            text only lists what would be removed.

    Returns:
        A multi-line, human-readable summary string.

    When not a dry run and the report is marked corrupted, the file is first
    copied to ``<filepath>.backup-<timestamp>`` and then rewritten in place
    without the flagged entries; surviving entries whose parentId pointed at
    a removed entry are re-serialized with the replacement parentId.
    """
    if not report.get("corrupted"):
        return f" {filepath}: clean, nothing to do."

    ids_to_drop = report["remove_ids"]
    reparent = report["parent_fixes"]
    line_of = report["line_map"]

    # Flagged line numbers, ascending, e.g. "L7, L8".
    lines_info = ", ".join(
        f"L{lineno}"
        for _, lineno in sorted(line_of.items(), key=lambda item: item[1])
    )

    if dry_run:
        return "\n".join((
            f" {filepath}:",
            f" Total lines: {report['lines']}",
            f" Broken assistant messages: {len(report['broken_assistants'])}",
            f" Orphan synthetic toolResults: {len(report['orphan_results'])}",
            f" Cascade error responses: {len(report['cascade_errors'])}",
            f" Lines to remove ({report['remove_count']}): {lines_info}",
            f" Poisoned tool_call IDs: {', '.join(report['broken_tool_call_ids'])}",
            f" Parent chain fixes: {len(reparent)}",
        ))

    # Keep a timestamped copy before touching the file.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup = f"{filepath}.backup-{stamp}"
    shutil.copy2(filepath, backup)

    kept_lines = []
    removed = 0
    fixed = 0
    for _, obj, raw in parse_jsonl(filepath):
        if obj is not None:
            if get_id(obj) in ids_to_drop:
                removed += 1
                continue
            parent = get_parent_id(obj)
            if parent in reparent:
                # Re-link the child to the nearest surviving ancestor; only
                # such entries are re-serialized, all others keep their raw text.
                obj["parentId"] = reparent[parent]
                fixed += 1
                raw = json.dumps(obj, ensure_ascii=False)
        kept_lines.append(raw)

    with open(filepath, "w", encoding="utf-8") as f:
        f.write("\n".join(kept_lines) + "\n")

    return "\n".join((
        f" {filepath}:",
        f" Backup: {backup}",
        f" Removed {removed} corrupted lines: {lines_info}",
        f" Fixed {fixed} parent references",
        f" Lines: {report['lines']} -> {len(kept_lines)}",
    ))
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: scan session files and optionally repair them.

    Arguments are read from ``sys.argv``. ``--fix`` switches from dry-run to
    applying repairs; every other argument is treated as a session file
    path. With no paths, all ``*.jsonl`` files in DEFAULT_SESSIONS_DIR are
    scanned. A file that cannot be read or written is reported on stderr and
    skipped so the remaining files are still processed; in that case the
    process exits with status 1 after the summary.
    """
    args = sys.argv[1:]
    do_fix = "--fix" in args
    paths = [a for a in args if a != "--fix"]

    # Determine which files to scan
    if paths:
        files = paths
    else:
        pattern = os.path.join(DEFAULT_SESSIONS_DIR, "*.jsonl")
        files = sorted(glob.glob(pattern))
    if not files:
        print("No session files found.")
        return

    mode = "FIX" if do_fix else "DRY-RUN (use --fix to apply)"
    print(f"OpenClaw Session Repair — {mode}")
    print(f"Scanning {len(files)} session file(s)...\n")

    corrupted_count = 0
    failed = []
    for filepath in files:
        try:
            report = analyze_session(filepath)
            result = fix_session(filepath, report, dry_run=not do_fix)
        except (OSError, UnicodeDecodeError) as exc:
            # One bad file must not abort the scan of the others.
            failed.append(filepath)
            print(f" {filepath}: skipped ({exc})", file=sys.stderr)
            print()
            continue
        if report.get("corrupted"):
            corrupted_count += 1
        print(result)
        print()

    if corrupted_count:
        print(f"{'Fixed' if do_fix else 'Found'} {corrupted_count} corrupted session(s).")
        if not do_fix:
            print("\nRun with --fix to apply repairs.")
    elif not failed:
        print("All sessions are clean. No corruption found.")

    if failed:
        print(f"{len(failed)} file(s) could not be processed.", file=sys.stderr)
        sys.exit(1)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment