
@rothnic
Last active April 6, 2026 15:27
OpenCode Debug Proxy - Complete traffic capture and analysis tool

OpenCode Debug Proxy

A comprehensive debugging and traffic capture tool for OpenCode that captures all API traffic, exports session data (including subagents), and automatically generates analysis summaries.

Overview

This tool creates a complete debug capture of your OpenCode sessions including:

  • Full HTTP traffic with request/response bodies (sanitized)
  • Session data export (parent + all subagent sessions)
  • Automatic analysis (errors, slow API calls, key parts)
  • Privacy protection (API keys, tokens, IPs automatically redacted)
  • GitHub Gist upload for easy sharing

Dependencies

Required

  • zsh - Shell environment
  • mitmproxy - HTTP/HTTPS proxy (auto-installed if missing)
  • GitHub CLI (gh) - For gist upload feature
  • Python 3 - For session export and analysis

Optional (for enhanced privacy)

  • uv - Fast Python package installer (for Presidio auto-install)
  • Microsoft Presidio - ML-based PII detection (auto-installed via uv)

Install mitmproxy (if not present)

# macOS
brew install mitmproxy

# Linux
pip install mitmproxy

# Or let the script auto-install it

Install GitHub CLI (for gist upload)

# macOS
brew install gh

# Linux
curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg | sudo dd of=/usr/share/keyrings/githubcli-archive-keyring.gpg
sudo apt update && sudo apt install gh

# Then authenticate
gh auth login

Installation

  1. Download the files:

    # Clone or download the gist files
    git clone https://gist.github.com/YOUR_GIST_ID.git opencode-debug
    cd opencode-debug
  2. Copy to your zsh custom directory:

    mkdir -p ~/.oh-my-zsh/custom
    cp proxy-opencode-debug.zsh ~/.oh-my-zsh/custom/
  3. Source the file (or restart your shell):

    source ~/.oh-my-zsh/custom/proxy-opencode-debug.zsh

Quick Start

# Basic usage - capture all traffic
proxy_opencode

# Use isolated config (fresh, temporary)
proxy_opencode --isolated

# Capture and auto-upload to gist
proxy_opencode --upload

# Use a named isolated config
proxy_opencode --config my-test-config

Aliases

  • poc - proxy_opencode
  • poci - proxy_opencode --isolated
  • pocu - proxy_opencode --upload
  • poccfg - Create isolated config
  • pocedit - Edit isolated config
  • poclist - List isolated configs
  • pocdel - Delete isolated config

What Gets Captured

HTTP Traffic (requests.jsonl)

Full JSON Lines format with:

  • Complete request/response headers (sanitized)
  • Full request/response bodies (no truncation)
  • Timing data (duration_ms, timestamps)
  • All API calls to OpenAI, Anthropic, Kimi, etc.
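Because the capture is JSON Lines, each line is a self-contained JSON object that can be streamed or filtered without loading the whole file. A minimal Python sketch of filtering for slow calls — the `duration_ms` and `url` fields are the ones described above, but the sample records themselves are made up:

```python
import json

# Hypothetical requests.jsonl lines; real captures also carry headers,
# bodies, and timestamps alongside duration_ms and url.
lines = [
    '{"url": "https://api.anthropic.com/v1/messages", "duration_ms": 42000}',
    '{"url": "https://api.openai.com/v1/chat/completions", "duration_ms": 900}',
]

# Keep only requests slower than 10 seconds
slow = [
    r for r in (json.loads(line) for line in lines)
    if r.get("duration_ms", 0) > 10000
]
for r in slow:
    print(f"{r['duration_ms']}ms  {r['url']}")
```

The same filter is what the bundled analysis script applies to the real capture.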

Session Data (session-data/)

  • sessions.json - All sessions (parent + subagents)
  • messages.json - All conversation messages
  • parts.json - Message parts/components
  • summary.json - Session tree overview

Auto-Generated Analysis

  • errors.json - Machine-readable error data with references
  • errors-summary.md - Human-readable error summary
  • key-parts.json - Key parts with error context
  • slow-api-calls.json - Slow API call data
  • slow-api-calls.md - Slow API call analysis
  • quick-reference.json - Quick reference for external analysis

Privacy Protection

All sensitive data is automatically redacted:

Headers Replaced with [TRUNCATED]

  • Authorization, x-api-key, api-key, token, cookie
  • client-secret, access-token, refresh-token
  • x-session-id, session-id, and related identifier headers

Body Patterns Redacted

  • API keys ("api_key": "..." → "api_key": "[TRUNCATED]")
  • Bearer tokens, JWTs
  • Passwords
  • OpenAI keys (sk-...)

IP Address Masking

  • Public IPs: [IP-MASKED]
  • Private IPs (10.x, 192.168.x, 127.x): Preserved
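The keep-private-IPs rule mirrors the logic in the bundled export script and can be reproduced with Python's `ipaddress` module — a self-contained sketch:

```python
import ipaddress
import re

IP_MASK_TEXT = "[IP-MASKED]"

def mask_public_ips(text: str) -> str:
    """Replace public IPv4 addresses, keep private/loopback/link-local ones."""
    def replace(match):
        try:
            ip = ipaddress.ip_address(match.group(0))
            if ip.is_private or ip.is_loopback or ip.is_link_local:
                return match.group(0)  # preserve non-routable addresses
        except ValueError:
            pass  # not a valid IP; mask to be safe
        return IP_MASK_TEXT
    return re.sub(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", replace, text)

print(mask_public_ips("from 8.8.8.8 to 192.168.1.5"))
# prints: from [IP-MASKED] to 192.168.1.5
```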

Enhanced Privacy (Optional)

If uv is installed, the script auto-installs Microsoft Presidio for ML-based PII detection.

Output Files

Session data is saved to: /tmp/opencode-debug-logs/YYYYMMDD-HHMMSS/

YYYYMMDD-HHMMSS/
├── README.txt                    # Session documentation
├── mitm.log                      # Proxy connection logs
├── requests.jsonl                # Complete HTTP traffic
├── errors.json                   # Machine-readable errors
├── errors-summary.md             # Human-readable error summary
├── key-parts.json                # Key parts with error context
├── slow-api-calls.json           # Slow API call data
├── slow-api-calls.md             # Slow API call analysis
├── quick-reference.json          # Quick reference for analysis
├── session-data/
│   ├── sessions.json            # Session metadata
│   ├── messages.json            # Conversation messages
│   ├── parts.json               # Message parts
│   └── summary.json             # Session tree
├── config/                      # OpenCode config backup
└── session-archive.tar.gz       # Complete archive

Usage Examples

Debug a specific issue

# Run with isolated config to avoid affecting main setup
proxy_opencode --isolated

# In OpenCode, reproduce the issue
# Exit OpenCode when done

# Check the generated analysis
cd /tmp/opencode-debug-logs/latest
jq '.errors[] | {session_slug, error_type}' errors.json

Share debug logs

# Capture and auto-upload to gist
proxy_opencode --upload

# Gist URL will be printed at the end

Analyze errors

# After session completes
cd /tmp/opencode-debug-logs/YYYYMMDD-HHMMSS

# List all errors
jq '.errors[] | .error_type' errors.json | sort | uniq -c

# Find corrupted file paths
jq '.errors[] | select(.input.filePath | startswith(": ")) | .input.filePath' errors.json

# Get slow API calls
jq '.slow_calls[] | {url: .url[:60], duration_sec}' slow-api-calls.json

Query Examples

The quick-reference.json file includes useful jq queries:

# Get all errors
cat errors.json | jq '.errors[] | {session_slug, timestamp, error_type, error_message}'

# Get errors by type
cat errors.json | jq '[.errors[] | select(.error_type == "file_not_found")]'

# Get slow API calls
cat requests.jsonl | jq 'select(.duration_ms > 30000) | {url, duration_ms}'

# Get key parts
cat key-parts.json | jq '.key_parts[] | {part_id, session_id, type, reason}'

How It Works

  1. Starts mitmweb proxy on localhost:8080 (web UI on :8081)
  2. Sets HTTP_PROXY/HTTPS_PROXY environment variables
  3. Runs OpenCode with proxy configuration
  4. Captures all HTTP traffic through the proxy
  5. On exit:
    • Exports OpenCode session data (SQLite → JSON)
    • Runs analysis scripts
    • Creates archive
    • Uploads to gist (if --upload flag)
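Steps 1–3 amount to pointing the child process at the proxy through environment variables. A hedged Python sketch of that wiring (the actual tool does this with zsh exports, so this is illustrative only; the `opencode` launch is shown commented out):

```python
import os

# mitmweb's default listen address, as used in step 1 above
proxy = "http://127.0.0.1:8080"

# Copy the current environment and add the proxy variables
# (both upper- and lower-case forms, since tools differ on which they read)
env = dict(
    os.environ,
    HTTP_PROXY=proxy,
    HTTPS_PROXY=proxy,
    http_proxy=proxy,
    https_proxy=proxy,
)

# The zsh function then launches OpenCode with this environment, roughly:
# subprocess.run(["opencode"], env=env)

print(env["HTTP_PROXY"])
```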

Configuration

Isolated Configs

Create isolated OpenCode configs for testing:

# Create new config
poccfg my-test

# Edit config
pocedit my-test

# Use config
proxy_opencode --config my-test

Default Config

Isolated configs default to:

{
  "$schema": "https://opencode.ai/config.json",
  "model": "kimi-for-coding/k2p5",
  "permission": {
    "bash": {
      "*": "allow"
    }
  }
}

Troubleshooting

mitmproxy not found

The script will auto-install mitmproxy if missing. Restart your shell after installation.

GitHub CLI not authenticated

gh auth login

Certificate errors

The script automatically sets NODE_EXTRA_CA_CERTS to trust the mitmproxy CA.

Session data not exporting

Ensure Python 3 is available. The script will use system Python as fallback if venv Python is not available.

Files in This Gist

  • README.md - This documentation
  • proxy-opencode-debug.zsh - Main zsh function (~1800 lines)
  • export_sessions.py - Session data export utility
  • analyze_session.py - Automatic analysis script

License

MIT License - Use at your own risk. This is a debugging tool that captures sensitive API traffic. Always verify sanitization before sharing captures.

Contributing

This tool was created for debugging OpenCode issues. Feel free to fork and modify for your needs.

analyze_session.py

#!/usr/bin/env python3
"""
Post-process OpenCode debug session data to generate analysis summaries.
Generates machine-readable extracts for external analysis tools.
"""
import json
import sys
from collections import defaultdict, Counter
from datetime import datetime
from pathlib import Path

def load_session_data(session_dir):
    """Load all session data files."""
    data = {}
    for name in ("parts", "messages", "sessions", "summary"):
        path = Path(session_dir) / f"{name}.json"
        if path.exists():
            with open(path, "r") as f:
                data[name] = json.load(f)
    return data

def extract_errors(parts, sessions):
    """Extract all errors from parts with references."""
    errors = []
    session_map = {s["id"]: s for s in sessions}
    for p in parts:
        data = p.get("data", {})
        raw = data.get("raw", "")
        if not raw:
            continue
        try:
            tool_data = json.loads(raw)
            if tool_data.get("type") == "tool":
                state = tool_data.get("state", {})
                status = state.get("status", "")
                tool_name = tool_data.get("tool", "unknown")
                if status == "error" or tool_name == "invalid":
                    errors.append({
                        "part_id": p.get("id"),
                        "session_id": p.get("session_id"),
                        "session_slug": session_map.get(p.get("session_id"), {}).get(
                            "slug", "unknown"
                        ),
                        "timestamp": p.get("timestamp"),
                        "tool": tool_name,
                        "call_id": tool_data.get("callID", "unknown"),
                        "status": status,
                        "error_message": state.get("error")
                        or state.get("output", "Unknown error"),
                        "input": state.get("input", {}),
                        "error_type": categorize_error(
                            state.get("error") or state.get("output", "")
                        ),
                    })
        except Exception:
            pass
    return errors


def categorize_error(error_msg):
    """Categorize error type."""
    error_str = str(error_msg).lower()
    if "file not found" in error_str or "not found" in error_str:
        return "file_not_found"
    elif "json parsing failed" in error_str or "parse error" in error_str:
        return "json_parse_error"
    elif "invalid input" in error_str:
        return "invalid_input"
    elif "permission" in error_str or "access" in error_str:
        return "permission_denied"
    elif "timeout" in error_str:
        return "timeout"
    elif "connection" in error_str:
        return "connection_error"
    else:
        return "other"

def generate_error_summary(errors, sessions):
    """Generate human-readable error summary."""
    lines = []
    lines.append("# Error Summary")
    lines.append(f"Generated: {datetime.now().isoformat()}")
    lines.append("")
    lines.append(f"Total Errors: {len(errors)}")
    lines.append("")
    # Group by session
    by_session = defaultdict(list)
    for e in errors:
        by_session[e["session_slug"]].append(e)
    lines.append("## Errors by Session")
    lines.append("")
    for slug, session_errors in sorted(by_session.items(), key=lambda x: -len(x[1])):
        lines.append(f"### {slug}")
        lines.append(f"Count: {len(session_errors)}")
        lines.append("")
        # Group by error type
        by_type = defaultdict(list)
        for e in session_errors:
            by_type[e["error_type"]].append(e)
        for error_type, type_errors in sorted(
            by_type.items(), key=lambda x: -len(x[1])
        ):
            lines.append(f"**{error_type}** ({len(type_errors)})")
            for e in type_errors[:3]:  # Show first 3 of each type
                error_preview = str(e["error_message"])[:100].replace("\n", " ")
                lines.append(f"  - {e['timestamp']}: {error_preview}")
                if "filePath" in e["input"]:
                    lines.append(f"    File: {e['input']['filePath']}")
            if len(type_errors) > 3:
                lines.append(f"  ... and {len(type_errors) - 3} more")
            lines.append("")
    lines.append("## Error Patterns")
    lines.append("")
    # Find common patterns
    error_types = Counter(e["error_type"] for e in errors)
    for error_type, count in error_types.most_common():
        lines.append(f"- **{error_type}**: {count} occurrences")
    return "\n".join(lines)


def generate_errors_json(errors):
    """Generate machine-readable errors JSON."""
    return {
        "generated_at": datetime.now().isoformat(),
        "total_errors": len(errors),
        "errors": errors,
    }

def generate_key_parts_with_errors(parts, errors):
    """Generate list of key parts that contain or relate to errors."""
    # Get all part IDs that are errors
    error_part_ids = {e["part_id"] for e in errors}
    # Also include parts from same sessions around error times
    error_sessions = {e["session_id"] for e in errors}
    key_parts = []
    for p in parts:
        part_id = p.get("id")
        session_id = p.get("session_id")
        # Include if it's an error part
        if part_id in error_part_ids:
            key_parts.append({
                "part_id": part_id,
                "session_id": session_id,
                "timestamp": p.get("timestamp"),
                "type": "error",
                "reason": "contains_error",
            })
        # Include if it's a reasoning or text part in an error session
        elif session_id in error_sessions:
            data = p.get("data", {})
            part_type = data.get("type", "unknown")
            if part_type in ["reasoning", "text"]:
                key_parts.append({
                    "part_id": part_id,
                    "session_id": session_id,
                    "timestamp": p.get("timestamp"),
                    "type": part_type,
                    "reason": "context_in_error_session",
                    "preview": str(data.get("text", ""))[:200],
                })
    return key_parts


def analyze_requests_jsonl(requests_path, errors):
    """Analyze requests.jsonl for slow calls and error correlations."""
    if not Path(requests_path).exists():
        return None
    requests = []
    with open(requests_path, "r") as f:
        for line in f:
            try:
                requests.append(json.loads(line.strip()))
            except Exception:
                pass
    # Filter for slow API calls
    slow_calls = []
    for r in requests:
        duration = r.get("duration_ms", 0)
        if duration > 10000:  # Calls over 10 seconds
            slow_calls.append({
                "id": r.get("id"),
                "url": r.get("url", "unknown")[:100],
                "method": r.get("method", "GET"),
                "duration_ms": duration,
                "duration_sec": round(duration / 1000, 1),
                "timestamp": r.get("timestamp"),
            })
    # Sort by duration
    slow_calls.sort(key=lambda x: -x["duration_ms"])
    return {
        "total_requests": len(requests),
        "slow_calls": slow_calls[:50],  # Top 50
        "error_correlation": find_error_correlations(requests, errors),
    }


def find_error_correlations(requests, errors):
    """Find correlations between slow requests and errors."""
    # Simple heuristic: check if errors occurred around same time as slow requests
    correlations = []
    error_times = set()
    for e in errors:
        ts = e.get("timestamp", "")
        if ts:
            # Extract just the time portion for rough matching
            error_times.add(ts[:16])  # YYYY-MM-DD HH:MM
    for r in requests:
        if r.get("duration_ms", 0) > 20000:  # Very slow calls
            req_time = r.get("timestamp", "")[:16]
            if req_time in error_times:
                correlations.append({
                    "request_id": r.get("id"),
                    "url": r.get("url", "unknown")[:80],
                    "duration_sec": round(r.get("duration_ms", 0) / 1000, 1),
                    "timestamp": r.get("timestamp"),
                    "correlation": "slow_request_during_errors",
                })
    return correlations

def generate_slow_api_summary(analysis):
    """Generate human-readable slow API summary."""
    if not analysis:
        return "# Slow API Calls\n\nNo requests.jsonl found."
    lines = []
    lines.append("# Slow API Calls Analysis")
    lines.append(f"Generated: {datetime.now().isoformat()}")
    lines.append("")
    lines.append(f"Total Requests: {analysis['total_requests']}")
    lines.append(f"Slow Calls (>10s): {len(analysis['slow_calls'])}")
    lines.append("")
    if analysis["slow_calls"]:
        lines.append("## Top 20 Slowest Calls")
        lines.append("")
        lines.append("| Rank | Duration | URL | Method |")
        lines.append("|------|----------|-----|--------|")
        for i, call in enumerate(analysis["slow_calls"][:20], 1):
            url = call["url"][:60] + "..." if len(call["url"]) > 60 else call["url"]
            lines.append(
                f"| {i} | {call['duration_sec']}s | {url} | {call['method']} |"
            )
        lines.append("")
    if analysis["error_correlation"]:
        lines.append("## Correlations with Errors")
        lines.append("")
        lines.append("The following slow requests occurred during error periods:")
        lines.append("")
        for corr in analysis["error_correlation"][:10]:
            lines.append(f"- {corr['duration_sec']}s at {corr['timestamp']}")
            lines.append(f"  URL: {corr['url']}")
        lines.append("")
    return "\n".join(lines)


def generate_quick_reference(errors, key_parts, slow_analysis):
    """Generate a quick reference guide for external analysis."""
    ref = {
        "generated_at": datetime.now().isoformat(),
        "session_analysis": {
            "total_errors": len(errors),
            "error_types": dict(Counter(e["error_type"] for e in errors)),
            "sessions_with_errors": list(set(e["session_slug"] for e in errors)),
            "key_part_ids": [p["part_id"] for p in key_parts],
        },
        "slow_requests": {
            "total_slow_calls": len(slow_analysis.get("slow_calls", []))
            if slow_analysis
            else 0,
            "slowest_call_duration_sec": slow_analysis["slow_calls"][0]["duration_sec"]
            if slow_analysis and slow_analysis.get("slow_calls")
            else 0,
        },
        "queries_for_analysis": [
            {
                "name": "Get all errors",
                "jq": "cat errors.json | jq '.errors[] | {session_slug, timestamp, error_type, error_message}'",
            },
            {
                "name": "Get errors by type",
                "jq": "cat errors.json | jq '[.errors[] | select(.error_type == \"file_not_found\")]'",
            },
            {
                "name": "Get slow API calls",
                "jq": "cat requests.jsonl | jq 'select(.duration_ms > 30000) | {url, duration_ms}'",
            },
            {
                "name": "Get key parts",
                "jq": "cat key-parts.json | jq '.key_parts[] | {part_id, session_id, type, reason}'",
            },
            {
                "name": "Find corrupted file paths",
                "jq": "cat errors.json | jq '.errors[] | select(.input.filePath | startswith(\": \")) | .input.filePath' | sort -u",
            },
        ],
    }
    return ref

def main():
    if len(sys.argv) < 2:
        print("Usage: python analyze_session.py <session_dir>")
        print("")
        print("Analyzes OpenCode session data and generates summary files:")
        print("  - errors.json          : Machine-readable error data")
        print("  - errors-summary.md    : Human-readable error summary")
        print("  - key-parts.json       : Key parts with error context")
        print("  - slow-api-calls.md    : Slow API call analysis")
        print("  - quick-reference.json : Quick reference for analysis")
        sys.exit(1)
    session_dir = Path(sys.argv[1])
    log_dir = session_dir.parent
    print(f"Analyzing session: {session_dir}")
    # Load data
    data = load_session_data(session_dir)
    if not data.get("parts"):
        print("Error: No parts.json found")
        sys.exit(1)
    print(
        f"Loaded {len(data['parts'])} parts, {len(data.get('sessions', []))} sessions"
    )
    # Extract errors
    errors = extract_errors(data["parts"], data.get("sessions", []))
    print(f"Found {len(errors)} errors")
    # Generate error files
    errors_json = generate_errors_json(errors)
    with open(log_dir / "errors.json", "w") as f:
        json.dump(errors_json, f, indent=2)
    print(f"  ✓ errors.json ({len(errors)} errors)")
    errors_summary = generate_error_summary(errors, data.get("sessions", []))
    with open(log_dir / "errors-summary.md", "w") as f:
        f.write(errors_summary)
    print("  ✓ errors-summary.md")
    # Generate key parts
    key_parts = generate_key_parts_with_errors(data["parts"], errors)
    with open(log_dir / "key-parts.json", "w") as f:
        json.dump(
            {
                "generated_at": datetime.now().isoformat(),
                "total_key_parts": len(key_parts),
                "key_parts": key_parts,
            },
            f,
            indent=2,
        )
    print(f"  ✓ key-parts.json ({len(key_parts)} parts)")
    # Analyze requests
    requests_path = log_dir / "requests.jsonl"
    slow_analysis = analyze_requests_jsonl(requests_path, errors)
    if slow_analysis:
        with open(log_dir / "slow-api-calls.json", "w") as f:
            json.dump(
                {
                    "generated_at": datetime.now().isoformat(),
                    "total_requests": slow_analysis["total_requests"],
                    "slow_calls_count": len(slow_analysis["slow_calls"]),
                    "slow_calls": slow_analysis["slow_calls"],
                    "error_correlations": slow_analysis["error_correlation"],
                },
                f,
                indent=2,
            )
        print(
            f"  ✓ slow-api-calls.json ({len(slow_analysis['slow_calls'])} slow calls)"
        )
    slow_summary = generate_slow_api_summary(slow_analysis)
    with open(log_dir / "slow-api-calls.md", "w") as f:
        f.write(slow_summary)
    print("  ✓ slow-api-calls.md")
    # Generate quick reference
    quick_ref = generate_quick_reference(errors, key_parts, slow_analysis)
    with open(log_dir / "quick-reference.json", "w") as f:
        json.dump(quick_ref, f, indent=2)
    print("  ✓ quick-reference.json")
    print(f"\n✓ Analysis complete. Files saved to: {log_dir}")


if __name__ == "__main__":
    main()

export_sessions.py

#!/usr/bin/env python3
"""
Export OpenCode session data with parent/child linking and privacy redaction using Microsoft Presidio.
Captures primary session and all subagent sessions for comprehensive analysis.
Uses Presidio for robust PII and API key detection.
Auto-installs Presidio if not available using uv.
"""
import sqlite3
import json
import sys
import re
import os
import subprocess
from datetime import datetime
from pathlib import Path
# Check if we need to install presidio
PRESIDIO_AVAILABLE = False
try:
    from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
    from presidio_anonymizer import AnonymizerEngine

    PRESIDIO_AVAILABLE = True
except ImportError:
    pass

# If presidio not available and we're not in a venv, try to install it
if (
    not PRESIDIO_AVAILABLE
    and not hasattr(sys, "real_prefix")
    and not (hasattr(sys, "base_prefix") and sys.base_prefix != sys.prefix)
):
    print("Presidio not installed. Installing in isolated environment using uv...")
    # Check if uv is available
    uv_path = subprocess.run(
        ["which", "uv"], capture_output=True, text=True
    ).stdout.strip()
    if uv_path:
        # Create isolated environment and install presidio
        venv_path = os.path.expanduser("~/.cache/opencode-export-venv")
        try:
            # Create venv if it doesn't exist
            if not os.path.exists(venv_path):
                subprocess.run(
                    [uv_path, "venv", venv_path], check=True, capture_output=True
                )
                print(f"✓ Created virtual environment at {venv_path}")
            # Install presidio
            subprocess.run(
                [
                    uv_path,
                    "pip",
                    "install",
                    "--python",
                    f"{venv_path}/bin/python",
                    "presidio-analyzer",
                    "presidio-anonymizer",
                ],
                check=True,
                capture_output=True,
            )
            print("✓ Installed Presidio")
            # Re-exec with the venv python
            venv_python = f"{venv_path}/bin/python"
            os.execv(venv_python, [venv_python] + sys.argv)
        except Exception as e:
            print(f"⚠ Could not install Presidio: {e}")
            print("  Continuing with fallback regex patterns...")
    else:
        print(
            "⚠ uv not found. Install with: curl -LsSf https://astral.sh/uv/install.sh | sh"
        )
        print("  Continuing with fallback regex patterns...")

# Now try importing again (either we have it or we're using fallback)
try:
    from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
    from presidio_anonymizer import AnonymizerEngine

    PRESIDIO_AVAILABLE = True
except ImportError:
    PRESIDIO_AVAILABLE = False

REDACTED_TEXT = "[REDACTED]"  # Clearer than [TRUNCATED]
IP_MASK_TEXT = "[IP-MASKED]"

# Comprehensive API key and secret patterns (fallback if Presidio not available)
FALLBACK_PRIVACY_PATTERNS = [
    # OpenAI variants
    r"sk-[a-zA-Z0-9]{20,}",
    r"sk-proj-[a-zA-Z0-9\-]{20,}",
    r"sk-ant-[a-zA-Z0-9\-]{32,}",
    # GitHub tokens
    r"gh[pousr]_[A-Za-z0-9_]{36,}",
    # Stripe
    r"sk_(live|test)_[A-Za-z0-9]{24,}",
    r"pk_(live|test)_[A-Za-z0-9]{24,}",
    # AWS
    r"AKIA[0-9A-Z]{16}",
    r"ASIA[0-9A-Z]{16}",
    # Slack
    r"xox[baprs]-[0-9]{10,13}-[0-9]{10,13}",
    # SendGrid
    r"SG\.[A-Za-z0-9_\-]{22}\.[A-Za-z0-9_\-]{43}",
    # Google
    r"AIza[0-9A-Za-z_\-]{35}",
    # Bearer tokens
    r"Bearer\s+[A-Za-z0-9_\-\.]{20,}",
    # JWT patterns
    r"eyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*",
]

class PrivacySanitizer:
    """Privacy sanitizer using Presidio or fallback regex."""

    def __init__(self):
        self.analyzer = None
        self.anonymizer = None
        if PRESIDIO_AVAILABLE:
            try:
                self.analyzer = AnalyzerEngine()
                self.anonymizer = AnonymizerEngine()
                self._add_custom_recognizers()
            except Exception as e:
                print(f"Warning: Could not initialize Presidio: {e}")
                self.analyzer = None

    def _add_custom_recognizers(self):
        """Add custom recognizers for API keys and secrets."""
        if not self.analyzer:
            return
        # GitHub tokens
        github_pattern = Pattern(
            name="github_token", regex=r"gh[pousr]_[A-Za-z0-9_]{36,}", score=0.95
        )
        github_recognizer = PatternRecognizer(
            supported_entity="GITHUB_TOKEN", patterns=[github_pattern]
        )
        self.analyzer.registry.add_recognizer(github_recognizer)
        # Generic sk- keys (OpenAI, Anthropic, etc.)
        sk_pattern = Pattern(name="sk_key", regex=r"sk-[A-Za-z0-9_-]{20,}", score=0.9)
        sk_recognizer = PatternRecognizer(
            supported_entity="SECRET_KEY", patterns=[sk_pattern]
        )
        self.analyzer.registry.add_recognizer(sk_recognizer)
        # AWS keys
        aws_pattern = Pattern(
            name="aws_key", regex=r"(AKIA|ASIA|AROA|AIDA)[0-9A-Z]{16}", score=0.95
        )
        aws_recognizer = PatternRecognizer(
            supported_entity="AWS_KEY", patterns=[aws_pattern]
        )
        self.analyzer.registry.add_recognizer(aws_recognizer)

    def mask_public_ips(self, text):
        """Mask public IP addresses while preserving private ranges."""
        if not text:
            return text
        ip_pattern = r"\b(\d{1,3}\.){3}\d{1,3}\b"

        def replace_ip(match):
            ip_str = match.group(0)
            try:
                import ipaddress

                ip = ipaddress.ip_address(ip_str)
                if ip.is_private or ip.is_loopback or ip.is_link_local:
                    return ip_str
            except ValueError:
                pass
            return IP_MASK_TEXT

        return re.sub(ip_pattern, replace_ip, text)

    def sanitize_with_regex(self, text):
        """Fallback regex-based sanitization."""
        if not text:
            return text
        # Mask IPs first
        text = self.mask_public_ips(text)
        # Apply privacy patterns
        for pattern in FALLBACK_PRIVACY_PATTERNS:
            try:
                text = re.sub(pattern, REDACTED_TEXT, text, flags=re.IGNORECASE)
            except Exception:
                pass
        return text

    def sanitize(self, text):
        """Main sanitization method - uses Presidio if available, else regex."""
        if not text:
            return text
        if self.analyzer and PRESIDIO_AVAILABLE:
            try:
                results = self.analyzer.analyze(text=text, language="en")
                anonymized = self.anonymizer.anonymize(
                    text=text, analyzer_results=results
                )
                return anonymized.text
            except Exception:
                return self.sanitize_with_regex(text)
        else:
            return self.sanitize_with_regex(text)


# Global sanitizer instance
_sanitizer = None


def get_sanitizer():
    """Get or create sanitizer instance."""
    global _sanitizer
    if _sanitizer is None:
        _sanitizer = PrivacySanitizer()
    return _sanitizer


def sanitize_message_data(data):
    """Sanitize message data recursively - preserve file paths."""
    sanitizer = get_sanitizer()
    if isinstance(data, dict):
        sanitized = {}
        for key, value in data.items():
            # Skip certain fields that don't need sanitization
            if key in ["role", "type", "id", "name", "model", "timestamp"]:
                sanitized[key] = value
            # PRESERVE file paths - don't sanitize
            elif key in ["filePath", "path", "filepath", "cwd", "root", "directory"]:
                sanitized[key] = value
            elif key in [
                "content",
                "text",
                "message",
                "input",
                "arguments",
                "system",
                "prompt",
            ]:
                # Structured payloads (e.g. tool input dicts) are recursed, not
                # passed to the text sanitizer, which expects strings
                if isinstance(value, str):
                    sanitized[key] = sanitizer.sanitize(value)
                else:
                    sanitized[key] = sanitize_message_data(value)
            elif isinstance(value, (dict, list)):
                sanitized[key] = sanitize_message_data(value)
            else:
                sanitized[key] = value
        return sanitized
    elif isinstance(data, list):
        return [sanitize_message_data(item) for item in data]
    elif isinstance(data, str):
        return sanitizer.sanitize(data)
    else:
        return data

def get_session_tree(db_path, root_session_id):
    """Get all sessions in the tree (root + all children recursively)."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    # Get root session
    cursor.execute(
        """
        SELECT id, parent_id, slug, title, directory,
               datetime(time_created/1000, 'unixepoch') as created,
               datetime(time_updated/1000, 'unixepoch') as updated,
               version, permission
        FROM session
        WHERE id = ? AND time_archived IS NULL
        """,
        (root_session_id,),
    )
    root = cursor.fetchone()
    if not root:
        print(f"Error: Session {root_session_id} not found")
        conn.close()
        return None, []
    # Get all child sessions recursively
    all_session_ids = [root_session_id]
    sessions = [dict(root)]

    def get_children(parent_id):
        cursor.execute(
            """
            SELECT id, parent_id, slug, title, directory,
                   datetime(time_created/1000, 'unixepoch') as created,
                   datetime(time_updated/1000, 'unixepoch') as updated,
                   version, permission
            FROM session
            WHERE parent_id = ? AND time_archived IS NULL
            ORDER BY time_created
            """,
            (parent_id,),
        )
        children = cursor.fetchall()
        for child in children:
            child_dict = dict(child)
            sessions.append(child_dict)
            all_session_ids.append(child_dict["id"])
            get_children(child_dict["id"])

    get_children(root_session_id)
    conn.close()
    return sessions, all_session_ids


def get_messages_for_sessions(db_path, session_ids):
    """Get all messages for the given sessions."""
    if not session_ids:
        return []
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    placeholders = ",".join("?" * len(session_ids))
    cursor.execute(
        f"""
        SELECT id, session_id, data,
               datetime(time_created/1000, 'unixepoch') as timestamp
        FROM message
        WHERE session_id IN ({placeholders})
        ORDER BY session_id, id
        """,
        tuple(session_ids),
    )
    messages = []
    for row in cursor.fetchall():
        msg = dict(row)
        try:
            msg_data = json.loads(msg["data"])
            msg["data"] = sanitize_message_data(msg_data)
        except Exception:
            msg["data"] = {"raw": sanitize_message_data(msg["data"])}
        messages.append(msg)
    conn.close()
    return messages


def get_parts_for_sessions(db_path, session_ids):
    """Get all parts for the given sessions."""
    if not session_ids:
        return []
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    placeholders = ",".join("?" * len(session_ids))
    cursor.execute(
        f"""
        SELECT id, session_id, data,
               datetime(time_created/1000, 'unixepoch') as timestamp
        FROM part
        WHERE session_id IN ({placeholders})
        ORDER BY session_id, id
        """,
        tuple(session_ids),
    )
    parts = []
    for row in cursor.fetchall():
        part = dict(row)
        try:
            part_data = json.loads(part["data"])
            part["data"] = sanitize_message_data(part_data)
        except Exception:
            part["data"] = {"raw": sanitize_message_data(part["data"])}
        parts.append(part)
    conn.close()
    return parts
def export_session_data(output_dir, session_id=None):
"""Export complete session data with privacy redaction"""
db_path = os.path.expanduser("~/.local/share/opencode/opencode.db")
if not os.path.exists(db_path):
print(f"Error: Database not found at {db_path}")
return False
sanitizer = get_sanitizer()
if sanitizer.analyzer:
print("✓ Using Microsoft Presidio for privacy detection")
else:
print("⚠ Using fallback regex patterns")
if not session_id:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM session
WHERE time_archived IS NULL AND parent_id IS NULL
ORDER BY time_created DESC LIMIT 1
""")
result = cursor.fetchone()
conn.close()
if result:
session_id = result[0]
print(f"Using most recent session: {session_id}")
else:
print("Error: No active sessions found")
return False
sessions, session_ids = get_session_tree(db_path, session_id)
if not sessions:
return False
print(f"Found {len(sessions)} sessions (1 parent + {len(sessions) - 1} children)")
os.makedirs(output_dir, exist_ok=True)
sessions_file = os.path.join(output_dir, "sessions.json")
with open(sessions_file, "w") as f:
json.dump(sessions, f, indent=2)
print(f"✓ Exported {len(sessions)} sessions")
print("Exporting messages...")
messages = get_messages_for_sessions(db_path, session_ids)
messages_file = os.path.join(output_dir, "messages.json")
with open(messages_file, "w") as f:
json.dump(messages, f, indent=2)
print(f"✓ Exported {len(messages)} messages")
print("Exporting parts...")
parts = get_parts_for_sessions(db_path, session_ids)
parts_file = os.path.join(output_dir, "parts.json")
with open(parts_file, "w") as f:
json.dump(parts, f, indent=2)
print(f"✓ Exported {len(parts)} parts")
summary = {
"exported_at": datetime.now().isoformat(),
"root_session_id": session_id,
"total_sessions": len(sessions),
"total_messages": len(messages),
"total_parts": len(parts),
"sanitization": {
"engine": "presidio" if sanitizer.analyzer else "regex_fallback",
},
"sessions": [
{
"id": s["id"],
"slug": s["slug"],
"title": s["title"],
"parent_id": s["parent_id"],
"created": s["created"],
}
for s in sessions
],
}
summary_file = os.path.join(output_dir, "summary.json")
with open(summary_file, "w") as f:
json.dump(summary, f, indent=2)
print(f"✓ Created summary.json")
return True
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python export_sessions.py <output_dir> [session_id]")
sys.exit(1)
output_dir = sys.argv[1]
session_id = sys.argv[2] if len(sys.argv) > 2 else None
success = export_session_data(output_dir, session_id)
sys.exit(0 if success else 1)
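# Usage (paths and ids below are illustrative):
#   python export_sessions.py ./debug-export              # most recent root session
#   python export_sessions.py ./debug-export SESSION_ID   # a specific session tree
# Writes sessions.json, messages.json, parts.json and summary.json to the target dir.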
#!/bin/zsh
# proxy-opencode-debug.zsh - Enhanced OpenCode debugging with mitmproxy
#
# A comprehensive debugging tool for OpenCode that:
# - Captures all API traffic including full request/response bodies
# - Supports isolated configurations for testing
# - Automatically sanitizes and uploads logs to secret gists
# - Auto-installs mitmproxy if not present
#
# Installation:
# 1. Copy this file to ~/.oh-my-zsh/custom/
# 2. Or source it in your .zshrc: source /path/to/proxy-opencode-debug.zsh
#
# Quick Start:
# proxy_opencode # Run with normal config + proxy
# proxy_opencode --isolated # Run with fresh isolated config
# proxy_opencode --upload # Capture and upload logs to gist
#
# Documentation: Run `proxy_opencode --help` for full usage
#
# Author: Created for OpenCode debugging
# Version: 1.2.0
# ============================================================================
# PRIVACY PATTERNS - Hardcoded sensitive data patterns to truncate
# ============================================================================
# These patterns are automatically replaced with [TRUNCATED] in logs
# Header names are case-insensitive
# Headers to completely redact (value replaced with [TRUNCATED])
PROXY_PRIVACY_HEADERS=(
"authorization"
"x-api-key"
"api-key"
"apikey"
"x-auth-token"
"auth-token"
"x-access-token"
"access-token"
"x-token"
"token"
"cookie"
"x-csrf-token"
"x-xsrf-token"
"x-request-signature"
"x-signature"
"x-secret"
"x-api-secret"
"api-secret"
"x-auth-key"
"auth-key"
"x-password"
"x-api-password"
"x-session-id"
"session-id"
"x-user-token"
"user-token"
"x-bearer-token"
"bearer-token"
"x-refresh-token"
"refresh-token"
"x-id-token"
"id-token"
"x-client-secret"
"client-secret"
"x-app-secret"
"app-secret"
"x-private-key"
"private-key"
"x-fingerprint"
"x-device-id"
"x-otp"
"x-verification-code"
"x-2fa-code"
"x-mfa-code"
)
# Body patterns to redact (regex patterns for JSON/form data)
PROXY_PRIVACY_BODY_PATTERNS=(
# API Keys (various formats)
'"api[_-]?key"\s*:\s*"[^"]*"'
'"apikey"\s*:\s*"[^"]*"'
'"x-api-key"\s*:\s*"[^"]*"'
# Tokens
'"token"\s*:\s*"[^"]*"'
'"auth[_-]?token"\s*:\s*"[^"]*"'
'"access[_-]?token"\s*:\s*"[^"]*"'
'"refresh[_-]?token"\s*:\s*"[^"]*"'
'"id[_-]?token"\s*:\s*"[^"]*"'
'"bearer"\s*:\s*"[^"]*"'
'"jwt"\s*:\s*"[^"]*"'
# Secrets
'"secret"\s*:\s*"[^"]*"'
'"client[_-]?secret"\s*:\s*"[^"]*"'
'"app[_-]?secret"\s*:\s*"[^"]*"'
'"api[_-]?secret"\s*:\s*"[^"]*"'
# Passwords
'"password"\s*:\s*"[^"]*"'
'"passwd"\s*:\s*"[^"]*"'
'"pwd"\s*:\s*"[^"]*"'
# Keys
'"private[_-]?key"\s*:\s*"[^"]*"'
'"api[_-]?key"\s*:\s*"[^"]*"'
'"key[_-]?id"\s*:\s*"[^"]*"'
# Session/Auth
'"session[_-]?id"\s*:\s*"[^"]*"'
'"sessionid"\s*:\s*"[^"]*"'
'"auth[_-]?code"\s*:\s*"[^"]*"'
'"verification[_-]?code"\s*:\s*"[^"]*"'
'"otp"\s*:\s*"[^"]*"'
'"mfa[_-]?code"\s*:\s*"[^"]*"'
'"2fa[_-]?code"\s*:\s*"[^"]*"'
# Signatures
'"signature"\s*:\s*"[^"]*"'
'"hmac"\s*:\s*"[^"]*"'
'"checksum"\s*:\s*"[^"]*"'
# OpenAI/AI specific
'"sk-[a-zA-Z0-9]{20,}"' # OpenAI API keys
'"org-[a-zA-Z0-9]{10,}"' # OpenAI org IDs
'"project-[a-zA-Z0-9]{10,}"' # OpenAI project IDs
# AWS/GCP/Azure patterns
'"AKIA[0-9A-Z]{16}"' # AWS Access Key ID
'"[0-9a-zA-Z/+]{40}"' # AWS Secret Access Key (40 chars base64-like)
'"ya29\\.[0-9a-zA-Z_-]+"' # Google OAuth tokens
# Form data patterns
'api[_-]?key=[^&\s]+'
'apikey=[^&\s]+'
'token=[^&\s]+'
'auth[_-]?token=[^&\s]+'
'access[_-]?token=[^&\s]+'
'password=[^&\s]+'
'secret=[^&\s]+'
'client[_-]?secret=[^&\s]+'
)
# String patterns anywhere in content (these get replaced globally)
PROXY_PRIVACY_STRINGS=(
"sk-[a-zA-Z0-9]{20,}" # OpenAI API keys
"sk-[A-Za-z0-9_-]{20,}" # Other API keys
"[0-9a-f]{64,}" # Hex hashes (private keys, etc)
"eyJ[a-zA-Z0-9_-]*\\.[a-zA-Z0-9_-]*\\.[a-zA-Z0-9_-]*" # JWT tokens
"[A-Za-z0-9_\\-]{8,}\\.[A-Za-z0-9_\\-]{8,}\\.[A-Za-z0-9_\\-]{8,}" # Other token formats
)
# IP Address patterns to mask (public IPs only)
PROXY_PRIVACY_IP_PATTERNS=(
'\\b(?!(?:10\\.|172\\.(?:1[6-9]|2[0-9]|3[01])\\.|192\\.168\\.|127\\.))(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b'
)
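# Example of the redaction these patterns drive (illustrative values):
#   before: {"api_key": "sk-abc123exampleexamplekey", "host": "8.8.8.8"}
#   after:  {"api_key": "[TRUNCATED]", "host": "[IP-MASKED]"}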
# ============================================================================
# Configuration
# ============================================================================
OPENCODE_DEBUG_DIR="${OPENCODE_DEBUG_DIR:-$HOME/.local/share/opencode}"
OPENCODE_ISOLATED_CONFIGS_DIR="${OPENCODE_ISOLATED_CONFIGS_DIR:-$HOME/.config/opencode/isolated-configs}"
OPENCODE_DEBUG_LOGS_DIR="${OPENCODE_DEBUG_LOGS_DIR:-/tmp/opencode-debug-logs}"
MITMPROXY_PORT="${MITMPROXY_PORT:-58888}"
MITMPROXY_WEB_PORT="${MITMPROXY_WEB_PORT:-8081}"
mkdir -p "$OPENCODE_ISOLATED_CONFIGS_DIR" "$OPENCODE_DEBUG_LOGS_DIR"
# ============================================================================
# Helper Functions
# ============================================================================
_proxy_opencode_check_mitmproxy() {
if ! command -v mitmweb &> /dev/null; then
echo "mitmproxy not found. Installing..."
if command -v brew &> /dev/null; then
echo "Installing via Homebrew..."
brew install mitmproxy
elif command -v pip3 &> /dev/null; then
echo "Installing via pip3..."
pip3 install --user mitmproxy
elif command -v pip &> /dev/null; then
echo "Installing via pip..."
pip install --user mitmproxy
elif command -v uv &> /dev/null; then
echo "Installing via uv..."
uv pip install mitmproxy
else
echo ""
echo "Error: Could not install mitmproxy automatically."
echo ""
echo "Please install manually:"
echo " brew install mitmproxy # macOS"
echo " pip3 install --user mitmproxy # Python"
echo ""
echo "For other platforms: https://mitmproxy.org/"
return 1
fi
if ! command -v mitmweb &> /dev/null; then
if [[ -f "$HOME/.local/bin/mitmweb" ]]; then
export PATH="$HOME/.local/bin:$PATH"
else
# [[ -f ]] does not expand globs, so scan the per-version bin dirs directly;
# the (N) qualifier makes an unmatched glob expand to nothing instead of erroring
for p in $HOME/Library/Python/3.*/bin(N); do
if [[ -f "$p/mitmweb" ]]; then
export PATH="$p:$PATH"
break
fi
done
fi
if ! command -v mitmweb &> /dev/null; then
echo "Error: mitmproxy installed but mitmweb not in PATH"
echo "You may need to restart your shell"
return 1
fi
fi
echo "✓ mitmproxy installed successfully"
fi
return 0
}
proxy_opencode_create_config() {
local config_name="${1:-isolated-$(date +%Y%m%d-%H%M%S)}"
local config_dir="$OPENCODE_ISOLATED_CONFIGS_DIR/$config_name"
local config_file="$config_dir/opencode.jsonc"
if [[ -d "$config_dir" ]]; then
echo "Config '$config_name' already exists"
echo "To edit: proxy_opencode_edit_config $config_name"
return 1
fi
mkdir -p "$config_dir"
cat > "$config_file" << 'EOF'
{
"$schema": "https://opencode.ai/config.json",
"model": "kimi-for-coding/k2p5",
"permission": {
"bash": {
"*": "allow"
}
}
}
EOF
echo "Created isolated config: $config_name"
echo " Location: $config_file"
echo ""
echo "Edit: proxy_opencode_edit_config $config_name"
echo "Use: proxy_opencode --config $config_name"
echo -n "\nEdit now? (y/N) "
read -q edit_now
echo ""
if [[ "$edit_now" == "y" || "$edit_now" == "Y" ]]; then
proxy_opencode_edit_config "$config_name"
fi
}
proxy_opencode_edit_config() {
local config_name="${1:-}"
if [[ -z "$config_name" ]]; then
echo "Usage: proxy_opencode_edit_config <name>"
proxy_opencode_list_configs
return 1
fi
local config_file="$OPENCODE_ISOLATED_CONFIGS_DIR/$config_name/opencode.jsonc"
if [[ ! -f "$config_file" ]]; then
echo "Config not found: $config_name"
proxy_opencode_list_configs
return 1
fi
${EDITOR:-${VISUAL:-vim}} "$config_file"
}
proxy_opencode_list_configs() {
if [[ ! -d "$OPENCODE_ISOLATED_CONFIGS_DIR" ]] || \
[[ -z "$(ls -A "$OPENCODE_ISOLATED_CONFIGS_DIR" 2>/dev/null)" ]]; then
echo "No isolated configs found"
echo "Create: proxy_opencode_create_config [name]"
return 0
fi
echo "Available isolated configs:"
for dir in "$OPENCODE_ISOLATED_CONFIGS_DIR"/*(N/); do  # (N/) = directories only, no error when empty
if [[ -d "$dir" ]]; then
local name=$(basename "$dir")
local config_file="$dir/opencode.jsonc"
if [[ -f "$config_file" ]]; then
local modified
if [[ "$OSTYPE" == "darwin"* ]]; then
modified=$(stat -f "%Sm" -t "%Y-%m-%d %H:%M" "$config_file")
else
modified=$(stat -c "%y" "$config_file" | cut -d' ' -f1,2 | cut -d'.' -f1)
fi
printf " %-20s (modified: %s)\n" "$name" "$modified"
fi
fi
done
}
proxy_opencode_delete_config() {
local config_name="$1"
if [[ -z "$config_name" ]]; then
echo "Usage: proxy_opencode_delete_config <name>"
proxy_opencode_list_configs
return 1
fi
local config_dir="$OPENCODE_ISOLATED_CONFIGS_DIR/$config_name"
if [[ ! -d "$config_dir" ]]; then
echo "Config not found: $config_name"
return 1
fi
echo -n "Delete '$config_name'? (y/N) "
read -q confirm
echo ""
if [[ "$confirm" == "y" || "$confirm" == "Y" ]]; then
rm -rf "$config_dir"
echo "Deleted: $config_name"
else
echo "Cancelled"
fi
}
# ============================================================================
# Main Function
# ============================================================================
proxy_opencode() {
if ! _proxy_opencode_check_mitmproxy; then
return 1
fi
local use_isolated=false
local config_name=""
local capture_logs=true
local upload_gist=false
local edit_config_first=false
local opencode_debug=true
local session_name="$(date +%Y%m%d-%H%M%S)"
local original_config=""
while [[ $# -gt 0 ]]; do
case $1 in
--isolated|-i)
use_isolated=true
shift
;;
--config|-c)
config_name="$2"
shift 2
;;
--no-logs)
capture_logs=false
shift
;;
--upload|-u)
upload_gist=true
shift
;;
--edit|-e)
edit_config_first=true
shift
;;
--debug|-d)
opencode_debug=true
shift
;;
--no-debug)
opencode_debug=false
shift
;;
--help|-h)
_proxy_opencode_show_help
return 0
;;
--)
shift
break
;;
-*)
echo "Unknown option: $1"
echo "Run 'proxy_opencode --help' for usage"
return 1
;;
*)
break
;;
esac
done
# Setup log files
local log_dir=""
local mitm_log=""
local mitm_full_log=""
local opencode_log=""
if [[ "$capture_logs" == true ]]; then
log_dir="$OPENCODE_DEBUG_LOGS_DIR/$session_name"
mkdir -p "$log_dir"
mitm_log="$log_dir/mitm.log"
mitm_full_log="$log_dir/requests.jsonl"
opencode_log="$log_dir/opencode.log"
else
# The helper scripts below are always written under $log_dir; point it at a
# throwaway directory so --no-logs does not try to write to "/"
log_dir="$(mktemp -d)"
fi
# Handle config selection
local config_path=""
local tmpdir=""
if [[ -n "$config_name" ]]; then
config_path="$OPENCODE_ISOLATED_CONFIGS_DIR/$config_name/opencode.jsonc"
original_config="$config_path"
if [[ ! -f "$config_path" ]]; then
echo "Config not found: $config_name"
proxy_opencode_list_configs
return 1
fi
if [[ "$edit_config_first" == true ]]; then
proxy_opencode_edit_config "$config_name"
fi
echo "Using isolated config: $config_name"
echo " $config_path"
echo ""
elif [[ "$use_isolated" == true ]]; then
tmpdir=$(mktemp -d)
mkdir -p "$tmpdir/.config/opencode"
config_path="$tmpdir/.config/opencode/opencode.jsonc"
original_config="$config_path"
cat > "$config_path" << 'EOF'
{
"$schema": "https://opencode.ai/config.json",
"model": "kimi-for-coding/k2p5",
"permission": {
"bash": {
"*": "allow"
}
}
}
EOF
export HOME="$tmpdir"
echo "Running in TEMPORARY isolated mode"
echo " Config: $config_path"
echo ""
else
original_config="$HOME/.config/opencode/opencode.jsonc"
fi
# Generate privacy script headers list
local privacy_headers_str=""
for h in "${PROXY_PRIVACY_HEADERS[@]}"; do
privacy_headers_str="$privacy_headers_str'$h', "
done
# Generate privacy patterns list
local privacy_patterns_str=""
for p in "${PROXY_PRIVACY_BODY_PATTERNS[@]}"; do
privacy_patterns_str="${privacy_patterns_str}r'$p', "  # literal r -> Python raw string
done
# Generate string patterns list
local privacy_strings_str=""
for s in "${PROXY_PRIVACY_STRINGS[@]}"; do
privacy_strings_str="${privacy_strings_str}r'$s', "  # literal r -> Python raw string
done
# Start mitmweb with full capture script
echo "Starting mitmweb proxy on port $MITMPROXY_PORT..."
echo " ✓ Full request/response capture enabled"
echo " ✓ Auto-redacting sensitive headers (${#PROXY_PRIVACY_HEADERS[@]} patterns)"
local mitm_script="$log_dir/capture_script.py"
cat > "$mitm_script" << PYTHON_EOF
from mitmproxy import http
import json
import datetime
import re
import os
import ipaddress
# Headers to redact (case-insensitive)
PRIVACY_HEADERS = [${privacy_headers_str%, }]
# Body patterns to redact (regex)
PRIVACY_PATTERNS = [${privacy_patterns_str%, }]
# String patterns to redact globally
PRIVACY_STRINGS = [${privacy_strings_str%, }]
REDACTED_TEXT = "[TRUNCATED]"
IP_MASK_TEXT = "[IP-MASKED]"
def is_private_ip(ip_str):
"""Check if IP is private/local (not public)"""
try:
ip = ipaddress.ip_address(ip_str)
return ip.is_private or ip.is_loopback or ip.is_link_local
except:
return False
def mask_public_ips(text):
"""Mask public IP addresses while preserving private ranges"""
if not text:
return text
ip_pattern = r'\b(\d{1,3}\.){3}\d{1,3}\b'
def replace_ip(match):
ip_str = match.group(0)
if is_private_ip(ip_str):
return ip_str
return IP_MASK_TEXT
return re.sub(ip_pattern, replace_ip, text)
PRIVACY_HEADERS_LOWER = {h.lower() for h in PRIVACY_HEADERS}
def sanitize_headers(headers):
"""Redact sensitive headers"""
result = {}
for key, value in headers.items():
key_lower = key.lower()
if key_lower in PRIVACY_HEADERS_LOWER:
result[key] = REDACTED_TEXT
else:
sanitized_value = value
if re.search(r'Bearer\s+\S+', value, re.IGNORECASE):
sanitized_value = re.sub(r'(Bearer\s+)\S+', r'\1' + REDACTED_TEXT, value, flags=re.IGNORECASE)
if re.search(r'sk-[a-zA-Z0-9]+', value):
sanitized_value = re.sub(r'sk-[a-zA-Z0-9]+', REDACTED_TEXT, sanitized_value)
sanitized_value = mask_public_ips(sanitized_value)
result[key] = sanitized_value
return result
def sanitize_body(content):
"""Redact sensitive patterns in body - no truncation, full preservation"""
if not content:
return content
try:
text = content.decode('utf-8', errors='replace')
text = mask_public_ips(text)
def redact_match(m):
# Keep the key, drop the value; handles both JSON ("key": "value") and form (key=value) shapes
matched = m.group(0)
if ':' in matched:
return matched.split(':')[0] + ': "' + REDACTED_TEXT + '"'
if '=' in matched:
return matched.split('=')[0] + '=' + REDACTED_TEXT
return REDACTED_TEXT
for pattern in PRIVACY_PATTERNS:
try:
text = re.sub(pattern, redact_match, text, flags=re.IGNORECASE)
except re.error:
pass
for pattern in PRIVACY_STRINGS:
try:
text = re.sub(pattern, REDACTED_TEXT, text, flags=re.IGNORECASE)
except re.error:
pass
return text
except:
return content.decode('utf-8', errors='replace') if content else ""
class PrivacyLogger:
def __init__(self):
self.log_path = os.environ.get('_MITM_LOG_PATH', '/tmp/requests.jsonl')
self.log_file = open(self.log_path, "a") # Append mode - never overwrite
self.request_store = {} # Store request data until response arrives
self.request_count = 0
def request(self, flow: http.HTTPFlow) -> None:
self.request_count += 1
timestamp = datetime.datetime.now()
url = mask_public_ips(flow.request.url)
# Prepare request data with timing
request_data = {
"id": self.request_count,
"timestamp": timestamp.isoformat(),
"timestamp_epoch_ms": int(timestamp.timestamp() * 1000),
"method": flow.request.method,
"url": url,
"headers": sanitize_headers(dict(flow.request.headers)),
"body": "",
"request_start_time": timestamp.isoformat(),
"request_start_epoch_ms": int(timestamp.timestamp() * 1000)
}
# Capture full body (no truncation)
if flow.request.content:
request_data["body"] = sanitize_body(flow.request.content)
# Store for pairing with response, keyed by flow.id so concurrent
# requests cannot be matched with the wrong response
self.request_store[flow.id] = request_data
def response(self, flow: http.HTTPFlow) -> None:
response_time = datetime.datetime.now()
# Get stored request data (flow.id is stable across the request/response hooks)
request_data = self.request_store.get(flow.id, {
"id": self.request_count,
"timestamp": response_time.isoformat(),
"timestamp_epoch_ms": int(response_time.timestamp() * 1000),
"method": "UNKNOWN",
"url": "",
"headers": {},
"body": "",
"request_start_time": response_time.isoformat(),
"request_start_epoch_ms": int(response_time.timestamp() * 1000)
})
req_id = request_data["id"]
# Calculate duration
request_start_ms = request_data.get("request_start_epoch_ms", int(response_time.timestamp() * 1000))
response_epoch_ms = int(response_time.timestamp() * 1000)
duration_ms = response_epoch_ms - request_start_ms
# Prepare response data
response_data = {
"status": flow.response.status_code,
"headers": sanitize_headers(dict(flow.response.headers)),
"body": "",
"timestamp": response_time.isoformat(),
"timestamp_epoch_ms": response_epoch_ms,
"duration_ms": duration_ms
}
# Capture full body (no truncation)
if flow.response.content:
response_data["body"] = sanitize_body(flow.response.content)
# Build complete record with timing
record = {
"id": req_id,
"timestamp": request_data["timestamp"],
"timestamp_epoch_ms": request_data["timestamp_epoch_ms"],
"method": request_data["method"],
"url": request_data["url"],
"request_headers": request_data["headers"],
"request_body": request_data["body"],
"request_start_time": request_data["request_start_time"],
"request_start_epoch_ms": request_data["request_start_epoch_ms"],
"response_status": response_data["status"],
"response_headers": response_data["headers"],
"response_body": response_data["body"],
"response_time": response_data["timestamp"],
"response_epoch_ms": response_data["timestamp_epoch_ms"],
"duration_ms": response_data["duration_ms"]
}
# Write as JSON line (atomic append)
self.log_file.write(json.dumps(record, ensure_ascii=False) + "\n")
self.log_file.flush()
# Clean up stored request (tolerates either flow.id or counter keys)
self.request_store.pop(flow.id, None)
self.request_store.pop(req_id, None)
addons = [PrivacyLogger()]
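# Optional self-check: run this generated file directly with python to verify IP
# masking; mitmproxy imports it as an addon, so this block is skipped there.
# The addresses below are illustrative only.
if __name__ == "__main__":
demo = mask_public_ips("client 8.8.8.8 via gateway 192.168.1.10")
assert "[IP-MASKED]" in demo and "192.168.1.10" in demo
print(demo)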
PYTHON_EOF
# Write transform_log.py to session directory
local transform_script="$log_dir/transform_log.py"
cat > "$transform_script" << 'TRANSFORM_EOF'
#!/usr/bin/env python3
"""
Transform mitmproxy JSONL logs into human-readable conversation format.
Aggregates streaming SSE deltas into complete messages.
"""
import json
import sys
import re
from datetime import datetime
from pathlib import Path
def parse_sse_stream(body_text):
"""Parse Server-Sent Events stream and aggregate deltas into complete messages."""
messages = []
current_content = ""
current_tool_calls = []
usage_info = None
model_info = None
# Split by event boundaries
events = re.split(r'\n\n+', body_text)
for event in events:
if not event.strip():
continue
# Extract data lines
data_lines = []
for line in event.split('\n'):
if line.startswith('data: '):
data_lines.append(line[6:]) # Remove 'data: ' prefix
if not data_lines:
continue
# Join data lines and parse JSON
data_text = '\n'.join(data_lines)
# Handle [DONE] marker
if data_text.strip() == '[DONE]':
break
try:
data = json.loads(data_text)
except json.JSONDecodeError:
continue
# Extract model info from first event
if 'model' in data:
model_info = data.get('model')
# Extract usage info
if 'usage' in data:
usage_info = data['usage']
# Handle different event types
if 'type' in data:
event_type = data['type']
if event_type == 'content_block_delta':
delta = data.get('delta', {})
if 'text' in delta:
current_content += delta['text']
elif event_type == 'content_block_start':
block = data.get('content_block', {})
if block.get('type') == 'tool_use':
current_tool_calls.append({
'id': block.get('id', ''),
'name': block.get('name', ''),
'input': block.get('input', {})
})
elif event_type == 'message_delta':
# Message complete
delta = data.get('delta', {})
stop_reason = delta.get('stop_reason', '')
# Handle Anthropic format
elif 'delta' in data:
delta = data['delta']
if 'text' in delta:
current_content += delta['text']
if 'partial_json' in delta:
current_content += delta['partial_json']
# Handle OpenAI format
elif 'choices' in data:
for choice in data['choices']:
delta = choice.get('delta', {})
if 'content' in delta and delta['content']:
current_content += delta['content']
if 'tool_calls' in delta:
for tc in delta['tool_calls']:
current_tool_calls.append(tc)
return {
'content': current_content,
'tool_calls': current_tool_calls,
'usage': usage_info,
'model': model_info
}
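# Example: OpenAI-style deltas such as
#   data: {"choices":[{"delta":{"content":"Hel"}}]}
#   data: {"choices":[{"delta":{"content":"lo"}}]}
#   data: [DONE]
# aggregate to content "Hello" (tool calls and usage are collected the same way).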
def extract_system_prompt(request_body):
"""Extract system prompt from request body."""
try:
data = json.loads(request_body)
# Anthropic format
if 'system' in data:
system = data['system']
if isinstance(system, list):
return '\n'.join([s.get('text', '') for s in system])
elif isinstance(system, str):
return system
# OpenAI format
if 'messages' in data:
for msg in data['messages']:
if msg.get('role') == 'system':
return msg.get('content', '')
return None
except:
return None
def extract_user_messages(request_body):
"""Extract user messages from request body."""
try:
data = json.loads(request_body)
messages = []
# Anthropic format
if 'messages' in data:
for msg in data['messages']:
if msg.get('role') == 'user':
content = msg.get('content', '')
if isinstance(content, list):
# Handle content blocks
texts = []
for block in content:
if block.get('type') == 'text':
texts.append(block.get('text', ''))
elif block.get('type') == 'image':
texts.append('[IMAGE]')
messages.append('\n'.join(texts))
else:
messages.append(content)
return messages
except:
return []
def extract_model_info(request_body):
"""Extract model and parameters from request body."""
try:
data = json.loads(request_body)
return {
'model': data.get('model', 'unknown'),
'max_tokens': data.get('max_tokens'),
'temperature': data.get('temperature'),
'thinking': data.get('thinking')
}
except:
return {'model': 'unknown'}
def format_conversation(record, index):
"""Format a single request/response pair as readable conversation."""
output = []
output.append(f"\n{'='*80}")
output.append(f"=== Request #{index}: {record['method']} {record['url'][:80]} ===")
output.append(f"{'='*80}\n")
output.append(f"Time: {record['timestamp']}")
output.append("")
# Request headers
auth_headers = {k: v for k, v in record['request_headers'].items()
if k.lower() in ['authorization', 'x-api-key']}
if auth_headers:
output.append("🔐 Authentication Headers:")
for k, v in auth_headers.items():
output.append(f" {k}: {v}")
output.append("")
# Request body analysis
req_body = record.get('request_body', '')
if req_body and len(req_body) > 100:
model_info = extract_model_info(req_body)
output.append(f"🤖 Model: {model_info['model']}")
if model_info.get('max_tokens'):
output.append(f" Max tokens: {model_info['max_tokens']}")
if model_info.get('thinking'):
output.append(f" Thinking: {model_info['thinking']}")
output.append("")
# System prompt
system = extract_system_prompt(req_body)
if system:
output.append("📋 System Prompt:")
output.append("-" * 60)
# Truncate very long system prompts for readability
if len(system) > 2000:
output.append(system[:2000])
output.append(f"... [{len(system) - 2000} more characters]")
else:
output.append(system)
output.append("-" * 60)
output.append("")
# User messages
user_msgs = extract_user_messages(req_body)
if user_msgs:
output.append("👤 User Messages:")
for i, msg in enumerate(user_msgs, 1):
output.append(f" [{i}] {msg[:500]}{'...' if len(msg) > 500 else ''}")
output.append("")
# Response
output.append(f"\n{'─' * 80}")
output.append(f"--- Response #{index}: {record['response_status']} ---")
output.append(f"{'─' * 80}\n")
resp_body = record.get('response_body', '')
if resp_body:
# Check if it's an SSE stream
if 'data:' in resp_body[:100] or 'event:' in resp_body[:100]:
result = parse_sse_stream(resp_body)
if result['model']:
output.append(f"Model: {result['model']}")
if result['content']:
output.append("\n📝 Assistant Response:")
output.append("-" * 60)
output.append(result['content'])
output.append("-" * 60)
if result['tool_calls']:
output.append("\n🔧 Tool Calls:")
for tc in result['tool_calls']:
name = tc.get('name', tc.get('function', {}).get('name', 'unknown'))
tool_id = tc.get('id', 'N/A')
output.append(f" • {name} (ID: {tool_id})")
if 'input' in tc:
input_json = json.dumps(tc['input'], indent=4)
for line in input_json.split('\n'):
output.append(f" {line}")
elif 'arguments' in tc:
args = tc['arguments']
if isinstance(args, str):
try:
args = json.loads(args)
except:
pass
output.append(f" Arguments: {args}")
if result['usage']:
usage = result['usage']
output.append("\n📊 Usage:")
if 'input_tokens' in usage:
output.append(f" Input tokens: {usage['input_tokens']}")
if 'output_tokens' in usage:
output.append(f" Output tokens: {usage['output_tokens']}")
if 'total_tokens' in usage:
output.append(f" Total tokens: {usage['total_tokens']}")
else:
# Regular JSON response
try:
resp_json = json.loads(resp_body)
output.append("Response Body:")
output.append("-" * 60)
output.append(json.dumps(resp_json, indent=2))
output.append("-" * 60)
except:
# Plain text
output.append("Response Body:")
output.append("-" * 60)
output.append(resp_body[:2000])
if len(resp_body) > 2000:
output.append(f"... [{len(resp_body) - 2000} more characters]")
output.append("-" * 60)
output.append("")
return '\n'.join(output)
def transform_log(input_path, output_path):
"""Transform JSONL log file to readable format."""
input_file = Path(input_path)
output_file = Path(output_path)
if not input_file.exists():
print(f"Error: Input file not found: {input_path}")
return False
records = []
errors = 0
# Read JSONL file
with open(input_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
records.append(record)
except json.JSONDecodeError as e:
errors += 1
print(f"Warning: Could not parse line {line_num}: {e}")
if errors:
print(f"Warning: {errors} lines could not be parsed")
# Sort by ID to ensure correct order
records.sort(key=lambda x: x.get('id', 0))
# Generate output
output_lines = []
output_lines.append("MITM Proxy Log - Human Readable Format")
output_lines.append("=" * 80)
output_lines.append(f"Source: {input_path}")
output_lines.append(f"Generated: {datetime.now().isoformat()}")
output_lines.append(f"Records: {len(records)}")
output_lines.append("=" * 80)
for i, record in enumerate(records, 1):
try:
formatted = format_conversation(record, i)
output_lines.append(formatted)
except Exception as e:
output_lines.append(f"\nError formatting record {i}: {e}\n")
# Write output
output_file.write_text('\n'.join(output_lines), encoding='utf-8')
print(f"✓ Transformed {len(records)} records to: {output_path}")
return True
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: python transform_log.py <input.jsonl> <output.log>")
print("")
print("Transforms JSONL logs from mitmproxy into human-readable format.")
print("Aggregates streaming SSE responses into complete messages.")
sys.exit(1)
input_path = sys.argv[1]
output_path = sys.argv[2]
success = transform_log(input_path, output_path)
sys.exit(0 if success else 1)
TRANSFORM_EOF
# Write export_sessions.py to session directory
local export_script="$log_dir/export_sessions.py"
cp "$HOME/.oh-my-zsh/custom/export_sessions.py" "$export_script"
if [[ "$capture_logs" == true ]]; then
_MITM_LOG_PATH="$mitm_full_log" mitmweb \
--listen-host 127.0.0.1 --listen-port "$MITMPROXY_PORT" \
--web-open-browser \
-s "$mitm_script" &> "$mitm_log" &
local mitmweb_pid=$!
else
mitmweb --listen-host 127.0.0.1 --listen-port "$MITMPROXY_PORT" \
--web-open-browser &> /dev/null &
local mitmweb_pid=$!
fi
echo -n " Waiting for mitmweb"
local attempts=0
local max_attempts=50
while ! nc -z 127.0.0.1 "$MITMPROXY_PORT" 2> /dev/null; do
sleep 0.2
echo -n "."
((attempts++))
if [[ $attempts -gt $max_attempts ]]; then
echo " timeout!"
kill "$mitmweb_pid" 2>/dev/null
return 1
fi
done
echo " ready!"
echo ""
echo "╔═══════════════════════════════════════════════════════════════╗"
echo "║ 🔍 mitmweb UI: http://127.0.0.1:$MITMPROXY_WEB_PORT"
echo "║ ║"
echo "║ ✓ Full traffic capture active ║"
echo "║ ✓ Sensitive data auto-redacted ║"
echo "║ ║"
echo "║ Logs: $log_dir"
echo "╚═══════════════════════════════════════════════════════════════╝"
echo ""
export NODE_EXTRA_CA_CERTS="$HOME/.mitmproxy/mitmproxy-ca-cert.pem"
export HTTPS_PROXY="http://127.0.0.1:$MITMPROXY_PORT"
export HTTP_PROXY="http://127.0.0.1:$MITMPROXY_PORT"
export NO_PROXY="localhost,127.0.0.1"
# Enable OpenCode debug mode if requested
if [[ "$opencode_debug" == true ]]; then
export DEBUG="*"
export OPENCODE_DEBUG="1"
echo "✓ OpenCode debug mode enabled (DEBUG=*, OPENCODE_DEBUG=1)"
fi
# Build the arguments as an array: zsh does not word-split unquoted scalars,
# so a flat string would reach opencode as a single argument
local -a opencode_args=()
if [[ -n "$config_path" ]]; then
opencode_args+=(--config "$config_path")
fi
if [[ $# -gt 0 ]]; then
opencode_args+=("$@")
fi
echo "==============================================================="
echo "Starting OpenCode with mitmproxy"
echo "==============================================================="
if [[ "$capture_logs" == true ]]; then
echo "Logs: $log_dir"
echo ""
fi
if [[ "$opencode_debug" == true ]]; then
echo "🔧 Debug mode: ENABLED"
echo " DEBUG=* OPENCODE_DEBUG=1"
echo ""
fi
if [[ -n "$opencode_args" ]]; then
echo "Command: opencode $opencode_args"
echo ""
fi
echo "Press Ctrl+C to stop"
echo ""
local opencode_exit=0
if [[ "$capture_logs" == true ]]; then
# Run opencode with stderr captured to log file (debug output)
# stdout is the TUI, stderr is debug logs
opencode $opencode_args 2>"$opencode_log"
opencode_exit=$?
else
opencode $opencode_args
opencode_exit=$?
fi
echo ""
echo "==============================================================="
echo "Shutting down..."
echo "==============================================================="
kill "$mitmweb_pid" 2>/dev/null
wait "$mitmweb_pid" 2>/dev/null
if [[ -n "$tmpdir" && -d "$tmpdir" ]]; then
rm -rf "$tmpdir"
fi
unset NODE_EXTRA_CA_CERTS HTTPS_PROXY HTTP_PROXY NO_PROXY DEBUG OPENCODE_DEBUG
if [[ "$capture_logs" == true ]]; then
# Export OpenCode session data
echo ""
echo "Exporting OpenCode session data..."
local session_export_dir="$log_dir/session-data"
mkdir -p "$session_export_dir"
# Copy the export script to the session directory
cp "$log_dir/export_sessions.py" "$session_export_dir/" 2>/dev/null || true
# Run the export using venv Python with Presidio installed
local venv_python="$HOME/.cache/opencode-export-venv/bin/python"
if [[ -f "$venv_python" ]]; then
if "$venv_python" "$log_dir/export_sessions.py" "$session_export_dir" 2>/dev/null; then
echo "✓ Session data exported with Presidio"
else
echo "⚠ Could not export session data"
fi
else
# Fallback to system Python
if python3 "$log_dir/export_sessions.py" "$session_export_dir" 2>/dev/null; then
echo "✓ Session data exported (fallback mode)"
else
echo "⚠ Could not export session data"
fi
fi
# Run automatic analysis
echo ""
echo "Running session analysis..."
if [[ -f "$log_dir/analyze_session.py" ]]; then
if python3 "$log_dir/analyze_session.py" "$session_export_dir" 2>/dev/null; then
echo "✓ Analysis complete"
else
echo "⚠ Analysis failed (non-critical)"
fi
else
echo "⚠ Analysis script not found"
fi
# Write analyze_session.py to session directory
local analyze_script="$log_dir/analyze_session.py"
cat > "$analyze_script" << 'ANALYZE_EOF'
#!/usr/bin/env python3
"""
Post-process OpenCode debug session data to generate analysis summaries.
Generates machine-readable extracts for external analysis tools.
"""
import json
import sys
from collections import defaultdict, Counter
from datetime import datetime
from pathlib import Path
def load_session_data(session_dir):
"""Load all session data files."""
data = {}
parts_path = Path(session_dir) / "parts.json"
if parts_path.exists():
with open(parts_path, 'r') as f:
data['parts'] = json.load(f)
messages_path = Path(session_dir) / "messages.json"
if messages_path.exists():
with open(messages_path, 'r') as f:
data['messages'] = json.load(f)
sessions_path = Path(session_dir) / "sessions.json"
if sessions_path.exists():
with open(sessions_path, 'r') as f:
data['sessions'] = json.load(f)
summary_path = Path(session_dir) / "summary.json"
if summary_path.exists():
with open(summary_path, 'r') as f:
data['summary'] = json.load(f)
return data
def extract_errors(parts, sessions):
"""Extract all errors from parts with references."""
errors = []
session_map = {s['id']: s for s in sessions}
for p in parts:
data = p.get('data', {})
raw = data.get('raw', '')
if not raw:
continue
try:
tool_data = json.loads(raw)
if tool_data.get('type') == 'tool':
state = tool_data.get('state', {})
status = state.get('status', '')
tool_name = tool_data.get('tool', 'unknown')
if status == 'error' or tool_name == 'invalid':
error_info = {
'part_id': p.get('id'),
'session_id': p.get('session_id'),
'session_slug': session_map.get(p.get('session_id'), {}).get('slug', 'unknown'),
'timestamp': p.get('timestamp'),
'tool': tool_name,
'call_id': tool_data.get('callID', 'unknown'),
'status': status,
'error_message': state.get('error') or state.get('output', 'Unknown error'),
'input': state.get('input', {}),
'error_type': categorize_error(state.get('error') or state.get('output', ''))
}
errors.append(error_info)
        except (json.JSONDecodeError, AttributeError):
            # Skip parts whose raw payload is not valid tool JSON
            pass
return errors
def categorize_error(error_msg):
"""Categorize error type."""
error_str = str(error_msg).lower()
if 'file not found' in error_str or 'not found' in error_str:
return 'file_not_found'
elif 'json parsing failed' in error_str or 'parse error' in error_str:
return 'json_parse_error'
elif 'invalid input' in error_str:
return 'invalid_input'
elif 'permission' in error_str or 'access' in error_str:
return 'permission_denied'
elif 'timeout' in error_str:
return 'timeout'
elif 'connection' in error_str:
return 'connection_error'
else:
return 'other'
def generate_error_summary(errors, sessions):
"""Generate human-readable error summary."""
lines = []
lines.append("# Error Summary")
lines.append(f"Generated: {datetime.now().isoformat()}")
lines.append("")
lines.append(f"Total Errors: {len(errors)}")
lines.append("")
# Group by session
by_session = defaultdict(list)
for e in errors:
by_session[e['session_slug']].append(e)
lines.append("## Errors by Session")
lines.append("")
for slug, session_errors in sorted(by_session.items(), key=lambda x: -len(x[1])):
lines.append(f"### {slug}")
lines.append(f"Count: {len(session_errors)}")
lines.append("")
# Group by error type
by_type = defaultdict(list)
for e in session_errors:
by_type[e['error_type']].append(e)
for error_type, type_errors in sorted(by_type.items(), key=lambda x: -len(x[1])):
lines.append(f"**{error_type}** ({len(type_errors)})")
for e in type_errors[:3]: # Show first 3 of each type
error_preview = str(e['error_message'])[:100].replace('\n', ' ')
lines.append(f" - {e['timestamp']}: {error_preview}")
if 'filePath' in e['input']:
lines.append(f" File: {e['input']['filePath']}")
if len(type_errors) > 3:
lines.append(f" ... and {len(type_errors) - 3} more")
lines.append("")
lines.append("## Error Patterns")
lines.append("")
# Find common patterns
error_types = Counter(e['error_type'] for e in errors)
for error_type, count in error_types.most_common():
lines.append(f"- **{error_type}**: {count} occurrences")
return '\n'.join(lines)
def generate_errors_json(errors):
"""Generate machine-readable errors JSON."""
return {
'generated_at': datetime.now().isoformat(),
'total_errors': len(errors),
'errors': errors
}
def generate_key_parts_with_errors(parts, errors):
"""Generate list of key parts that contain or relate to errors."""
# Get all part IDs that are errors
error_part_ids = {e['part_id'] for e in errors}
# Also include parts from same sessions around error times
error_sessions = {e['session_id'] for e in errors}
key_parts = []
for p in parts:
part_id = p.get('id')
session_id = p.get('session_id')
# Include if it's an error part
if part_id in error_part_ids:
key_parts.append({
'part_id': part_id,
'session_id': session_id,
'timestamp': p.get('timestamp'),
'type': 'error',
'reason': 'contains_error'
})
# Include if it's a reasoning or text part in an error session
elif session_id in error_sessions:
data = p.get('data', {})
part_type = data.get('type', 'unknown')
if part_type in ['reasoning', 'text']:
key_parts.append({
'part_id': part_id,
'session_id': session_id,
'timestamp': p.get('timestamp'),
'type': part_type,
'reason': 'context_in_error_session',
'preview': str(data.get('text', ''))[:200]
})
return key_parts
def analyze_requests_jsonl(requests_path, errors):
"""Analyze requests.jsonl for slow calls and error correlations."""
if not Path(requests_path).exists():
return None
requests = []
with open(requests_path, 'r') as f:
for line in f:
try:
requests.append(json.loads(line.strip()))
            except json.JSONDecodeError:
                # Ignore malformed lines in the JSONL capture
                pass
# Filter for slow API calls
slow_calls = []
for r in requests:
duration = r.get('duration_ms', 0)
if duration > 10000: # Calls over 10 seconds
slow_calls.append({
'id': r.get('id'),
'url': r.get('url', 'unknown')[:100],
'method': r.get('method', 'GET'),
'duration_ms': duration,
'duration_sec': round(duration / 1000, 1),
'timestamp': r.get('timestamp')
})
# Sort by duration
slow_calls.sort(key=lambda x: -x['duration_ms'])
return {
'total_requests': len(requests),
'slow_calls': slow_calls[:50], # Top 50
'error_correlation': find_error_correlations(requests, errors)
}
def find_error_correlations(requests, errors):
"""Find correlations between slow requests and errors."""
# Simple heuristic: check if errors occurred around same time as slow requests
correlations = []
error_times = set()
for e in errors:
ts = e.get('timestamp', '')
if ts:
# Extract just the time portion for rough matching
error_times.add(ts[:16]) # YYYY-MM-DD HH:MM
for r in requests:
if r.get('duration_ms', 0) > 20000: # Very slow calls
req_time = r.get('timestamp', '')[:16]
if req_time in error_times:
correlations.append({
'request_id': r.get('id'),
'url': r.get('url', 'unknown')[:80],
'duration_sec': round(r.get('duration_ms', 0) / 1000, 1),
'timestamp': r.get('timestamp'),
'correlation': 'slow_request_during_errors'
})
return correlations
def generate_slow_api_summary(analysis):
"""Generate human-readable slow API summary."""
if not analysis:
return "# Slow API Calls\n\nNo requests.jsonl found."
lines = []
lines.append("# Slow API Calls Analysis")
lines.append(f"Generated: {datetime.now().isoformat()}")
lines.append("")
lines.append(f"Total Requests: {analysis['total_requests']}")
lines.append(f"Slow Calls (>10s): {len(analysis['slow_calls'])}")
lines.append("")
if analysis['slow_calls']:
lines.append("## Top 20 Slowest Calls")
lines.append("")
lines.append("| Rank | Duration | URL | Method |")
lines.append("|------|----------|-----|--------|")
for i, call in enumerate(analysis['slow_calls'][:20], 1):
url = call['url'][:60] + "..." if len(call['url']) > 60 else call['url']
lines.append(f"| {i} | {call['duration_sec']}s | {url} | {call['method']} |")
lines.append("")
if analysis['error_correlation']:
lines.append("## Correlations with Errors")
lines.append("")
lines.append("The following slow requests occurred during error periods:")
lines.append("")
for corr in analysis['error_correlation'][:10]:
lines.append(f"- {corr['duration_sec']}s at {corr['timestamp']}")
lines.append(f" URL: {corr['url']}")
lines.append("")
return '\n'.join(lines)
def generate_quick_reference(errors, key_parts, slow_analysis):
"""Generate a quick reference guide for external analysis."""
ref = {
'generated_at': datetime.now().isoformat(),
'session_analysis': {
'total_errors': len(errors),
'error_types': dict(Counter(e['error_type'] for e in errors)),
'sessions_with_errors': list(set(e['session_slug'] for e in errors)),
'key_part_ids': [p['part_id'] for p in key_parts],
},
'slow_requests': {
'total_slow_calls': len(slow_analysis.get('slow_calls', [])) if slow_analysis else 0,
'slowest_call_duration_sec': slow_analysis['slow_calls'][0]['duration_sec'] if slow_analysis and slow_analysis.get('slow_calls') else 0,
},
'queries_for_analysis': [
{
'name': 'Get all errors',
'jq': 'cat errors.json | jq \'.errors[] | {session_slug, timestamp, error_type, error_message}\''
},
{
'name': 'Get errors by type',
'jq': 'cat errors.json | jq \'[.errors[] | select(.error_type == "file_not_found")]\''
},
{
'name': 'Get slow API calls',
'jq': 'cat requests.jsonl | jq \'select(.duration_ms > 30000) | {url, duration_ms}\''
},
{
'name': 'Get key parts',
'jq': 'cat key-parts.json | jq \'.key_parts[] | {part_id, session_id, type, reason}\''
},
{
'name': 'Find corrupted file paths',
'jq': 'cat errors.json | jq \'.errors[] | select(.input.filePath | startswith(": ")) | .input.filePath\' | sort -u'
}
]
}
return ref
def main():
if len(sys.argv) < 2:
        print("Usage: python3 analyze_session.py <session_dir>")
print("")
print("Analyzes OpenCode session data and generates summary files:")
print(" - errors.json : Machine-readable error data")
print(" - errors-summary.md : Human-readable error summary")
print(" - key-parts.json : Key parts with error context")
print(" - slow-api-calls.md : Slow API call analysis")
print(" - quick-reference.json : Quick reference for analysis")
sys.exit(1)
session_dir = Path(sys.argv[1])
log_dir = session_dir.parent
print(f"Analyzing session: {session_dir}")
# Load data
data = load_session_data(session_dir)
if not data.get('parts'):
print("Error: No parts.json found")
sys.exit(1)
print(f"Loaded {len(data['parts'])} parts, {len(data.get('sessions', []))} sessions")
# Extract errors
errors = extract_errors(data['parts'], data.get('sessions', []))
print(f"Found {len(errors)} errors")
# Generate error files
errors_json = generate_errors_json(errors)
with open(log_dir / "errors.json", 'w') as f:
json.dump(errors_json, f, indent=2)
print(f" ✓ errors.json ({len(errors)} errors)")
errors_summary = generate_error_summary(errors, data.get('sessions', []))
with open(log_dir / "errors-summary.md", 'w') as f:
f.write(errors_summary)
print(f" ✓ errors-summary.md")
# Generate key parts
key_parts = generate_key_parts_with_errors(data['parts'], errors)
with open(log_dir / "key-parts.json", 'w') as f:
json.dump({
'generated_at': datetime.now().isoformat(),
'total_key_parts': len(key_parts),
'key_parts': key_parts
}, f, indent=2)
print(f" ✓ key-parts.json ({len(key_parts)} parts)")
# Analyze requests
requests_path = log_dir / "requests.jsonl"
slow_analysis = analyze_requests_jsonl(requests_path, errors)
if slow_analysis:
with open(log_dir / "slow-api-calls.json", 'w') as f:
json.dump({
'generated_at': datetime.now().isoformat(),
'total_requests': slow_analysis['total_requests'],
'slow_calls_count': len(slow_analysis['slow_calls']),
'slow_calls': slow_analysis['slow_calls'],
'error_correlations': slow_analysis['error_correlation']
}, f, indent=2)
print(f" ✓ slow-api-calls.json ({len(slow_analysis['slow_calls'])} slow calls)")
slow_summary = generate_slow_api_summary(slow_analysis)
with open(log_dir / "slow-api-calls.md", 'w') as f:
f.write(slow_summary)
print(f" ✓ slow-api-calls.md")
# Generate quick reference
quick_ref = generate_quick_reference(errors, key_parts, slow_analysis)
with open(log_dir / "quick-reference.json", 'w') as f:
json.dump(quick_ref, f, indent=2)
print(f" ✓ quick-reference.json")
print(f"\n✓ Analysis complete. Files saved to: {log_dir}")
if __name__ == "__main__":
main()
ANALYZE_EOF
# Create summary
local summary_file="$log_dir/README.txt"
{
echo "OpenCode Debug Session"
echo "======================"
echo "Session: $session_name"
echo "Date: $(date -u +"%Y-%m-%dT%H:%M:%SZ")"
echo ""
echo "Configuration:"
if [[ -n "$config_name" ]]; then
echo " Config: $config_name (isolated)"
elif [[ "$use_isolated" == true ]]; then
echo " Config: temporary-isolated"
else
echo " Config: default"
fi
echo ""
echo "Exit Code: $opencode_exit"
echo ""
echo "Files:"
echo " - mitm.log: Proxy connection logs"
echo " - opencode.log: OpenCode debug output (stderr)"
echo " - requests.jsonl: Complete HTTP traffic (JSON Lines format, no truncation)"
echo " - session-data/: OpenCode session and message data"
echo " - sessions.json: Parent and child subagent session metadata"
echo " - messages.json: All conversation messages (sanitized)"
echo " - parts.json: Message parts/components (sanitized)"
echo " - summary.json: Session tree summary"
echo " - config/: Configuration files used"
echo " - capture_script.py: mitmproxy capture script"
echo " - export_sessions.py: Session export utility"
echo " - analyze_session.py: Automatic analysis script"
echo " - transform_log.py: Human-readable log transformer"
echo ""
echo "Auto-Generated Analysis:"
echo " - errors.json: Machine-readable error data with references"
echo " - errors-summary.md: Human-readable error summary"
        echo "  - key-parts.json: Key parts with error context"
echo " - slow-api-calls.json: Slow API call data (JSON)"
echo " - slow-api-calls.md: Slow API call analysis"
echo " - quick-reference.json: Quick reference for external analysis"
echo ""
echo "JSONL Format (requests.jsonl):"
echo " Each line is a complete JSON object with:"
echo " - id, timestamp, timestamp_epoch_ms"
echo " - method, url"
echo " - request_headers, request_body (full, sanitized)"
echo " - response_status, response_headers, response_body (full, sanitized)"
echo " - response_time, response_epoch_ms"
echo " - duration_ms (request-response timing)"
echo ""
echo "Session Data Export:"
echo " Captures parent session + all subagent sessions automatically"
echo " Privacy redaction applied using Microsoft Presidio (if available)"
echo " Falls back to comprehensive regex patterns"
echo ""
echo "Processing Commands:"
echo " python3 transform_log.py requests.jsonl mitm-readable.log # Transform HTTP logs"
echo " python3 export_sessions.py ./session-data [session_id] # Re-export sessions"
echo ""
echo "Sensitive data automatically redacted:"
echo " ✓ Authorization headers"
echo " ✓ API keys (x-api-key, api-key, etc.)"
echo " ✓ Tokens (auth, access, refresh, bearer)"
echo " ✓ Secrets (client-secret, api-secret)"
echo " ✓ Passwords and private keys"
echo " ✓ OpenAI API keys (sk-...)"
echo " ✓ Public IP addresses ([IP-MASKED])"
echo ""
echo "Private IPs preserved: 10.x.x.x, 192.168.x.x, 127.x.x.x"
echo ""
echo "Review commands:"
echo " jq '.url' requests.jsonl | head -20 # List URLs"
echo " jq 'select(.url | contains(\"kimi\"))' requests.jsonl # Filter by URL"
echo " python3 transform_log.py requests.jsonl readable.log # Human readable"
} > "$summary_file"
# Copy configs
local config_backup_dir="$log_dir/config"
mkdir -p "$config_backup_dir"
if [[ -n "$original_config" && -f "$original_config" ]]; then
cp "$original_config" "$config_backup_dir/"
if [[ -n "$config_name" ]]; then
local isolated_dir="$OPENCODE_ISOLATED_CONFIGS_DIR/$config_name"
if [[ -d "$isolated_dir" ]]; then
cp -r "$isolated_dir"/* "$config_backup_dir/" 2>/dev/null || true
fi
fi
fi
local main_config="$HOME/.config/opencode/opencode.jsonc"
if [[ -f "$main_config" && ! -f "$config_backup_dir/opencode.jsonc" ]]; then
cp "$main_config" "$config_backup_dir/opencode.jsonc" 2>/dev/null || true
fi
# Create archive
local archive_file="$log_dir/session-archive.tar.gz"
tar -czf "$archive_file" -C "$log_dir" \
README.txt mitm.log opencode.log requests.jsonl \
errors.json errors-summary.md \
key-parts.json slow-api-calls.json slow-api-calls.md quick-reference.json \
session-data/ config/ \
capture_script.py transform_log.py export_sessions.py analyze_session.py 2>/dev/null
# Create list of files to upload (text files only, not binary archive)
local upload_files=()
[[ -f "$log_dir/README.txt" ]] && upload_files+=("$log_dir/README.txt")
[[ -f "$log_dir/mitm.log" ]] && upload_files+=("$log_dir/mitm.log")
[[ -f "$log_dir/opencode.log" ]] && upload_files+=("$log_dir/opencode.log")
[[ -f "$log_dir/requests.jsonl" ]] && upload_files+=("$log_dir/requests.jsonl")
[[ -d "$log_dir/session-data" ]] && {
[[ -f "$log_dir/session-data/summary.json" ]] && upload_files+=("$log_dir/session-data/summary.json")
[[ -f "$log_dir/session-data/sessions.json" ]] && upload_files+=("$log_dir/session-data/sessions.json")
[[ -f "$log_dir/session-data/messages.json" ]] && upload_files+=("$log_dir/session-data/messages.json")
[[ -f "$log_dir/session-data/parts.json" ]] && upload_files+=("$log_dir/session-data/parts.json")
}
# Analysis files (auto-generated)
[[ -f "$log_dir/errors.json" ]] && upload_files+=("$log_dir/errors.json")
[[ -f "$log_dir/errors-summary.md" ]] && upload_files+=("$log_dir/errors-summary.md")
[[ -f "$log_dir/key-parts.json" ]] && upload_files+=("$log_dir/key-parts.json")
[[ -f "$log_dir/slow-api-calls.json" ]] && upload_files+=("$log_dir/slow-api-calls.json")
[[ -f "$log_dir/slow-api-calls.md" ]] && upload_files+=("$log_dir/slow-api-calls.md")
[[ -f "$log_dir/quick-reference.json" ]] && upload_files+=("$log_dir/quick-reference.json")
local gist_url=""
if [[ "$upload_gist" == true ]]; then
echo ""
echo "Uploading to gist..."
if command -v gh &> /dev/null && gh auth status &> /dev/null; then
if [[ ${#upload_files[@]} -gt 0 ]]; then
# Upload text files to gist
            gist_url=$(gh gist create "${upload_files[@]}" --desc "OpenCode debug - $session_name" 2>&1 | grep -E '^https://gist\.github\.com/' | tail -1)
if [[ -n "$gist_url" ]]; then
echo "✓ Uploaded to gist: $gist_url"
echo " (Session archive available locally at: $archive_file)"
else
echo "⚠ Gist upload failed"
echo " Archive available locally at: $archive_file"
fi
else
echo "⚠ No files to upload"
fi
else
echo "⚠ GitHub CLI not available for gist upload"
echo " Archive available locally at: $archive_file"
fi
fi
echo ""
echo "╔═══════════════════════════════════════════════════════════════╗"
echo "║ SESSION COMPLETE ║"
echo "╠═══════════════════════════════════════════════════════════════╣"
echo "║ 📁 $log_dir"
echo "║ ║"
echo "║ 📄 requests.jsonl - Full HTTP traffic (JSON Lines) ║"
echo "║ Includes timing: duration_ms for each request ║"
echo "║ 📄 opencode.log - OpenCode debug output (stderr) ║"
echo "║ 📁 session-data/ - OpenCode sessions + subagents ║"
echo "║ sessions.json, messages.json, parts.json ║"
echo "║ 📊 Auto-Generated Analysis: ║"
echo "║ errors.json, errors-summary.md ║"
echo "║ key-parts.json, slow-api-calls.json/md ║"
echo "║ quick-reference.json ║"
echo "║ 📦 session-archive.tar.gz - Complete archive ║"
echo "║ ║"
echo "║ Commands: ║"
echo "║ python3 transform_log.py requests.jsonl out.log ║"
echo "║ python3 export_sessions.py ./session-data ║"
echo "║ python3 analyze_session.py ./session-data ║"
if [[ -n "$gist_url" ]]; then
echo "║ ║"
echo "║ 🔗 Gist: $gist_url"
fi
echo "║ ║"
echo "╚═══════════════════════════════════════════════════════════════╝"
fi
echo ""
echo "✓ mitmweb stopped"
return $opencode_exit
}
_proxy_opencode_show_help() {
cat << 'HELPDOC'
proxy_opencode - Debug OpenCode with full traffic capture
USAGE:
proxy_opencode [OPTIONS] [-- OPENCODE_ARGS]
OPTIONS:
--isolated, -i Use temporary isolated config
--config, -c NAME Use named isolated config
--edit, -e Edit config before running
--upload, -u Upload logs to GitHub Gist
--debug, -d Enable OpenCode debug mode (default: ON)
--no-debug Disable OpenCode debug mode
--no-logs Don't capture logs
--help, -h Show help
PRIVACY PROTECTION:
The tool automatically redacts sensitive data from logs:
Headers: authorization, x-api-key, api-key, token, cookie,
client-secret, access-token, refresh-token, etc.
Body patterns: api_key, token, secret, password, sk-...,
AWS keys, JWT tokens, etc.
IP Addresses: Public IPs are masked as [IP-MASKED]
Private IPs (10.x, 192.168.x, 127.x) are preserved
  Redacted header/body values are replaced with [TRUNCATED];
  public IP addresses are replaced with [IP-MASKED]
EXAMPLES:
proxy_opencode # Run with debug + full capture
proxy_opencode --isolated # Fresh isolated config
proxy_opencode --upload # Capture and upload
proxy_opencode --config test # Use isolated config
proxy_opencode --no-debug # Disable debug mode
FILES:
~/.config/opencode/ # Main config
~/.config/opencode/isolated-configs/ # Isolated configs
/tmp/opencode-debug-logs/ # Session logs
ALIASES:
poc, poci, pocu, poccfg, pocedit, poclist, pocdel
For more info: https://github.com/opencode-ai/opencode
HELPDOC
}
alias poc='proxy_opencode'
alias poci='proxy_opencode --isolated'
alias pocu='proxy_opencode --upload'
alias poccfg='proxy_opencode_create_config'
alias pocedit='proxy_opencode_edit_config'
alias poclist='proxy_opencode_list_configs'
alias pocdel='proxy_opencode_delete_config'