Last active
April 24, 2026 18:08
-
-
Save gbertb/8222bc29ee0c0580ad288a0f5e388ecd to your computer and use it in GitHub Desktop.
claude_sessions.py — Unified CLI for browsing, searching, and exporting Claude Code sessions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| claude_sessions.py (v0.1) — Unified CLI for browsing, searching, and exporting Claude Code sessions. | |
| Combines session listing, full export, and compact (LLM-reviewable) export into one tool. | |
| Designed for both human and AI agent use. | |
| COMMANDS | |
| -------- | |
| list List recent sessions with titles, timestamps, and IDs. | |
| search Search sessions by title or first user message (case-insensitive). | |
| show Print session metadata (title, branch, turn count, timestamps). | |
| export-chat Export a session as compact JSON for LLM review (default — low token cost). | |
| export-full-chat Export a session as full formatted JSON (all events + messages). | |
| inspect Read full untruncated content for specific items from a session. | |
| RECOMMENDED WORKFLOW (for AI agents) | |
| ------------------------------------- | |
| 1. Search for the session by title: | |
| uv run claude_sessions.py search --title-only "lead database" | |
| 2. Export the compact chat (default — optimized for agent consumption): | |
| uv run claude_sessions.py export-chat 63d559bd-01eb-48f9-90c6-c0cad4d20476 | |
| 3. If you need the full unabridged export (large, all raw events): | |
| uv run claude_sessions.py export-full-chat 63d559bd -o session.json | |
| QUICK REFERENCE (for AI agents) | |
| -------------------------------- | |
| # Find a session you worked on: | |
| uv run claude_sessions.py search "lead database" | |
| # See metadata for a session: | |
| uv run claude_sessions.py show 63d559bd-01eb-48f9-90c6-c0cad4d20476 | |
| # Export compact version for review (default use): | |
| uv run claude_sessions.py export-chat 63d559bd-01eb-48f9-90c6-c0cad4d20476 | |
| # List last 7 days of sessions: | |
| uv run claude_sessions.py list --days 7 | |
| # Export full session with all raw events: | |
| uv run claude_sessions.py export-full-chat 63d559bd -o session.json | |
| # Inspect truncated content — drill into specific items: | |
| uv run claude_sessions.py inspect 63d559bd toolu_013CCoPRaqXEPqdXgsPJNQeY | |
| uv run claude_sessions.py inspect 63d559bd turn:5 | |
| uv run claude_sessions.py inspect 63d559bd turn:5 toolu_abc toolu_def | |
| INSPECTING TRUNCATED CONTENT (for AI agents) | |
| --------------------------------------------- | |
| Compact output marks truncated blocks with "…[truncated]" and includes | |
| a "remaining_chars" field showing how many characters were cut off. | |
| To read the full content: | |
| 1. Look for "…[truncated]" in any content field | |
| 2. Grab the ID: | |
| - tool_use_id (e.g., toolu_013CCo...) for tool results | |
| - tool call id (e.g., toolu_013CCo...) for tool call inputs | |
| - turn:N (e.g., turn:5) for assistant/user text blocks | |
| 3. Run: uv run claude_sessions.py inspect <session> <id> [<id2> ...] | |
| Multiple IDs can be passed in one call. Output is JSON to stdout. | |
| SESSION RESOLUTION | |
| ------------------ | |
| Session identifiers can be: | |
| - Full UUID: 63d559bd-01eb-48f9-90c6-c0cad4d20476 | |
| - Partial UUID prefix: 63d559bd (matches first found) | |
| - Full path to .jsonl file | |
| OUTPUT | |
| ------ | |
| list/search/show/inspect → stdout (table or JSON) | |
| export-chat/export-full-chat → JSON file written to disk (default: ./<session_id>.<format>.json) | |
| All commands support --json for machine-readable output to stdout. | |
| ENVIRONMENT | |
| ----------- | |
| CLAUDE_DIR Override ~/.claude base directory | |
| MAX_TOOL_RESULT_CHARS Max chars per tool result in compact export (default: 800) | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| # ── Config ────────────────────────────────────────────────────────── | |
| CLAUDE_DIR = Path(os.environ.get("CLAUDE_DIR", Path.home() / ".claude")) | |
| PROJECTS_DIR = CLAUDE_DIR / "projects" | |
| MAX_TOOL_RESULT_CHARS = int(os.environ.get("MAX_TOOL_RESULT_CHARS", "800")) | |
| # Noise patterns to strip from compact export | |
| NOISE_PATTERNS = [ | |
| r"<local-command-caveat>", | |
| r"<local-command-stdout>", | |
| r"<command-name>/model", | |
| r"<command-name>/clear", | |
| r"<command-name>/compact", | |
| r"<command-name>/config", | |
| r"<system-reminder>", | |
| ] | |
| NOISE_RE = re.compile("|".join(NOISE_PATTERNS)) | |
| SYSTEM_REMINDER_RE = re.compile(r"<system-reminder>.*?</system-reminder>", re.DOTALL) | |
| # ── Session discovery ─────────────────────────────────────────────── | |
| def find_all_sessions(days: int | None = None) -> list[dict]: | |
| """Find all session .jsonl files, optionally filtered by recency.""" | |
| if not PROJECTS_DIR.is_dir(): | |
| return [] | |
| cutoff = None | |
| if days is not None: | |
| cutoff = datetime.now(timezone.utc) - timedelta(days=days) | |
| sessions = [] | |
| for project_dir in PROJECTS_DIR.iterdir(): | |
| if not project_dir.is_dir(): | |
| continue | |
| for jsonl_file in project_dir.glob("*.jsonl"): | |
| mtime = datetime.fromtimestamp(jsonl_file.stat().st_mtime, tz=timezone.utc) | |
| if cutoff and mtime < cutoff: | |
| continue | |
| sessions.append( | |
| { | |
| "file": jsonl_file, | |
| "session_id": jsonl_file.stem, | |
| "project_dir": project_dir.name, | |
| "mtime": mtime, | |
| "size_bytes": jsonl_file.stat().st_size, | |
| } | |
| ) | |
| sessions.sort(key=lambda s: s["mtime"], reverse=True) | |
| return sessions | |
| def resolve_session(identifier: str) -> Path | None: | |
| """Resolve a session ID, partial ID, or file path to a .jsonl Path.""" | |
| # Direct file path | |
| p = Path(identifier) | |
| if p.is_file(): | |
| return p | |
| if not PROJECTS_DIR.is_dir(): | |
| return None | |
| # Exact match | |
| for project_dir in PROJECTS_DIR.iterdir(): | |
| if not project_dir.is_dir(): | |
| continue | |
| exact = project_dir / f"{identifier}.jsonl" | |
| if exact.is_file(): | |
| return exact | |
| # Prefix match | |
| for project_dir in PROJECTS_DIR.iterdir(): | |
| if not project_dir.is_dir(): | |
| continue | |
| for jsonl_file in project_dir.glob("*.jsonl"): | |
| if jsonl_file.stem.startswith(identifier): | |
| return jsonl_file | |
| return None | |
| # ── JSONL parsing ─────────────────────────────────────────────────── | |
| def read_events(path: Path) -> list[dict]: | |
| """Read all JSON events from a .jsonl file.""" | |
| events = [] | |
| with open(path, "r", encoding="utf-8", errors="replace") as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| events.append(json.loads(line)) | |
| except json.JSONDecodeError: | |
| continue | |
| return events | |
| def extract_title(events: list[dict]) -> str: | |
| """Extract session title from events: custom-title > ai-title > first user message.""" | |
| def _to_text(obj) -> str | None: | |
| if obj is None: | |
| return None | |
| if isinstance(obj, str): | |
| return obj | |
| if isinstance(obj, list): | |
| parts = [] | |
| for item in obj: | |
| if isinstance(item, str): | |
| parts.append(item) | |
| elif isinstance(item, dict): | |
| parts.append( | |
| _to_text(item.get("text") or item.get("content")) | |
| ) | |
| return "\n".join(p for p in parts if p) | |
| if isinstance(obj, dict): | |
| return _to_text(obj.get("text") or obj.get("content")) | |
| return str(obj) | |
| # Try custom-title, then ai-title | |
| for title_type in ("custom-title", "ai-title"): | |
| for ev in events: | |
| if ev.get("type") == title_type or ev.get("event") == title_type: | |
| raw = ev.get("title") or ev.get("customTitle") or ev.get("aiTitle") or ev.get("value") or ev.get("text") or ev.get("content") | |
| t = _to_text(raw) | |
| if t and t.strip(): | |
| return _clip(t) | |
| # Fall back to first non-noise user message | |
| skip_prefixes = ( | |
| "<local-command-", | |
| "<command-name>", | |
| "<command-message>", | |
| "<system-reminder>", | |
| "<observed_from_primary_session>", | |
| "You are a Claude-Mem", | |
| "- You are agent", | |
| "Hello memory agent", | |
| "<ide_opened_file>", | |
| "PROGRESS SUMMARY CHECKPOINT", | |
| "File created successfully", | |
| "diff --git", | |
| "This session is being continued", | |
| "No matching deferred tools", | |
| ".gitignore |", | |
| ) | |
| skip_contains = ( | |
| "You are agent", | |
| ) | |
| for ev in events: | |
| if ev.get("isMeta"): | |
| continue | |
| if ev.get("type") == "user" or ev.get("role") == "user" or (ev.get("message", {}) or {}).get("role") == "user": | |
| msg = ev.get("message", {}) if isinstance(ev.get("message"), dict) else {} | |
| raw = ev.get("content") or msg.get("content") or ev.get("text") or msg.get("text") | |
| t = _to_text(raw) | |
| if t and t.strip(): | |
| stripped = t.strip() | |
| if any(stripped.startswith(pfx) for pfx in skip_prefixes): | |
| continue | |
| if any(s in stripped for s in skip_contains): | |
| continue | |
| return _clip(t) | |
| return "(untitled)" | |
| def _clip(text: str, max_len: int = 90) -> str: | |
| text = re.sub(r"[\r\n\t]+", " ", text) | |
| text = re.sub(r"\s{2,}", " ", text).strip() | |
| if len(text) > max_len: | |
| return text[: max_len - 3] + "..." | |
| return text | |
| # ── Compact export logic ──────────────────────────────────────────── | |
| def _stringify(obj) -> str | None: | |
| if obj is None: | |
| return None | |
| if isinstance(obj, str): | |
| return obj | |
| if isinstance(obj, (int, float, bool)): | |
| return str(obj) | |
| if isinstance(obj, list): | |
| parts = [_stringify(x) for x in obj] | |
| return "\n".join(p for p in parts if p) | |
| if isinstance(obj, dict): | |
| for key in ("text", "content", "message"): | |
| if key in obj: | |
| return _stringify(obj[key]) | |
| return json.dumps(obj, ensure_ascii=False) | |
| return json.dumps(obj, ensure_ascii=False) | |
| def _truncate(text: str, max_chars: int) -> tuple[str, int]: | |
| """Return (possibly truncated text, remaining_chars). | |
| remaining_chars is 0 if not truncated, otherwise the number of | |
| characters that were cut off. | |
| """ | |
| if len(text) > max_chars: | |
| remaining = len(text) - max_chars | |
| return text[:max_chars] + " …[truncated]", remaining | |
| return text, 0 | |
| def _is_noise(text: str) -> bool: | |
| return bool(NOISE_RE.search(text)) or not text.strip() | |
| def _strip_system_reminders(text: str) -> str: | |
| return SYSTEM_REMINDER_RE.sub("", text).rstrip() | |
| def _compact_tool_use(block: dict) -> dict: | |
| name = block.get("name", "") | |
| inp = block.get("input", {}) | |
| if name == "Read": | |
| compact_input = {"file": inp.get("file_path")} | |
| elif name == "Glob": | |
| compact_input = {"pattern": inp.get("pattern")} | |
| elif name == "Grep": | |
| compact_input = { | |
| "pattern": inp.get("pattern"), | |
| "path": inp.get("path"), | |
| } | |
| elif name == "Write": | |
| content = inp.get("content", "") | |
| compact_input = { | |
| "file": inp.get("file_path"), | |
| "lines": content.count("\n") + 1, | |
| } | |
| elif name == "Edit": | |
| compact_input = {"file": inp.get("file_path")} | |
| elif name == "Bash": | |
| cmd_raw = inp.get("command", "") | |
| cmd_text, remaining = _truncate(cmd_raw, 300) | |
| compact_input = {"cmd": cmd_text} | |
| if remaining: | |
| compact_input["remaining_chars"] = remaining | |
| elif name == "Agent": | |
| compact_input = { | |
| "desc": inp.get("description"), | |
| "type": inp.get("subagent_type"), | |
| } | |
| else: | |
| compact_input = inp | |
| return { | |
| "tool": name, | |
| "input": compact_input, | |
| "id": block.get("id"), | |
| } | |
| def _compact_tool_result(block: dict, max_chars: int) -> dict: | |
| content = block.get("content", "") | |
| text = _stringify(content) or "" | |
| truncated_text, remaining = _truncate(text, max_chars) | |
| result = { | |
| "tool_use_id": block.get("tool_use_id"), | |
| "is_error": block.get("is_error", False), | |
| "content": truncated_text, | |
| } | |
| if remaining: | |
| result["remaining_chars"] = remaining | |
| return result | |
| def _compact_block(block: dict, max_chars: int) -> dict: | |
| btype = block.get("type") | |
| if btype == "tool_use": | |
| return _compact_tool_use(block) | |
| if btype == "tool_result": | |
| return _compact_tool_result(block, max_chars) | |
| if btype == "text": | |
| raw = block.get("text", "") | |
| text, remaining = _truncate(raw, 5000) | |
| result = {"text": text} | |
| if remaining: | |
| result["remaining_chars"] = remaining | |
| return result | |
| if btype == "image": | |
| return {"image": True} | |
| raw = _stringify(block.get("text") or block.get("content")) or "" | |
| text, remaining = _truncate(raw, 2000) | |
| result = {"type": btype, "text": text} | |
| if remaining: | |
| result["remaining_chars"] = remaining | |
| return result | |
| def build_compact(events: list[dict], max_chars: int = MAX_TOOL_RESULT_CHARS) -> dict: | |
| """Build compact session representation from raw events.""" | |
| turns = [] | |
| for ev in events: | |
| if ev.get("type") not in ("user", "assistant"): | |
| continue | |
| if ev.get("isMeta"): | |
| continue | |
| msg = ev.get("message", {}) if isinstance(ev.get("message"), dict) else {} | |
| raw_content = msg.get("content") or ev.get("content") | |
| role = ev["type"] | |
| # Process content | |
| if raw_content is None: | |
| continue | |
| if isinstance(raw_content, str): | |
| if _is_noise(raw_content): | |
| continue | |
| content = _strip_system_reminders(raw_content) | |
| if not content.strip(): | |
| continue | |
| elif isinstance(raw_content, list): | |
| # Filter noise blocks, then compact | |
| filtered = [] | |
| for block in raw_content: | |
| if isinstance(block, dict) and block.get("type") == "text": | |
| text = block.get("text", "") | |
| if _is_noise(text): | |
| continue | |
| filtered.append(block) | |
| if not filtered: | |
| continue | |
| compacted = [_compact_block(b, max_chars) for b in filtered if isinstance(b, dict)] | |
| # Strip system reminders from text blocks | |
| for b in compacted: | |
| if "text" in b and isinstance(b["text"], str): | |
| b["text"] = _strip_system_reminders(b["text"]) | |
| compacted = [b for b in compacted if b] | |
| if not compacted: | |
| continue | |
| content = compacted | |
| else: | |
| s = _stringify(raw_content) | |
| if not s or not s.strip(): | |
| continue | |
| content = s | |
| turn = { | |
| "role": role, | |
| "ts": ev.get("timestamp"), | |
| "content": content, | |
| } | |
| if role == "assistant": | |
| model = msg.get("model") | |
| if model: | |
| turn["model"] = model | |
| turns.append(turn) | |
| timestamps = [t["ts"] for t in turns if t.get("ts")] | |
| error_count = 0 | |
| for t in turns: | |
| if isinstance(t.get("content"), list): | |
| for block in t["content"]: | |
| if isinstance(block, dict) and block.get("is_error"): | |
| error_count += 1 | |
| # Find branch and project from first event that has them | |
| branch = None | |
| project = None | |
| for ev in events: | |
| if not branch and ev.get("gitBranch"): | |
| branch = ev["gitBranch"] | |
| if not project and ev.get("cwd"): | |
| project = Path(ev["cwd"]).name | |
| if branch and project: | |
| break | |
| first_event = events[0] if events else {} | |
| return { | |
| "session_id": first_event.get("sessionId", ""), | |
| "project": project, | |
| "branch": branch, | |
| "title": extract_title(events), | |
| "started": min(timestamps) if timestamps else None, | |
| "ended": max(timestamps) if timestamps else None, | |
| "turn_count": len(turns), | |
| "error_count": error_count, | |
| "turns": turns, | |
| } | |
| # ── Full export logic ─────────────────────────────────────────────── | |
| def _to_text_full(obj) -> str | None: | |
| """Recursively extract text from nested structures (full export).""" | |
| if obj is None: | |
| return None | |
| if isinstance(obj, str): | |
| return obj | |
| if isinstance(obj, (int, float, bool)): | |
| return str(obj) | |
| if isinstance(obj, list): | |
| parts = [] | |
| for item in obj: | |
| if item is None: | |
| continue | |
| if isinstance(item, str): | |
| parts.append(item) | |
| elif isinstance(item, (int, float, bool)): | |
| parts.append(str(item)) | |
| elif isinstance(item, dict): | |
| t = _to_text_full(item.get("text") or item.get("content") or item.get("message")) | |
| if t: | |
| parts.append(t) | |
| return "\n".join(p for p in parts if p) | |
| if isinstance(obj, dict): | |
| for key in ("text", "content", "message"): | |
| if key in obj: | |
| return _to_text_full(obj[key]) | |
| return None | |
| return None | |
| def build_full_export(events: list[dict], session_id: str, session_file: str, project_dir: str) -> dict: | |
| """Build full formatted export from raw events.""" | |
| messages = [] | |
| for ev in events: | |
| role = ( | |
| ev.get("role") | |
| or (ev.get("message", {}) or {}).get("role") | |
| or ev.get("sender") | |
| or ev.get("author") | |
| or ev.get("type") | |
| or "unknown" | |
| ) | |
| timestamp = ( | |
| ev.get("timestamp") | |
| or ev.get("created_at") | |
| or ev.get("time") | |
| or (ev.get("message", {}) or {}).get("created_at") | |
| or (ev.get("message", {}) or {}).get("timestamp") | |
| ) | |
| content = _to_text_full( | |
| ev.get("content") | |
| or (ev.get("message", {}) or {}).get("content") | |
| or ev.get("text") | |
| or (ev.get("message", {}) or {}).get("text") | |
| ) | |
| if content: | |
| messages.append({"role": role, "timestamp": timestamp, "content": content}) | |
| return { | |
| "session_id": session_id, | |
| "session_file": session_file, | |
| "project_dir": project_dir, | |
| "event_count": len(events), | |
| "title": extract_title(events), | |
| "raw_events": events, | |
| "messages": [{"role": m["role"], "timestamp": m["timestamp"], "content": m["content"], "raw": None} for m in messages], | |
| "llm_readable": messages, | |
| } | |
| # ── CLI commands ──────────────────────────────────────────────────── | |
| def cmd_list(args: argparse.Namespace) -> None: | |
| sessions = find_all_sessions(days=args.days) | |
| if not sessions: | |
| print(f"No sessions found in the last {args.days} day(s).", file=sys.stderr) | |
| sys.exit(1) | |
| # Enrich with titles | |
| enriched = [] | |
| for s in sessions: | |
| events = read_events(s["file"]) | |
| title = extract_title(events) | |
| s["title"] = title | |
| enriched.append(s) | |
| if args.json: | |
| out = [ | |
| { | |
| "session_id": s["session_id"], | |
| "project_dir": s["project_dir"], | |
| "title": s["title"], | |
| "mtime": s["mtime"].isoformat(), | |
| "size_bytes": s["size_bytes"], | |
| "file": str(s["file"]), | |
| } | |
| for s in enriched | |
| ] | |
| print(json.dumps(out, indent=2, ensure_ascii=False)) | |
| return | |
| # Table output | |
| fmt = "{:<36} {:<20} {:<8} {}" | |
| print(fmt.format("SESSION_ID", "LAST_MODIFIED", "SIZE", "TITLE")) | |
| print("-" * 120) | |
| for s in enriched: | |
| mtime_str = s["mtime"].strftime("%Y-%m-%d %H:%M") | |
| size_kb = f"{s['size_bytes'] // 1024}KB" | |
| print(fmt.format(s["session_id"], mtime_str, size_kb, s["title"])) | |
| def cmd_search(args: argparse.Namespace) -> None: | |
| query = args.query.lower() | |
| sessions = find_all_sessions(days=args.days) | |
| if not sessions: | |
| print("No sessions found.", file=sys.stderr) | |
| sys.exit(1) | |
| matches = [] | |
| for s in sessions: | |
| events = read_events(s["file"]) | |
| title = extract_title(events) | |
| s["title"] = title | |
| # Extract branch for searching | |
| branch = "" | |
| for ev in events: | |
| if ev.get("gitBranch"): | |
| branch = ev["gitBranch"] | |
| break | |
| s["branch"] = branch | |
| # Check metadata fields (title, id, branch, project) | |
| meta_text = " ".join([ | |
| title, s["session_id"], branch, s["project_dir"], | |
| ]).lower() | |
| if query in meta_text: | |
| matches.append(s) | |
| continue | |
| # Full content search unless --title-only | |
| if not args.title_only: | |
| try: | |
| raw_bytes = s["file"].read_bytes() | |
| if query.encode() in raw_bytes.lower(): | |
| matches.append(s) | |
| except OSError: | |
| pass | |
| if not matches: | |
| print(f'No sessions matching "{args.query}".', file=sys.stderr) | |
| sys.exit(1) | |
| if args.json: | |
| out = [ | |
| { | |
| "session_id": s["session_id"], | |
| "project_dir": s["project_dir"], | |
| "title": s["title"], | |
| "mtime": s["mtime"].isoformat(), | |
| "file": str(s["file"]), | |
| } | |
| for s in matches | |
| ] | |
| print(json.dumps(out, indent=2, ensure_ascii=False)) | |
| return | |
| fmt = "{:<36} {:<20} {}" | |
| print(fmt.format("SESSION_ID", "LAST_MODIFIED", "TITLE")) | |
| print("-" * 100) | |
| for s in matches: | |
| mtime_str = s["mtime"].strftime("%Y-%m-%d %H:%M") | |
| print(fmt.format(s["session_id"], mtime_str, s["title"])) | |
| def cmd_show(args: argparse.Namespace) -> None: | |
| path = resolve_session(args.session) | |
| if not path: | |
| print(f"Session not found: {args.session}", file=sys.stderr) | |
| sys.exit(1) | |
| events = read_events(path) | |
| title = extract_title(events) | |
| session_id = path.stem | |
| project_dir = path.parent.name | |
| # Count event types | |
| type_counts: dict[str, int] = {} | |
| for ev in events: | |
| t = ev.get("type", "unknown") | |
| type_counts[t] = type_counts.get(t, 0) + 1 | |
| user_count = type_counts.get("user", 0) | |
| assistant_count = type_counts.get("assistant", 0) | |
| # Timestamps | |
| timestamps = [ev.get("timestamp") for ev in events if ev.get("timestamp")] | |
| started = min(timestamps) if timestamps else "unknown" | |
| ended = max(timestamps) if timestamps else "unknown" | |
| # Branch | |
| branch = None | |
| for ev in events: | |
| if ev.get("gitBranch"): | |
| branch = ev["gitBranch"] | |
| break | |
| # Errors | |
| error_count = 0 | |
| for ev in events: | |
| if ev.get("type") == "user": | |
| content = (ev.get("message", {}) or {}).get("content") | |
| if isinstance(content, list): | |
| for block in content: | |
| if isinstance(block, dict) and block.get("is_error"): | |
| error_count += 1 | |
| info = { | |
| "session_id": session_id, | |
| "project_dir": project_dir, | |
| "title": title, | |
| "branch": branch, | |
| "started": started, | |
| "ended": ended, | |
| "total_events": len(events), | |
| "user_turns": user_count, | |
| "assistant_turns": assistant_count, | |
| "errors": error_count, | |
| "file": str(path), | |
| "size": f"{path.stat().st_size // 1024}KB", | |
| } | |
| if args.json: | |
| print(json.dumps(info, indent=2, ensure_ascii=False)) | |
| return | |
| for key, val in info.items(): | |
| print(f" {key:<20} {val}") | |
| def cmd_export(args: argparse.Namespace) -> None: | |
| path = resolve_session(args.session) | |
| if not path: | |
| print(f"Session not found: {args.session}", file=sys.stderr) | |
| sys.exit(1) | |
| events = read_events(path) | |
| session_id = path.stem | |
| project_dir = path.parent.name | |
| output = args.output or f"./{session_id}.formatted.json" | |
| result = build_full_export(events, session_id, str(path), project_dir) | |
| with open(output, "w", encoding="utf-8") as f: | |
| json.dump(result, f, indent=2, ensure_ascii=False) | |
| orig_size = path.stat().st_size | |
| out_size = Path(output).stat().st_size | |
| print(f"Wrote full export to: {output}") | |
| print(f" Source: {orig_size // 1024}KB") | |
| print(f" Output: {out_size // 1024}KB") | |
| def cmd_compact(args: argparse.Namespace) -> None: | |
| path = resolve_session(args.session) | |
| if not path: | |
| print(f"Session not found: {args.session}", file=sys.stderr) | |
| sys.exit(1) | |
| events = read_events(path) | |
| session_id = path.stem | |
| project_dir = path.parent.name | |
| output = args.output or f"./{session_id}.compact.json" | |
| max_chars = args.max_tool_chars or MAX_TOOL_RESULT_CHARS | |
| result = build_compact(events, max_chars=max_chars) | |
| # Override session_id/project from file path (more reliable than event data) | |
| result["session_id"] = session_id | |
| result["project"] = project_dir | |
| with open(output, "w", encoding="utf-8") as f: | |
| json.dump(result, f, indent=2, ensure_ascii=False) | |
| orig_size = path.stat().st_size | |
| out_size = Path(output).stat().st_size | |
| reduction = (orig_size - out_size) * 100 // orig_size if orig_size > 0 else 0 | |
| print(f"Wrote compact export to: {output}") | |
| print(f" Source: {orig_size // 1024}KB ({orig_size} bytes)") | |
| print(f" Compact: {out_size // 1024}KB ({out_size} bytes)") | |
| print(f" Reduction: {reduction}%") | |
| def cmd_inspect(args: argparse.Namespace) -> None: | |
| """Read full untruncated content for specific IDs from a session. | |
| Supported ID formats: | |
| - toolu_xxx → matches tool_use id or tool_result.tool_use_id | |
| - turn:N → matches the Nth turn (0-indexed) in compact output | |
| """ | |
| path = resolve_session(args.session) | |
| if not path: | |
| print( | |
| f"Session not found: {args.session}", | |
| file=sys.stderr, | |
| ) | |
| sys.exit(1) | |
| events = read_events(path) | |
| # Build lookup indexes from raw events | |
| # tool_use id → full input dict | |
| tool_calls: dict[str, dict] = {} | |
| # tool_use_id → full result content (string) | |
| tool_results: dict[str, dict] = {} | |
| # Also build ordered turns (same filtering as compact) | |
| turns: list[dict] = [] | |
| for ev in events: | |
| if ev.get("type") not in ("user", "assistant"): | |
| continue | |
| if ev.get("isMeta"): | |
| continue | |
| msg = ( | |
| ev.get("message", {}) | |
| if isinstance(ev.get("message"), dict) | |
| else {} | |
| ) | |
| raw_content = msg.get("content") or ev.get("content") | |
| role = ev["type"] | |
| # Index tool_use and tool_result blocks | |
| if isinstance(raw_content, list): | |
| for block in raw_content: | |
| if not isinstance(block, dict): | |
| continue | |
| if block.get("type") == "tool_use": | |
| tid = block.get("id") | |
| if tid: | |
| tool_calls[tid] = { | |
| "id": tid, | |
| "tool": block.get("name"), | |
| "input": block.get("input", {}), | |
| } | |
| elif block.get("type") == "tool_result": | |
| tuid = block.get("tool_use_id") | |
| if tuid: | |
| raw = block.get("content", "") | |
| tool_results[tuid] = { | |
| "tool_use_id": tuid, | |
| "is_error": block.get( | |
| "is_error", False | |
| ), | |
| "content": _stringify(raw) or "", | |
| } | |
| # Build turn for turn:N indexing (lightweight) | |
| if raw_content is None: | |
| continue | |
| if isinstance(raw_content, str): | |
| if _is_noise(raw_content): | |
| continue | |
| text = _strip_system_reminders(raw_content) | |
| if not text.strip(): | |
| continue | |
| turns.append({ | |
| "role": role, | |
| "ts": ev.get("timestamp"), | |
| "content": text, | |
| }) | |
| elif isinstance(raw_content, list): | |
| # Keep full content blocks (no truncation) | |
| filtered = [] | |
| for block in raw_content: | |
| if isinstance(block, dict): | |
| btype = block.get("type") | |
| if btype == "text": | |
| t = block.get("text", "") | |
| if _is_noise(t): | |
| continue | |
| filtered.append({ | |
| "type": "text", | |
| "text": _strip_system_reminders(t), | |
| }) | |
| elif btype == "tool_use": | |
| filtered.append({ | |
| "type": "tool_use", | |
| "id": block.get("id"), | |
| "tool": block.get("name"), | |
| "input": block.get("input", {}), | |
| }) | |
| elif btype == "tool_result": | |
| raw = block.get("content", "") | |
| filtered.append({ | |
| "type": "tool_result", | |
| "tool_use_id": block.get( | |
| "tool_use_id" | |
| ), | |
| "is_error": block.get( | |
| "is_error", False | |
| ), | |
| "content": _stringify(raw) or "", | |
| }) | |
| else: | |
| filtered.append(block) | |
| if filtered: | |
| turns.append({ | |
| "role": role, | |
| "ts": ev.get("timestamp"), | |
| "content": filtered, | |
| }) | |
| # Resolve each requested ID | |
| results = [] | |
| for ref in args.ids: | |
| if ref.startswith("turn:"): | |
| try: | |
| idx = int(ref.split(":", 1)[1]) | |
| except ValueError: | |
| results.append({ | |
| "ref": ref, | |
| "error": "Invalid turn index", | |
| }) | |
| continue | |
| if 0 <= idx < len(turns): | |
| results.append({ | |
| "ref": ref, | |
| **turns[idx], | |
| }) | |
| else: | |
| results.append({ | |
| "ref": ref, | |
| "error": ( | |
| f"Turn index out of range " | |
| f"(0-{len(turns) - 1})" | |
| ), | |
| }) | |
| elif ref in tool_results: | |
| results.append({"ref": ref, **tool_results[ref]}) | |
| elif ref in tool_calls: | |
| results.append({"ref": ref, **tool_calls[ref]}) | |
| else: | |
| results.append({"ref": ref, "error": "ID not found"}) | |
| print(json.dumps(results, indent=2, ensure_ascii=False)) | |
| # ── Argument parser ───────────────────────────────────────────────── | |
| def build_parser() -> argparse.ArgumentParser: | |
| parser = argparse.ArgumentParser( | |
| prog="claude_sessions", | |
| description="Browse, search, and export Claude Code sessions.", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| examples: | |
| %(prog)s list # list sessions from last 3 days | |
| %(prog)s list --days 30 # list sessions from last 30 days | |
| %(prog)s list --json # machine-readable output | |
| %(prog)s search "refactor" # search by title keyword | |
| %(prog)s search "lead db" --days 30 # search with wider window | |
| %(prog)s show 63d559bd # show session metadata | |
| %(prog)s export-chat 63d559bd # compact export for LLM review (default) | |
| %(prog)s export-chat 63d559bd --max-tool-chars 1500 # larger tool results | |
| %(prog)s export-full-chat 63d559bd # full JSON export | |
| %(prog)s inspect 63d559bd toolu_013CCo... # full tool result | |
| %(prog)s inspect 63d559bd turn:5 # full turn content | |
| %(prog)s inspect 63d559bd toolu_a toolu_b # multiple at once | |
| """, | |
| ) | |
| sub = parser.add_subparsers(dest="command", required=True) | |
| # list | |
| p_list = sub.add_parser( | |
| "list", help="List recent sessions", | |
| ) | |
| p_list.add_argument( | |
| "--days", type=int, default=3, | |
| help="How many days back to look (default: 3)", | |
| ) | |
| p_list.add_argument( | |
| "--json", action="store_true", | |
| help="Output as JSON", | |
| ) | |
| # search | |
| p_search = sub.add_parser( | |
| "search", help="Search sessions by title", | |
| ) | |
| p_search.add_argument( | |
| "query", | |
| help="Search term (case-insensitive, matches " | |
| "title, branch, or session ID)", | |
| ) | |
| p_search.add_argument( | |
| "--days", type=int, default=90, | |
| help="How many days back to search (default: 90)", | |
| ) | |
| p_search.add_argument( | |
| "--title-only", action="store_true", | |
| help="Only search titles, branch, and session ID " | |
| "(skip full content search — faster)", | |
| ) | |
| p_search.add_argument( | |
| "--json", action="store_true", | |
| help="Output as JSON", | |
| ) | |
| # show | |
| p_show = sub.add_parser( | |
| "show", help="Show session metadata", | |
| ) | |
| p_show.add_argument( | |
| "session", | |
| help="Session ID (full or partial) or path to .jsonl", | |
| ) | |
| p_show.add_argument( | |
| "--json", action="store_true", | |
| help="Output as JSON", | |
| ) | |
| # export-full-chat | |
| p_export = sub.add_parser( | |
| "export-full-chat", help="Full formatted JSON export (all events + messages)", | |
| ) | |
| p_export.add_argument( | |
| "session", | |
| help="Session ID (full or partial) or path to .jsonl", | |
| ) | |
| p_export.add_argument( | |
| "-o", "--output", | |
| help="Output file path (default: ./<id>.formatted.json)", | |
| ) | |
| # export-chat | |
| p_compact = sub.add_parser( | |
| "export-chat", help="Compact JSON export for LLM review (default for agents)", | |
| ) | |
| p_compact.add_argument( | |
| "session", | |
| help="Session ID (full or partial) or path to .jsonl", | |
| ) | |
| p_compact.add_argument( | |
| "-o", "--output", | |
| help="Output file path (default: ./<id>.compact.json)", | |
| ) | |
| p_compact.add_argument( | |
| "--max-tool-chars", type=int, default=None, | |
| help="Max chars per tool result " | |
| f"(default: {MAX_TOOL_RESULT_CHARS})", | |
| ) | |
| # inspect | |
| p_inspect = sub.add_parser( | |
| "inspect", | |
| help="Read full untruncated content by ID", | |
| ) | |
| p_inspect.add_argument( | |
| "session", | |
| help="Session ID (full or partial) or path to .jsonl", | |
| ) | |
| p_inspect.add_argument( | |
| "ids", nargs="+", | |
| help="One or more IDs: tool_use_id (toolu_xxx), " | |
| "tool call id, or turn:N", | |
| ) | |
| return parser | |
| def main() -> None: | |
| parser = build_parser() | |
| args = parser.parse_args() | |
| dispatch = { | |
| "list": cmd_list, | |
| "search": cmd_search, | |
| "show": cmd_show, | |
| "export-full-chat": cmd_export, | |
| "export-chat": cmd_compact, | |
| "inspect": cmd_inspect, | |
| } | |
| dispatch[args.command](args) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment