@oneryalcin
Last active January 12, 2026 13:38
Claude Python SDK Monkey Patch for enriching messages
"""
SDK Patch with Subagent Support - Adds .raw field to SDK message types with maximum fidelity.
Provides drop-in replacements:
- ClaudeSDKClientWithRaw: replaces ClaudeSDKClient
- query_with_raw: replaces query
Data sources for .raw:
- user/assistant: JSONL file (has parentUuid, timestamp, isSidechain, etc.)
- result: Raw CLI output (has modelUsage, errors, permission_denials)
- system: Raw CLI output (already complete)
Usage:
    from sdk_patch_with_subagents import ClaudeSDKClientWithRaw, ClaudeAgentOptions

    async with ClaudeSDKClientWithRaw(options) as client:
        await client.query("Your prompt")
        async for msg in client.receive_response():
            print(msg.raw.get("parentUuid"))
            print(msg.raw.get("timestamp"))

JSONL FILE STRUCTURE AND SUBAGENT LINKING
=========================================
Claude Code stores conversation history in JSONL files. Understanding this structure
is essential for correlating SDK messages with their rich metadata.
1. FILE LOCATIONS
-----------------
Main session JSONL:
    ~/.claude/projects/<mangled-cwd>/<session_id>.jsonl
Subagent JSONL (when Task tool spawns agents):
    ~/.claude/projects/<mangled-cwd>/<session_id>/subagents/agent-<agentId>.jsonl
Where:
    - <mangled-cwd>: The working directory with "/" replaced by "-"
      Example: /private/tmp/myproject -> -private-tmp-myproject
    - <session_id>: UUID like "0cb45b7d-8211-46a6-858f-b0aeea392e7f"
    - <agentId>: Short ID like "acd4485" (7 chars)
2. MESSAGE TYPES IN JSONL
-------------------------
Each line is a JSON object with a "type" field:
- "queue-operation": Internal queue management (skip these)
- "user": User/tool-result messages
- "assistant": Claude's responses
- "system": System events (init, etc.) - usually not in JSONL
3. KEY FIELDS FOR LINKING
-------------------------
All messages have:
- uuid: Unique identifier for this message
- parentUuid: UUID of the message this responds to (builds conversation tree)
- sessionId: Session this belongs to
- timestamp: ISO-8601 timestamp
- isSidechain: false for main conversation, true for subagents
Subagent-specific:
- agentId: Short ID identifying the subagent (e.g., "acd4485")
Assistant messages additionally have:
- message.id: Anthropic API message ID (e.g., "msg_01RJ3XzgQ2gzafb75h1q5jeP")
- message.model: Model used (e.g., "claude-sonnet-4-5-20250929")
- message.usage: Token usage stats
- requestId: API request ID
User messages with tool results have:
- toolUseResult: Rich metadata about tool execution
- toolUseResult.agentId: Links to subagent (when Task tool was used)
4. SUBAGENT LINKING MECHANISM
-----------------------------
When the main agent uses the Task tool to spawn a subagent:
MAIN SESSION (parent):
    Entry 1: assistant with tool_use for "Task" tool
    Entry 2: user with tool_result containing:
        {
            "toolUseResult": {
                "agentId": "acd4485",    <- LINK TO SUBAGENT
                "status": "completed",
                "prompt": "original prompt..."
            }
        }
SUBAGENT SESSION (child):
    File: <session_id>/subagents/agent-acd4485.jsonl
    Every entry has:
        - sessionId: SAME as parent (shared!)
        - agentId: "acd4485"
        - isSidechain: true
Linking logic:
    Parent -> Child: toolUseResult.agentId matches child's agentId
    Child -> Parent: Same sessionId, isSidechain=true indicates subordinate
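
As a rough sketch of the parent-to-child direction, the helper below scans
already-parsed main-session entries for toolUseResult.agentId and maps each one
to the subagent file it should correspond to. The name subagent_files_for, the
"session-dir" path, and the sample entries are illustrative and not part of this
patch; the path layout follows section 1.

```python
from pathlib import Path
from typing import Any


def subagent_files_for(entries: list[dict[str, Any]], session_dir: Path) -> dict[str, Path]:
    # Map each agentId found in a toolUseResult to the subagent JSONL it should point at.
    links: dict[str, Path] = {}
    for entry in entries:
        result = entry.get("toolUseResult")
        # Guard with isinstance: this sketch assumes toolUseResult is not always a dict.
        if entry.get("type") == "user" and isinstance(result, dict):
            agent_id = result.get("agentId")
            if agent_id:
                links[agent_id] = session_dir / "subagents" / f"agent-{agent_id}.jsonl"
    return links


# Hypothetical, already-parsed entries from a main session JSONL.
main_entries = [
    {"type": "assistant", "uuid": "B", "parentUuid": "A"},
    {"type": "user", "uuid": "C", "parentUuid": "B",
     "toolUseResult": {"agentId": "acd4485", "status": "completed"}},
]
links = subagent_files_for(main_entries, Path("session-dir"))
print(links["acd4485"].name)
```

The child-to-parent direction needs no lookup table: every subagent entry already
carries the parent's sessionId.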
5. EXAMPLE CONVERSATION TREE
----------------------------
Main session JSONL:
    user (uuid=A, parentUuid=null)                                             <- Initial prompt
    └── assistant (uuid=B, parentUuid=A)                                       <- Uses Task tool
        └── user (uuid=C, parentUuid=B, toolUseResult.agentId="xyz123")        <- Task result
            └── assistant (uuid=D, parentUuid=C)                               <- Final response
Subagent JSONL (agent-xyz123.jsonl):
    user (uuid=X, parentUuid=null, agentId="xyz123", isSidechain=true)         <- Subagent prompt
    └── assistant (uuid=Y, parentUuid=X, agentId="xyz123")                     <- Subagent uses Glob
        └── user (uuid=Z, parentUuid=Y, agentId="xyz123")                      <- Glob result
            └── assistant (uuid=W, parentUuid=Z, agentId="xyz123")             <- Subagent response
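
A tree like this can be rebuilt from uuid/parentUuid alone. The following is a
minimal sketch, assuming the entries are already parsed into dicts; the function
names children_by_parent and render are made up for illustration.

```python
from collections import defaultdict
from typing import Any


def children_by_parent(entries: list[dict[str, Any]]) -> dict[Any, list[str]]:
    # Group message uuids under their parentUuid (None marks a root).
    index: dict[Any, list[str]] = defaultdict(list)
    for entry in entries:
        index[entry.get("parentUuid")].append(entry["uuid"])
    return dict(index)


def render(index: dict[Any, list[str]], root: Any = None, depth: int = 0) -> list[str]:
    # Depth-first walk, indenting each level by two spaces.
    lines: list[str] = []
    for uuid in index.get(root, []):
        lines.append("  " * depth + uuid)
        lines.extend(render(index, uuid, depth + 1))
    return lines


# The main-session chain from the example: A -> B -> C -> D.
entries = [
    {"uuid": "A", "parentUuid": None},
    {"uuid": "B", "parentUuid": "A"},
    {"uuid": "C", "parentUuid": "B"},
    {"uuid": "D", "parentUuid": "C"},
]
tree_lines = render(children_by_parent(entries))
for line in tree_lines:
    print(line)
```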
6. WHY THIS MATTERS
-------------------
The SDK's parsed messages lose most of this metadata. This patch preserves it via .raw:
    msg.raw.get("parentUuid")                  # Build conversation trees
    msg.raw.get("timestamp")                   # Precise timing
    msg.raw.get("message", {}).get("usage")    # Token usage per message
    msg.raw.get("toolUseResult")               # Rich tool execution data
    msg.raw.get("agentId")                     # Identify subagent messages
    msg.raw.get("isSidechain")                 # Distinguish main vs subagent
For subagent support, we watch for toolUseResult.agentId in tool_result messages,
then read the corresponding subagent JSONL file to get its full conversation.
7. SDK VISIBILITY INTO SUBAGENTS (FOREGROUND vs BACKGROUND)
------------------------------------------------------------
The Task tool can run subagents in foreground (default) or background mode.
This significantly affects what messages the SDK exposes.
FOREGROUND (run_in_background=false, default):
    Message stream includes subagent internals:
        1. SystemMessage init
        2. AssistantMessage (main, sonnet) - decides to use Task
        3. AssistantMessage (main) - ToolUseBlock name=Task
        4. UserMessage - initial Task ack
        5. AssistantMessage (subagent, haiku) - ToolUseBlock name=Bash   <- SUBAGENT VISIBLE
        6. UserMessage - ToolResultBlock from Bash                       <- SUBAGENT VISIBLE
        7. UserMessage - Task final result with toolUseResult.agentId
        8. AssistantMessage (main) - summarizes
        9. ResultMessage
    You can identify subagent messages by:
        - Different model (e.g., haiku when main is sonnet)
        - msg.raw.get("isSidechain") == True
        - msg.raw.get("agentId") is set
BACKGROUND (run_in_background=true):
    Subagent runs async, internals NOT in message stream:
        1. SystemMessage init
        2. AssistantMessage (main) - decides to use Task
        3. AssistantMessage (main) - ToolUseBlock name=Task
        4. UserMessage - "Async agent launched. agentId: xxx"   <- IMMEDIATE RETURN
        5. AssistantMessage (main) - may poll output file
        ...
        N. ResultMessage
    To get subagent data in background mode:
        - Read the subagent JSONL directly: <session_id>/subagents/agent-<agentId>.jsonl
        - Or use TaskOutput tool to retrieve results
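
Reading the subagent JSONL directly could look roughly like the sketch below. It
assumes one JSON object per line and that a background agent may still be
writing, so a trailing partial line is skipped rather than treated as an error.
read_subagent_entries is a hypothetical name, and the temporary file stands in
for a real agent-<agentId>.jsonl.

```python
import json
import os
import tempfile
from typing import Any


def read_subagent_entries(path: str) -> list[dict[str, Any]]:
    # Parse every complete JSON line; skip blanks and lines that fail to parse.
    entries: list[dict[str, Any]] = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # e.g. a final line the agent is still writing
    return entries


# Demo with a temporary file standing in for agent-<agentId>.jsonl.
fd, demo_path = tempfile.mkstemp(suffix=".jsonl")
os.close(fd)
with open(demo_path, "w", encoding="utf-8") as f:
    print(json.dumps({"type": "user", "agentId": "acd4485", "isSidechain": True}), file=f)
    print(json.dumps({"type": "assistant", "agentId": "acd4485", "isSidechain": True}), file=f)
    f.write('{"type": "assis')  # simulated half-written line
entries = read_subagent_entries(demo_path)
os.remove(demo_path)
print([e["type"] for e in entries])
```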
COMPARISON TABLE:
| Aspect                        | Foreground | Background |
|-------------------------------|------------|------------|
| Subagent tool calls visible   | Yes        | No         |
| Subagent tool results visible | Yes        | No         |
| Blocking                      | Yes        | No         |
| Can identify via model        | Yes        | N/A        |
| JSONL has full history        | Yes        | Yes        |
Use foreground when: You need full visibility into subagent work
Use background when: Long tasks, parallel execution, don't need live updates
8. DISTRIBUTED / MULTI-MACHINE RESUME
--------------------------------------
This patch supports distributed session resume where follow-up questions may be
handled by different machines (with JSONL files synced/copied between them).
SCENARIO:
    Machine A: User asks question 1 → session abc-123 created
               Subagents spawn, JSONL files written
               Session ends, client disconnects
    Machine B: User asks follow-up question 2
               Synced JSONL files from Machine A available
               Resumes session abc-123 with same session_id
HOW IT WORKS:
- Machine B opens JSONL files at position 0 (no saved file position)
- Re-reads ALL old entries from Machine A into queue
- Old entries have old UUIDs/signatures → won't match new CLI messages
- Queue fills up → FIFO eviction drops oldest entries
- New entries (from Machine B's CLI) appear at queue tail
- New CLI messages match against new entries correctly
PERFORMANCE CHARACTERISTICS:
Typical session: ~100-300 main entries, ~10-50 per subagent, ~500KB total
On machine switch:
- Disk read: ~10-50ms (SSD)
- JSON parse: ~10-50ms
- Total startup overhead: ~50-100ms (acceptable)
Per-message matching:
- Queue: ~200 old entries + new entries
- Each old entry: UUID/signature mismatch check is O(1)
- Total: ~200 microseconds per message (negligible)
WHEN THIS WOULD BE A PROBLEM (not typical):
- Sessions with 10,000+ entries
- Machine switches on every request
- Multi-MB content per entry
POTENTIAL OPTIMIZATION (not implemented, tracked for future):
If distributed resume becomes a bottleneck, could add file position tracking:
    class ClaudeSDKClientWithRaw:
        def __init__(self, options, file_positions: dict[str, int] | None = None):
            # file_positions from shared store (Redis, DB)
            # On resume, seek to saved position instead of re-reading
            ...
For now, the ~50-100ms overhead is acceptable for production use.
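
To make the position-tracking idea concrete, here is a minimal sketch of reading
only what was appended after a saved byte offset. It is not implemented in this
patch. read_from_offset is a hypothetical helper, and where the returned offset
is stored (Redis, a database) is left to the caller.

```python
import json
import os
import tempfile

LF = bytes([10])  # the line-feed byte


def read_from_offset(path: str, offset: int = 0) -> tuple[list[dict], int]:
    # Return entries appended since `offset`, plus the offset to persist for next time.
    entries: list[dict] = []
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            line = f.readline()
            if not line.endswith(LF):
                break  # EOF or a half-written line: leave it for the next call
            offset = f.tell()
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                pass  # skip malformed lines but still advance past them
    return entries, offset


# Demo on a temporary file standing in for a synced session JSONL.
fd, demo_path = tempfile.mkstemp(suffix=".jsonl")
os.close(fd)
with open(demo_path, "a", encoding="utf-8") as f:
    print(json.dumps({"uuid": "A"}), file=f)
first, pos = read_from_offset(demo_path)
with open(demo_path, "a", encoding="utf-8") as f:
    print(json.dumps({"uuid": "B"}), file=f)
second, pos2 = read_from_offset(demo_path, pos)
os.remove(demo_path)
print(first, second)
```

The file is opened in binary mode so the saved offset is a plain byte count that
stays valid when another machine reopens the same synced file.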
9. TESTED VERSIONS
------------------
This patch was developed and tested with:
- Claude Code CLI: 2.1.5
- claude-agent-sdk (Python): 0.1.19
File format and linking behavior may differ in other versions.
"""
import asyncio
import hashlib
import json
import logging
import unicodedata
from pathlib import Path
from typing import AsyncIterator, Any, TextIO
from claude_agent_sdk import (
    ClaudeAgentOptions,
    Message,
    UserMessage,
    AssistantMessage,
    SystemMessage,
    ResultMessage,
    ClaudeSDKClient,
)
from claude_agent_sdk._internal.message_parser import parse_message
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
logger = logging.getLogger(__name__)
# Queue holds unmatched JSONL entries during correlation.
# Matched entries are popped immediately, so queue stays small in normal operation.
# Only unmatched entries (e.g., initial prompt, queue-operation) accumulate.
# The 500 cap is a safety net - if exceeded, oldest half is dropped (FIFO eviction).
_MAX_QUEUE_SIZE = 500
# Retry settings for JSONL matching.
# CLI stdout may arrive before JSONL is flushed to disk, especially under I/O load.
# 20 retries × 100ms = 2 second window to handle slow disks, network FS, high load.
#
# REVIEW NOTE: 2 second window is tight under heavy I/O pressure (NFS, slow storage).
# Monitor for "Failed to match after 20 retries" in production logs.
# If frequent, consider increasing _MATCH_RETRIES or _MATCH_RETRY_DELAY.
_MATCH_RETRIES = 20
_MATCH_RETRY_DELAY = 0.100


def _find_jsonl_path(session_id: str, cwd: str) -> Path:
    normalized = cwd.replace("\\", "/")
    mangled = normalized.replace("/", "-")
    return Path.home() / ".claude" / "projects" / mangled / f"{session_id}.jsonl"


def _find_subagents_dir(session_id: str, cwd: str) -> Path:
    """Get path to subagents directory for a session."""
    normalized = cwd.replace("\\", "/")
    mangled = normalized.replace("/", "-")
    return Path.home() / ".claude" / "projects" / mangled / session_id / "subagents"


def _get_task_tool_use_id(content: list) -> str | None:
    """Get the tool_use id if content contains a Task tool_use block.

    REVIEW NOTE: Case-sensitive match on "Task". If CLI changes tool name
    to "task" or "TASK", detection will break. Monitor for CLI updates.
    """
    for block in content:
        if isinstance(block, dict) and block.get("type") == "tool_use" and block.get("name") == "Task":
            return block.get("id")
    return None


def _get_tool_result_ids(content: list) -> set[str]:
    """Get all tool_use_ids from tool_result blocks in content."""
    ids = set()
    for block in content:
        if isinstance(block, dict) and block.get("type") == "tool_result":
            tool_use_id = block.get("tool_use_id")
            if tool_use_id:
                ids.add(tool_use_id)
    return ids


def _stable_hash(s: str) -> str:
    return hashlib.sha256(s.encode('utf-8')).hexdigest()[:16]


def _content_signature_raw(content: Any) -> str | None:
    """Create content signature for matching assistant messages.

    Uses SHA256 hash of first 200 chars for text/thinking blocks.
    Tool use/result blocks use their unique IDs directly.
    """
    if not isinstance(content, list):
        return None
    parts = []
    for block in content:
        if not isinstance(block, dict):
            continue
        btype = block.get("type")
        if btype == "text":
            text = block.get('text', '')
            if text:
                parts.append(f"text:{_stable_hash(unicodedata.normalize('NFC', str(text))[:200])}")
        elif btype == "tool_use":
            if block.get('id'):
                parts.append(f"tool_use:{block.get('id')}")
        elif btype == "thinking":
            thinking = block.get('thinking', '')
            if thinking:
                parts.append(f"thinking:{_stable_hash(unicodedata.normalize('NFC', str(thinking))[:200])}")
        elif btype == "tool_result":
            if block.get('tool_use_id'):
                parts.append(f"tool_result:{block.get('tool_use_id')}")
    return "|".join(parts) if parts else None


class _JsonlReader:
    """Reads and queues JSONL entries for correlation matching.

    REVIEW NOTE: Uses synchronous file I/O (open, readline) in async context.
    For typical sessions (100-300 entries), this blocks event loop for ~1-10ms.
    Acceptable for current usage. For high-throughput scenarios (10k+ entries),
    consider wrapping with anyio.to_thread.run_sync() or using aiofiles.
    Memory footprint: ~725KB for 30 subagent readers with empty queues.
    """

    def __init__(self, path: Path):
        self.path = path
        self.file: TextIO | None = None
        self.queue: list[dict[str, Any]] = []
        self._incomplete_line: str = ""

    def open(self) -> bool:
        try:
            # REVIEW NOTE: Sync I/O - blocks event loop briefly
            self.file = open(self.path, encoding='utf-8')
            logger.debug(f"Opened JSONL reader: {self.path}")
            return True
        except (FileNotFoundError, PermissionError, OSError) as e:
            logger.debug(f"Failed to open JSONL: {self.path} ({e})")
            return False

    def close(self):
        if self.file:
            try:
                self.file.close()
            except Exception:
                pass
            self.file = None

    def read_pending(self):
        if not self.file:
            return
        try:
            while True:
                line = self.file.readline()
                if not line:
                    break
                if not line.endswith('\n'):
                    # Partial line: the writer has not flushed the rest yet.
                    self._incomplete_line += line
                    break
                if self._incomplete_line:
                    line = self._incomplete_line + line
                    self._incomplete_line = ""
                line = line.strip()
                if line:
                    try:
                        data = json.loads(line)
                        # Only queue types we can match (user/assistant)
                        # Skip queue-operation, system, etc. to avoid accumulation
                        if data.get("type") in ("user", "assistant"):
                            self.queue.append(data)
                    except json.JSONDecodeError:
                        pass
        except (IOError, OSError):
            self.file = None
        if len(self.queue) > _MAX_QUEUE_SIZE:
            # Keep only the newest entries so the logged count matches what is
            # actually evicted, even when far more than the cap was read at once.
            keep = _MAX_QUEUE_SIZE // 2
            dropped = len(self.queue) - keep
            logger.warning(f"JSONL queue overflow, dropping {dropped} oldest entries")
            self.queue = self.queue[-keep:]

    async def match_user(self, uuid: str | None) -> dict[str, Any] | None:
        if not uuid:
            return None
        for _ in range(_MATCH_RETRIES):
            self.read_pending()
            for i, raw in enumerate(self.queue):
                if raw.get("type") == "user" and str(raw.get("uuid")) == str(uuid):
                    logger.debug(f"Matched user message uuid={uuid[:8]}...")
                    return self.queue.pop(i)
            await asyncio.sleep(_MATCH_RETRY_DELAY)
        logger.debug(f"Failed to match user message uuid={uuid[:8]}... after {_MATCH_RETRIES} retries")
        return None

    async def match_assistant(self, cli_content: list | None) -> dict[str, Any] | None:
        if not cli_content:
            return None
        target_sig = _content_signature_raw(cli_content)
        if not target_sig:
            logger.debug("Cannot create signature for assistant content (empty or invalid)")
            return None
        for _ in range(_MATCH_RETRIES):
            self.read_pending()
            for i, raw in enumerate(self.queue):
                if raw.get("type") != "assistant":
                    continue
                jsonl_content = raw.get("message", {}).get("content", [])
                if _content_signature_raw(jsonl_content) == target_sig:
                    logger.debug(f"Matched assistant message sig={target_sig[:30]}...")
                    return self.queue.pop(i)
            await asyncio.sleep(_MATCH_RETRY_DELAY)
        logger.debug(f"Failed to match assistant message sig={target_sig[:30]}... after {_MATCH_RETRIES} retries")
        return None


def _attach_raw(msg: Message, raw: dict[str, Any] | None) -> Message:
    try:
        msg.raw = raw  # type: ignore[attr-defined]
    except AttributeError:
        pass
    return msg


class ClaudeSDKClientWithRaw(ClaudeSDKClient):
    """
    Drop-in replacement for ClaudeSDKClient with .raw field on messages.

    Supports subagent messages: when Task tool spawns a subagent in foreground mode,
    subagent messages are correlated with their JSONL for full metadata (agentId,
    isSidechain, parentUuid, etc.).

    Usage:
        async with ClaudeSDKClientWithRaw(options) as client:
            await client.query("Hello")
            async for msg in client.receive_response():
                # Normal SDK fields
                if isinstance(msg, AssistantMessage):
                    print(msg.model)
                # Rich .raw data (works for both main and subagent messages)
                print(msg.raw.get("parentUuid"))
                print(msg.raw.get("timestamp"))
                print(msg.raw.get("agentId"))      # Set for subagent messages
                print(msg.raw.get("isSidechain"))  # True for subagent messages
    """

    def __init__(self, options: ClaudeAgentOptions | None = None):
        super().__init__(options)
        self._jsonl_reader: _JsonlReader | None = None
        self._subagent_readers: dict[str, _JsonlReader] = {}  # filename -> reader
        self._session_id: str | None = None
        self._cwd: str | None = None
        self._in_subagent: bool = False
        self._pending_task_ids: set[str] = set()  # Track active Task tool_use_ids

    async def _open_new_subagent_readers(self) -> None:
        """Open any new subagent JSONL files that haven't been opened yet."""
        if not self._session_id or not self._cwd:
            return
        subagents_dir = _find_subagents_dir(self._session_id, self._cwd)
        if not subagents_dir.exists():
            return
        # Find all subagent JSONL files
        for jsonl_path in subagents_dir.glob("agent-*.jsonl"):
            filename = jsonl_path.name
            if filename not in self._subagent_readers:
                reader = _JsonlReader(jsonl_path)
                if reader.open():
                    self._subagent_readers[filename] = reader
                    logger.debug(f"Opened subagent reader: {filename}")

    def _close_all_subagent_readers(self) -> None:
        """Close all subagent readers."""
        if self._subagent_readers:
            logger.debug(f"Closing {len(self._subagent_readers)} subagent reader(s)")
        for reader in self._subagent_readers.values():
            reader.close()
        self._subagent_readers.clear()

    async def _match_with_fallback(
        self, msg_type: str, raw_cli: dict[str, Any]
    ) -> dict[str, Any] | None:
        """Try main reader, fall back to all subagent readers if in subagent mode."""
        jsonl_raw = None
        if msg_type == 'user':
            uuid = raw_cli.get('uuid')
            # Try main first
            if self._jsonl_reader:
                jsonl_raw = await self._jsonl_reader.match_user(uuid)
            # If in subagent mode and no match, try all subagent readers
            if not jsonl_raw and self._in_subagent:
                await self._open_new_subagent_readers()
                for reader in self._subagent_readers.values():
                    jsonl_raw = await reader.match_user(uuid)
                    if jsonl_raw:
                        break
        elif msg_type == 'assistant':
            cli_content = raw_cli.get('message', {}).get('content', [])
            # Try main first
            if self._jsonl_reader:
                jsonl_raw = await self._jsonl_reader.match_assistant(cli_content)
            # If in subagent mode and no match, try all subagent readers
            if not jsonl_raw and self._in_subagent:
                await self._open_new_subagent_readers()
                for reader in self._subagent_readers.values():
                    jsonl_raw = await reader.match_assistant(cli_content)
                    if jsonl_raw:
                        break
        return jsonl_raw

    async def receive_messages(self) -> AsyncIterator[Message]:
        """Receive messages with .raw attached (including subagent messages)."""
        if not self._query:
            from claude_agent_sdk._errors import CLIConnectionError
            raise CLIConnectionError("Not connected. Call connect() first.")
        async for raw_cli in self._query.receive_messages():
            msg_type = raw_cli.get('type')
            # Parse to SDK type
            try:
                sdk_msg = parse_message(raw_cli)
            except Exception:
                continue
            # Get session info from system init
            if msg_type == 'system' and raw_cli.get('subtype') == 'init':
                self._session_id = raw_cli.get('session_id')
                self._cwd = raw_cli.get('cwd')
                if self._session_id and self._cwd:
                    jsonl_path = _find_jsonl_path(self._session_id, self._cwd)
                    # Wait for file
                    for _ in range(50):
                        if jsonl_path.exists():
                            break
                        await asyncio.sleep(0.05)
                    self._jsonl_reader = _JsonlReader(jsonl_path)
                    self._jsonl_reader.open()
                _attach_raw(sdk_msg, raw_cli)
            elif msg_type == 'result':
                # Result: use CLI (has modelUsage, errors)
                _attach_raw(sdk_msg, raw_cli)
            elif msg_type == 'assistant':
                cli_content = raw_cli.get('message', {}).get('content', [])
                # Check if this assistant message spawns a Task (enter subagent mode)
                task_id = _get_task_tool_use_id(cli_content)
                if task_id:
                    self._pending_task_ids.add(task_id)
                    self._in_subagent = True
                    logger.debug(f"Entering subagent mode, task_id={task_id[:12]}...")
                jsonl_raw = await self._match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)
            elif msg_type == 'user':
                cli_content = raw_cli.get('message', {}).get('content', [])
                jsonl_raw = await self._match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)
                # Track Task completion for subagent mode state
                # NOTE: We intentionally do NOT close subagent readers on task completion.
                # Reason: Subagents support "resume" - the main agent can ask follow-up
                # questions to a completed subagent. When resumed, new entries are appended
                # to the same agent-<id>.jsonl file. If we closed the reader, we'd lose
                # the file position and re-read old entries on reopen.
                #
                # Cost of keeping readers open:
                # - File handles: ~30 for long session (cheap, ulimit typically 1024+)
                # - Memory: completed subagents have empty queues (O(1) per reader)
                # - Scan time: empty queues short-circuit quickly
                #
                # All readers are closed on session disconnect.
                if isinstance(cli_content, list):
                    result_ids = _get_tool_result_ids(cli_content)
                    completed = self._pending_task_ids & result_ids
                    if completed:
                        self._pending_task_ids -= completed
                        logger.debug(f"Task completed: {completed}")
                        if not self._pending_task_ids:
                            logger.debug("Exiting subagent mode (readers kept open for resume)")
                            self._in_subagent = False
                            # Don't close readers here - subagents may be resumed later
            else:
                _attach_raw(sdk_msg, raw_cli)
            yield sdk_msg

    async def disconnect(self) -> None:
        """Disconnect and cleanup."""
        if self._jsonl_reader:
            self._jsonl_reader.close()
            self._jsonl_reader = None
        self._close_all_subagent_readers()
        await super().disconnect()


# Also provide query_with_raw for simple one-shot usage
#
# REVIEW NOTE: Code duplication between ClaudeSDKClientWithRaw and query_with_raw.
# Both implement similar logic for:
# - _match_with_fallback / match_with_fallback
# - _open_new_subagent_readers / open_new_subagent_readers
# - Task tool_use tracking and subagent state machine
#
# This duplication means bug fixes must be applied twice. Future refactoring could
# extract common logic to a shared _RawCorrelator helper class. Tracked for later.
#
async def query_with_raw(
    prompt: str,
    options: ClaudeAgentOptions | None = None,
) -> AsyncIterator[Message]:
    """
    One-shot query with .raw attached to messages.

    Drop-in replacement for claude_agent_sdk.query().
    Supports subagent messages with full metadata.
    """
    options = options or ClaudeAgentOptions()
    transport = SubprocessCLITransport(prompt, options)
    await transport.connect()
    session_id: str | None = None
    cwd: str | None = None
    reader: _JsonlReader | None = None
    subagent_readers: dict[str, _JsonlReader] = {}
    in_subagent: bool = False
    pending_task_ids: set[str] = set()

    async def open_new_subagent_readers() -> None:
        """Open any new subagent JSONL files that haven't been opened yet."""
        nonlocal subagent_readers
        if not session_id or not cwd:
            return
        subagents_dir = _find_subagents_dir(session_id, cwd)
        if not subagents_dir.exists():
            return
        for jsonl_path in subagents_dir.glob("agent-*.jsonl"):
            filename = jsonl_path.name
            if filename not in subagent_readers:
                new_reader = _JsonlReader(jsonl_path)
                if new_reader.open():
                    subagent_readers[filename] = new_reader
                    logger.debug(f"Opened subagent reader: {filename}")

    def close_all_subagent_readers() -> None:
        nonlocal subagent_readers
        if subagent_readers:
            logger.debug(f"Closing {len(subagent_readers)} subagent reader(s)")
        for r in subagent_readers.values():
            r.close()
        subagent_readers = {}

    async def match_with_fallback(msg_type: str, raw_cli: dict) -> dict | None:
        """Try main reader, fall back to all subagent readers if in subagent mode."""
        jsonl_raw = None
        if msg_type == 'user':
            uuid = raw_cli.get('uuid')
            if reader:
                jsonl_raw = await reader.match_user(uuid)
            if not jsonl_raw and in_subagent:
                await open_new_subagent_readers()
                for r in subagent_readers.values():
                    jsonl_raw = await r.match_user(uuid)
                    if jsonl_raw:
                        break
        elif msg_type == 'assistant':
            cli_content = raw_cli.get('message', {}).get('content', [])
            if reader:
                jsonl_raw = await reader.match_assistant(cli_content)
            if not jsonl_raw and in_subagent:
                await open_new_subagent_readers()
                for r in subagent_readers.values():
                    jsonl_raw = await r.match_assistant(cli_content)
                    if jsonl_raw:
                        break
        return jsonl_raw

    try:
        async for raw_cli in transport.read_messages():
            msg_type = raw_cli.get('type')
            try:
                sdk_msg = parse_message(raw_cli)
            except Exception:
                continue
            if msg_type == 'system' and raw_cli.get('subtype') == 'init':
                session_id = raw_cli.get('session_id')
                cwd = raw_cli.get('cwd')
                if session_id and cwd:
                    jsonl_path = _find_jsonl_path(session_id, cwd)
                    for _ in range(50):
                        if jsonl_path.exists():
                            break
                        await asyncio.sleep(0.05)
                    reader = _JsonlReader(jsonl_path)
                    reader.open()
                _attach_raw(sdk_msg, raw_cli)
            elif msg_type == 'result':
                _attach_raw(sdk_msg, raw_cli)
            elif msg_type == 'assistant':
                cli_content = raw_cli.get('message', {}).get('content', [])
                # Check if this assistant message spawns a Task
                task_id = _get_task_tool_use_id(cli_content)
                if task_id:
                    pending_task_ids.add(task_id)
                    in_subagent = True
                    logger.debug(f"Entering subagent mode, task_id={task_id[:12]}...")
                jsonl_raw = await match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)
            elif msg_type == 'user':
                cli_content = raw_cli.get('message', {}).get('content', [])
                jsonl_raw = await match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)
                # Track Task completion for subagent mode state
                # NOTE: We do NOT close subagent readers on task completion.
                # Subagents support "resume" - if closed, we'd lose file position
                # and re-read old entries. Keeping readers open is cheap:
                # - File handles: ~30 (ulimit typically 1024+)
                # - Memory: completed subagents have empty queues
                # All readers closed in finally block on session end.
                if isinstance(cli_content, list):
                    result_ids = _get_tool_result_ids(cli_content)
                    completed = pending_task_ids & result_ids
                    if completed:
                        pending_task_ids -= completed
                        logger.debug(f"Task completed: {completed}")
                        if not pending_task_ids:
                            logger.debug("Exiting subagent mode (readers kept open for resume)")
                            in_subagent = False
                            # Don't close readers - subagents may be resumed
            else:
                _attach_raw(sdk_msg, raw_cli)
            yield sdk_msg
    finally:
        if reader:
            reader.close()
        close_all_subagent_readers()
        await transport.close()


__all__ = [
    "ClaudeSDKClientWithRaw",
    "query_with_raw",
    "ClaudeAgentOptions",
    "Message",
    "UserMessage",
    "AssistantMessage",
    "SystemMessage",
    "ResultMessage",
]

oneryalcin commented Jan 12, 2026

  ┌─────────────────────────┬──────────┬─────────────────────────┬─────────────────┐
  │          Field          │ In JSONL │         In SDK          │    With .raw    │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ uuid                    │ ✅       │ ✅ (UserMessage only)   │ ✅ All messages │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ parentUuid              │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ sessionId               │ ✅       │ ✅ (ResultMessage only) │ ✅ All messages │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ timestamp               │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ version                 │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ gitBranch               │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ cwd                     │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ isSidechain             │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ userType                │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ requestId               │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ message.id              │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ message.stop_reason     │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ message.usage (per-msg) │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ toolUseResult           │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ sourceToolAssistantUUID │ ✅       │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ modelUsage              │ CLI only │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ errors                  │ CLI only │ ❌                      │ ✅              │
  ├─────────────────────────┼──────────┼─────────────────────────┼─────────────────┤
  │ permission_denials      │ CLI only │ ❌                      │ ✅              │
  └─────────────────────────┴──────────┴─────────────────────────┴─────────────────┘
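
One thing the per-message `message.usage` row makes possible is summing token usage over a run. The sketch below works on a list of collected `.raw` dicts. The helper name `total_usage` is made up, the `input_tokens` / `output_tokens` keys follow the Anthropic API usage object, and the de-duplication by `message.id` is a precaution in case several JSONL entries share one API message (an assumption, not something verified here).

```python
from typing import Any


def total_usage(raws: list[dict[str, Any]]) -> dict[str, int]:
    # Sum per-message token usage, counting each API message id only once.
    totals = {"input_tokens": 0, "output_tokens": 0}
    seen_ids: set[str] = set()
    for raw in raws:
        message = raw.get("message")
        if raw.get("type") != "assistant" or not isinstance(message, dict):
            continue
        msg_id = message.get("id")
        if msg_id in seen_ids:
            continue
        if msg_id:
            seen_ids.add(msg_id)
        usage = message.get("usage") or {}
        for key in totals:
            totals[key] += usage.get(key) or 0
    return totals


# Hypothetical .raw payloads; two entries share the same message id.
raws = [
    {"type": "assistant", "message": {"id": "msg_1", "usage": {"input_tokens": 10, "output_tokens": 5}}},
    {"type": "assistant", "message": {"id": "msg_1", "usage": {"input_tokens": 10, "output_tokens": 5}}},
    {"type": "assistant", "message": {"id": "msg_2", "usage": {"input_tokens": 3, "output_tokens": 7}}},
    {"type": "user", "message": {"content": "hi"}},
]
totals = total_usage(raws)
print(totals)
```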
