Last active
March 2, 2026 14:10
-
-
Save mohitmun/c400544df67cfb33c7ffccdaf93da178 to your computer and use it in GitHub Desktop.
Claude Code session parser - parse, list, blame, and summarize Claude JSONL session files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Session Parser for Claude Code | |
| # Parse Claude session JSONL files to review past conversations | |
| # | |
| # Usage: | |
| # python session_parser.py list [-p project] [-n limit] — list recent sessions | |
| # python session_parser.py summary <session_id> — quick summary (user msgs + tool counts) | |
| # python session_parser.py parse <session_id> — full conversation view | |
| # python session_parser.py parse <session_id> -m tools — list all tool calls | |
| # python session_parser.py parse <session_id> -t — include tool calls in conversation | |
| # python session_parser.py blame "code_string" [-p project] — find which session wrote a code string (git blame for Claude) | |
| # | |
| # Supports partial session IDs (e.g. cb3f1904 instead of full UUID) | |
| import json | |
| import sys | |
| import os | |
| import logging | |
| import glob as glob_mod | |
| import argparse | |
# Log bare messages (no level or logger-name prefix) at INFO and above.
logging.basicConfig(level=logging.INFO, format='%(message)s')
log = logging.getLogger(__name__)
# Root directory whose sub-directories (one per project) hold the session
# *.jsonl files that every command in this script reads.
PROJECTS_DIR = os.path.expanduser("~/.claude/projects")
def find_session_file(session_id):
    """Locate the JSONL file of a session under ``PROJECTS_DIR``.

    Searches recursively for ``<session_id>.jsonl``.

    Returns:
        The first path reported by glob, or ``None`` when nothing matches.
    """
    log.info(f"searching for session {session_id}")
    candidates = glob_mod.glob(
        os.path.join(PROJECTS_DIR, "**", f"{session_id}.jsonl"),
        recursive=True,
    )
    if candidates:
        first_hit = candidates[0]
        log.info(f"found: {first_hit}")
        return first_hit
    log.info(f"no session file found for {session_id}")
    return None
def list_sessions(project_filter=None, limit=20):
    """Print and return the most recently modified sessions.

    Args:
        project_filter: Optional substring; only project directories whose
            name contains it are scanned.
        limit: Maximum number of sessions to print and return.

    Returns:
        A list of dicts with keys ``id``, ``project``, ``mtime``, ``preview``
        and ``path``, ordered newest first. The list is empty when
        ``PROJECTS_DIR`` does not exist.
    """
    import datetime

    log.info(f"listing sessions (limit={limit}, filter={project_filter})")
    if not os.path.isdir(PROJECTS_DIR):
        # Nothing has been recorded yet; report that instead of crashing.
        log.info(f"projects directory not found: {PROJECTS_DIR}")
        return []

    suffix = ".jsonl"
    sessions = []
    for proj_dir in os.listdir(PROJECTS_DIR):
        if project_filter and project_filter not in proj_dir:
            continue
        full_dir = os.path.join(PROJECTS_DIR, proj_dir)
        if not os.path.isdir(full_dir):
            continue
        for fname in os.listdir(full_dir):
            if not fname.endswith(suffix):
                continue
            fpath = os.path.join(full_dir, fname)
            sessions.append({
                # Drop only the trailing extension, not every ".jsonl"
                # occurrence inside the file name.
                "id": fname[:-len(suffix)],
                "project": proj_dir,
                "mtime": os.path.getmtime(fpath),
                "preview": None,  # filled in below for the rows we keep
                "path": fpath,
            })

    sessions.sort(key=lambda s: s["mtime"], reverse=True)
    sessions = sessions[:limit]

    for s in sessions:
        # Building a preview means opening and parsing the file, so do it
        # only for the sessions that survive the limit.
        first_msg = get_first_user_message(s["path"])
        s["preview"] = first_msg[:80] if first_msg else "(empty)"
        dt = datetime.datetime.fromtimestamp(s["mtime"]).strftime("%Y-%m-%d %H:%M")
        proj_short = s["project"].split("-")[-1] if s["project"] else "?"
        print(f"{dt} {s['id'][:12]}.. [{proj_short}] {s['preview']}")
    return sessions
def get_first_user_message(fpath):
    """Return the text of the first meaningful user message in a session file.

    User rows whose extracted text is empty (for example rows that carry only
    tool results or system noise) are skipped so the caller gets a useful
    preview. Lines that are not valid JSON are ignored rather than aborting
    the scan.

    Args:
        fpath: Path to a session ``.jsonl`` file.

    Returns:
        The extracted user text, or ``""`` when the file has none.
    """
    # JSON text is UTF-8; do not depend on the platform default encoding.
    with open(fpath, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                # e.g. a truncated trailing line; one bad record should not
                # break the whole listing.
                continue
            if not isinstance(row, dict) or row.get("type") != "user":
                continue
            text = extract_user_text(row)
            if text:
                return text
    return ""
def is_system_noise(text):
    """Tell whether ``text`` should be ignored as non-user content.

    Empty or missing text counts as noise, as does any text that (after
    stripping surrounding whitespace) begins with one of the injected tags.
    """
    if not text:
        return True
    noise_prefixes = ("<task-notification>", "<system-reminder>", "<task-id>")
    # str.startswith accepts a tuple, so one call covers every prefix.
    return text.strip().startswith(noise_prefixes)
def extract_user_text(row):
    """Extract the human-written text from a ``user`` row.

    String content is stripped and returned unless it is system noise.
    List content is reduced to its text blocks and bare strings (noise
    dropped, everything else such as tool results ignored) joined by single
    spaces. Any other content type is returned as its stripped ``str()``.
    """
    content = row.get("message", {}).get("content", "")

    if isinstance(content, str):
        stripped = content.strip()
        return "" if is_system_noise(stripped) else stripped

    if not isinstance(content, list):
        return str(content).strip()

    kept = []
    for block in content:
        if isinstance(block, str):
            candidate = block
        elif isinstance(block, dict) and block.get("type") == "text":
            candidate = block.get("text", "").strip()
        else:
            # tool_result blocks and anything unrecognised carry no user text
            continue
        if not is_system_noise(candidate):
            kept.append(candidate)
    return " ".join(kept).strip()
def extract_assistant_text(row):
    """Render an ``assistant`` row as plain text.

    String content is returned stripped. For list content, non-empty text
    blocks are kept as-is and each tool call becomes a one-line
    ``[tool: NAME -> DESC]`` marker, where DESC is the first of the input's
    ``description``, ``command`` or ``pattern`` keys that is present (cut to
    100 characters plus ``...`` when it is a longer string). The pieces are
    joined with newlines. Any other content type yields ``""``.
    """
    content = row.get("message", {}).get("content", [])
    if isinstance(content, str):
        return content.strip()
    if not isinstance(content, list):
        return ""

    absent = object()
    rendered = []
    for block in content:
        if not isinstance(block, dict):
            continue
        kind = block.get("type")
        if kind == "text":
            body = block.get("text", "").strip()
            if body:
                rendered.append(body)
        elif kind == "tool_use":
            tool = block.get("name", "?")
            args = block.get("input", {})
            desc = ""
            for field in ("description", "command", "pattern"):
                value = args.get(field, absent)
                if value is not absent:
                    desc = value
                    break
            if isinstance(desc, str) and len(desc) > 100:
                desc = desc[:100] + "..."
            rendered.append(f"[tool: {tool} -> {desc}]")
    return "\n".join(rendered)
def extract_tool_result_preview(row):
    """Return a short preview of the first tool result carried by ``row``.

    A ``tool_result`` block may hold its payload either as a plain string or
    as a list of content blocks; for the list form the text blocks are joined
    with newlines. Results with no usable text are skipped.

    Args:
        row: A parsed session row (dict).

    Returns:
        The stripped result text, cut to its first five lines followed by a
        ``... (N lines total)`` marker when longer, or ``""`` when the row
        has no textual tool result.
    """
    content = row.get("message", {}).get("content", [])
    if not isinstance(content, list):
        return ""

    for item in content:
        if not (isinstance(item, dict) and item.get("type") == "tool_result"):
            continue
        result = item.get("content", "")
        if isinstance(result, list):
            # Block-list form: keep only the textual parts (images etc. have
            # nothing to preview).
            result = "\n".join(
                block["text"]
                for block in result
                if isinstance(block, dict)
                and block.get("type") == "text"
                and isinstance(block.get("text"), str)
            )
            if not result:
                continue
        if not isinstance(result, str):
            continue
        text = result.strip()
        lines = text.split("\n")
        if len(lines) > 5:
            return "\n".join(lines[:5]) + f"\n... ({len(lines)} lines total)"
        return text
    return ""
def parse_session(session_id, mode="conversation", show_tools=False):
    """Load a session file and print it in the requested view.

    Args:
        session_id: Exact session id; resolved to a file via
            ``find_session_file``. Nothing happens when no file is found.
        mode: One of ``"conversation"``, ``"messages"``, ``"summary"`` or
            ``"tools"``; any other value prints nothing.
        show_tools: Forwarded to the conversation and messages views.
    """
    fpath = find_session_file(session_id)
    if not fpath:
        return

    log.info(f"parsing session: {mode} mode, show_tools={show_tools}")
    with open(fpath, "r") as f:
        # One JSON document per non-blank line.
        rows = [json.loads(text) for text in (raw.strip() for raw in f) if text]
    log.info(f"total lines: {len(rows)}")

    renderers = {
        "messages": lambda: print_messages(rows, show_tools),
        "summary": lambda: print_summary(rows),
        "tools": lambda: print_tools(rows),
        "conversation": lambda: print_conversation(rows, show_tools),
    }
    render = renderers.get(mode)
    if render is not None:
        render()
def print_messages(rows, show_tools):
    """Print a flat, de-duplicated list of user and assistant messages.

    User messages are de-duplicated on their full text; assistant messages on
    message id plus the first 50 characters. Assistant messages containing a
    tool marker are printed only when ``show_tools`` is true.
    """
    already_seen = set()
    divider = "=" * 60
    for row in rows:
        kind = row.get("type")
        if kind == "user":
            text = extract_user_text(row)
            if not text or text in already_seen:
                continue
            already_seen.add(text)
            stamp = format_ts(row.get("timestamp", ""))
            print(f"\n{divider}")
            print(f"USER [{stamp}]: {text}")
        elif kind == "assistant":
            text = extract_assistant_text(row)
            message_id = row.get("message", {}).get("id", "")
            dedupe_key = f"{message_id}:{text[:50]}"
            if not text or dedupe_key in already_seen:
                continue
            # Recorded before the tool filter, so a hidden message still
            # counts as seen.
            already_seen.add(dedupe_key)
            if show_tools or "[tool:" not in text:
                print(f"CLAUDE: {text}")
def print_conversation(rows, show_tools):
    """Print the session as a readable YOU / CLAUDE transcript.

    Repeated user texts and repeated assistant messages (same message id and
    first 80 characters) are printed once. With ``show_tools`` true, tool
    call markers and tool result previews are included; otherwise both are
    omitted. A ``CLAUDE:`` header is emitted whenever the previous printed
    turn was not an assistant turn.
    """
    shown_user = set()
    # Assistant keys and tool-result keys share one set.
    shown_other = set()
    previous = None
    rule = "=" * 60

    for row in rows:
        kind = row.get("type")

        if kind == "user":
            text = extract_user_text(row)
            preview = extract_tool_result_preview(row)
            if preview and not text:
                # Pure tool-result row: optionally show it, never treat it as
                # a user turn.
                if show_tools:
                    result_key = preview[:80]
                    if result_key not in shown_other:
                        shown_other.add(result_key)
                        print(f" [result]: {preview[:200]}")
                continue
            if not text or text in shown_user:
                continue
            shown_user.add(text)
            stamp = format_ts(row.get("timestamp", ""))
            print(f"\n{rule}")
            print(f"YOU [{stamp}]:")
            print(f" {text}")
            print(f"{rule}")
            previous = "user"

        elif kind == "assistant":
            text = extract_assistant_text(row)
            if not text:
                continue
            message_id = row.get("message", {}).get("id", "")
            dedupe_key = f"{message_id}:{text[:80]}"
            if dedupe_key in shown_other:
                continue
            shown_other.add(dedupe_key)
            has_tool = "[tool:" in text
            if has_tool and not show_tools:
                continue
            if previous != "assistant":
                print(f"\nCLAUDE:")
            if has_tool:
                print(f" {text}")
            else:
                for piece in text.split("\n"):
                    print(f" {piece}")
            previous = "assistant"
def print_summary(rows):
    """Print a quick overview of a session.

    Shows the time span of the session, the de-duplicated user messages
    (first 120 characters each) and how often each tool was called, most
    used first.

    Args:
        rows: Parsed session rows (list of dicts).
    """
    user_msgs = []
    tool_counts = {}
    seen_user = set()
    for row in rows:
        rtype = row.get("type")
        if rtype == "user":
            text = extract_user_text(row)
            if text and text not in seen_user:
                seen_user.add(text)
                ts = format_ts(row.get("timestamp", ""))
                user_msgs.append({"ts": ts, "text": text})
        elif rtype == "assistant":
            content = row.get("message", {}).get("content", [])
            if not isinstance(content, list):
                continue
            for item in content:
                if isinstance(item, dict) and item.get("type") == "tool_use":
                    name = item.get("name", "?")
                    tool_counts[name] = tool_counts.get(name, 0) + 1

    # Default to "" rather than "?": format_ts renders a falsy timestamp as
    # "?", but would try to parse the literal "?" as an ISO date and raise.
    ts_first = next((r["timestamp"] for r in rows if r.get("timestamp")), "")
    ts_last = next((r["timestamp"] for r in reversed(rows) if r.get("timestamp")), "")

    print(f"Session: {format_ts(ts_first)} -> {format_ts(ts_last)}")
    print(f"User messages: {len(user_msgs)}")
    print(f"Total lines: {len(rows)}")
    print()
    print("--- User Messages ---")
    for i, m in enumerate(user_msgs, 1):
        preview = m["text"][:120]
        print(f" {i}. [{m['ts']}] {preview}")
    print()
    if tool_counts:
        print("--- Tool Usage ---")
        for tool, count in sorted(tool_counts.items(), key=lambda x: -x[1]):
            print(f" {tool}: {count}")
def print_tools(rows):
    """Print one line per distinct tool call made by the assistant.

    Calls are de-duplicated on their ``id``. Each line shows the tool name
    and the first of the input's ``description``, ``command``, ``pattern`` or
    ``file_path`` keys that is present, cut to 120 characters plus ``...``
    when it is a longer string.
    """
    printed_ids = set()
    absent = object()
    for row in rows:
        if row.get("type") != "assistant":
            continue
        blocks = row.get("message", {}).get("content", [])
        if not isinstance(blocks, list):
            continue
        for block in blocks:
            if not (isinstance(block, dict) and block.get("type") == "tool_use"):
                continue
            call_id = block.get("id", "")
            if call_id in printed_ids:
                continue
            printed_ids.add(call_id)
            tool_name = block.get("name", "?")
            args = block.get("input", {})
            summary = ""
            for field in ("description", "command", "pattern", "file_path"):
                value = args.get(field, absent)
                if value is not absent:
                    summary = value
                    break
            if isinstance(summary, str) and len(summary) > 120:
                summary = summary[:120] + "..."
            print(f" {tool_name}: {summary}")
def blame(search_str, project_filter=None):
    """Find the Edit/Write tool calls that wrote ``search_str``.

    Each project directory is pre-filtered with ``grep`` to find candidate
    session files, which are then inspected with ``scan_session_for_writes``.
    Matches are printed oldest first and returned.

    Args:
        search_str: Literal code string to look for.
        project_filter: Optional substring; only project directories whose
            name contains it are searched.

    Returns:
        A list of match dicts (as produced by ``scan_session_for_writes``,
        plus ``session_id`` and ``project``), sorted by timestamp.
    """
    import subprocess

    log.info(f"blaming: '{search_str}' (filter={project_filter})")
    results = []

    # Session files store text JSON-encoded, so quotes, backslashes and
    # newlines in the search string appear escaped on disk. Search for the
    # escaped spellings (non-ASCII kept raw, and \uXXXX-escaped).
    needles = list(dict.fromkeys([
        json.dumps(search_str, ensure_ascii=False)[1:-1],
        json.dumps(search_str)[1:-1],
    ]))
    # -F: literal match (the string is code, not a regex).
    # -e: keeps a needle starting with "-" from being read as an option.
    grep_cmd = ["grep", "-rlF", "--include=*.jsonl"]
    for needle in needles:
        grep_cmd += ["-e", needle]

    proj_dirs = os.listdir(PROJECTS_DIR) if os.path.isdir(PROJECTS_DIR) else []
    for proj_dir in proj_dirs:
        if project_filter and project_filter not in proj_dir:
            continue
        full_dir = os.path.join(PROJECTS_DIR, proj_dir)
        if not os.path.isdir(full_dir):
            continue
        try:
            proc = subprocess.run(
                grep_cmd + ["--", full_dir],
                capture_output=True, text=True, timeout=30
            )
        except subprocess.TimeoutExpired:
            # One slow project should not abort the whole search.
            log.info(f"grep timed out in {full_dir}, skipping")
            continue
        if proc.returncode != 0:
            # 1 = no match, >1 = grep error; nothing to scan either way.
            continue
        for fpath in proc.stdout.splitlines():
            if not fpath:
                continue
            sid = os.path.splitext(os.path.basename(fpath))[0]
            for m in scan_session_for_writes(fpath, search_str):
                m["session_id"] = sid
                m["project"] = proj_dir
                results.append(m)

    results.sort(key=lambda x: x.get("timestamp", ""))
    log.info(f"found {len(results)} write matches")
    print_blame_results(results, search_str)
    return results
def scan_session_for_writes(fpath, search_str):
    """Collect the Edit/Write tool calls in one session that wrote ``search_str``.

    Args:
        fpath: Path to a session ``.jsonl`` file.
        search_str: Literal string to look for in an Edit's ``new_string`` or
            a Write's ``content``.

    Returns:
        A list of dicts with keys ``tool``, ``file``, ``timestamp``, ``old``
        (snippet of the replaced text for Edit, ``None`` for Write) and
        ``new`` (snippet of the written text around the match).
    """
    # Cheap prefilter on the raw line. The line is JSON, so the needle must
    # be compared in its JSON-escaped form; otherwise any search string with
    # quotes, backslashes or newlines is rejected before it is even parsed.
    # Two spellings are tried: non-ASCII kept raw, and \uXXXX-escaped.
    needles = {
        json.dumps(search_str, ensure_ascii=False)[1:-1],
        json.dumps(search_str)[1:-1],
    }

    matches = []
    with open(fpath, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line or not any(needle in line for needle in needles):
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                # Skip a corrupt or truncated record instead of aborting.
                continue
            if not isinstance(row, dict) or row.get("type") != "assistant":
                continue
            content = row.get("message", {}).get("content", [])
            if not isinstance(content, list):
                continue
            for item in content:
                if not isinstance(item, dict) or item.get("type") != "tool_use":
                    continue
                name = item.get("name", "")
                inp = item.get("input", {})
                if not isinstance(inp, dict):
                    continue
                if name == "Edit":
                    written = inp.get("new_string", "")
                elif name == "Write":
                    written = inp.get("content", "")
                else:
                    continue
                if not isinstance(written, str) or search_str not in written:
                    continue
                matches.append({
                    "tool": name,
                    "file": inp.get("file_path", "?"),
                    "timestamp": row.get("timestamp", ""),
                    # Only an Edit has a "before" text to show.
                    "old": (
                        snippet_around(inp.get("old_string", ""), search_str)
                        if name == "Edit" else None
                    ),
                    "new": snippet_around(written, search_str),
                })
    return matches
def snippet_around(text, search_str, ctx=80):
    """Return the first occurrence of ``search_str`` in ``text`` with context.

    Up to ``ctx`` characters are kept on each side of the match; ``...`` is
    added on whichever side was cut. Returns ``""`` when ``text`` is empty or
    does not contain ``search_str``.
    """
    if not text:
        return ""
    hit = text.find(search_str)
    if hit < 0:
        return ""
    lo = max(0, hit - ctx)
    hi = min(len(text), hit + len(search_str) + ctx)
    lead = "..." if lo > 0 else ""
    trail = "..." if hi < len(text) else ""
    return f"{lead}{text[lo:hi]}{trail}"
def print_blame_results(results, search_str):
    """Print the matches found by ``blame`` in a numbered list.

    Each entry shows the time, tool, file base name and shortened session id,
    followed by the old (when present) and new snippets on one line each,
    with newlines shown as ``\\n`` and cut to 120 characters.
    """
    if not results:
        print(f"no Edit/Write found for: '{search_str}'")
        return

    def one_line(snippet):
        # Keep every snippet on a single, bounded output line.
        return snippet.replace("\n", "\\n")[:120]

    print(f"Found {len(results)} write(s) containing '{search_str}':\n")
    for number, hit in enumerate(results, 1):
        stamp = format_ts(hit["timestamp"])
        base_name = os.path.basename(hit["file"])
        short_sid = hit["session_id"][:12]
        print(f" {number}. [{stamp}] {hit['tool']} {base_name} (session: {short_sid}..)")
        if hit.get("old"):
            print(f" old: {one_line(hit['old'])}")
        print(f" new: {one_line(hit['new'])}")
        print()
def format_ts(ts, utc_offset_minutes=330):
    """Format an ISO-8601 timestamp as ``HH:MM`` in a fixed display offset.

    Args:
        ts: ISO-8601 timestamp string; a trailing ``Z`` is accepted. A
            timestamp without an offset is treated as UTC.
        utc_offset_minutes: Offset of the display time zone from UTC, in
            minutes. Defaults to 330 (+05:30), the offset this script has
            always used.

    Returns:
        The time of day as ``HH:MM``, or ``"?"`` when ``ts`` is empty, not a
        string, or cannot be parsed.
    """
    import datetime

    if not ts or not isinstance(ts, str):
        return "?"
    try:
        dt = datetime.datetime.fromisoformat(ts.replace("Z", "+00:00"))
    except ValueError:
        # Placeholder or malformed value: show it as unknown, don't crash.
        return "?"
    if dt.tzinfo is not None:
        # Normalise to UTC first so an explicit non-UTC offset in the input
        # is honoured before the display offset is applied.
        dt = dt.astimezone(datetime.timezone.utc)
    shifted = dt + datetime.timedelta(minutes=utc_offset_minutes)
    return shifted.strftime("%H:%M")
def main():
    """Command-line entry point.

    Defines the ``list``, ``parse``, ``summary`` and ``blame`` sub-commands
    and dispatches to the matching function; prints the help text when no
    sub-command is given.
    """
    cli = argparse.ArgumentParser(description="Parse Claude session JSONL files")
    commands = cli.add_subparsers(dest="cmd")

    list_cmd = commands.add_parser("list", help="List recent sessions")
    list_cmd.add_argument("--project", "-p", help="Filter by project name substring")
    list_cmd.add_argument("--limit", "-n", type=int, default=20)

    parse_cmd = commands.add_parser("parse", help="Parse a session")
    parse_cmd.add_argument("session_id", help="Session UUID (partial ok)")
    parse_cmd.add_argument(
        "--mode", "-m",
        choices=["conversation", "messages", "summary", "tools"],
        default="conversation",
    )
    parse_cmd.add_argument("--tools", "-t", action="store_true", help="Show tool calls")

    summary_cmd = commands.add_parser("summary", help="Quick summary of session")
    summary_cmd.add_argument("session_id")

    blame_cmd = commands.add_parser("blame", help="Find which session wrote a code string")
    blame_cmd.add_argument("search_str", help="Code string to search for in Edit/Write calls")
    blame_cmd.add_argument("--project", "-p", help="Filter by project name substring")

    opts = cli.parse_args()
    command = opts.cmd

    if command == "list":
        list_sessions(opts.project, opts.limit)
        return
    if command == "blame":
        blame(opts.search_str, opts.project)
        return
    if command in ("parse", "summary"):
        # Both views accept a partial id; do nothing if it cannot be resolved.
        full_id = resolve_session_id(opts.session_id)
        if not full_id:
            return
        if command == "parse":
            parse_session(full_id, opts.mode, opts.tools)
        else:
            parse_session(full_id, "summary")
        return
    cli.print_help()
def resolve_session_id(partial):
    """Expand a full or partial session id to a full one.

    An id that matches a session file exactly is returned unchanged.
    Otherwise every ``*.jsonl`` file under ``PROJECTS_DIR`` whose name
    contains ``partial`` is considered and the most recently modified one
    wins.

    Returns:
        The session id (file name without ``.jsonl``), or ``None`` when
        nothing matches.
    """
    log.info(f"resolving session id: {partial}")
    if find_session_file(partial):
        return partial

    log.info("exact match not found, searching partial...")
    candidates = glob_mod.glob(
        os.path.join(PROJECTS_DIR, "**", f"*{partial}*.jsonl"),
        recursive=True,
    )
    if not candidates:
        log.info(f"no session found matching: {partial}")
        return None
    if len(candidates) > 1:
        log.info(f"multiple matches ({len(candidates)}), using most recent")
    # max() keeps the first of equally recent files, like a stable
    # newest-first sort would.
    newest = max(candidates, key=os.path.getmtime)
    sid = os.path.basename(newest).replace(".jsonl", "")
    log.info(f"resolved to: {sid}")
    return sid
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment