Claude Python SDK Monkey Patch for enriching messages
Gist oneryalcin/0f0dc48cdc29dfcbebc4c6dcfe4cbf8f, last active January 12, 2026 13:38

"""
SDK Patch with Subagent Support - Adds a .raw field to SDK message types with maximum fidelity.

Provides drop-in replacements:
- ClaudeSDKClientWithRaw: replaces ClaudeSDKClient
- query_with_raw: replaces query

Data sources for .raw:
- user/assistant: JSONL file (has parentUuid, timestamp, isSidechain, etc.)
- result: Raw CLI output (has modelUsage, errors, permission_denials)
- system: Raw CLI output (already complete)

Usage:
    from sdk_patch_with_subagents import ClaudeSDKClientWithRaw, ClaudeAgentOptions

    options = ClaudeAgentOptions()
    async with ClaudeSDKClientWithRaw(options) as client:
        await client.query("Your prompt")
        async for msg in client.receive_response():
            print(msg.raw.get("parentUuid"))
            print(msg.raw.get("timestamp"))


JSONL FILE STRUCTURE AND SUBAGENT LINKING
=========================================

Claude Code stores conversation history in JSONL files. Understanding this structure
is essential for correlating SDK messages with their rich metadata.

1. FILE LOCATIONS
-----------------

Main session JSONL:
    ~/.claude/projects/<mangled-cwd>/<session_id>.jsonl

Subagent JSONL (when the Task tool spawns agents):
    ~/.claude/projects/<mangled-cwd>/<session_id>/subagents/agent-<agentId>.jsonl

Where:
- <mangled-cwd>: The working directory with "/" replaced by "-"
  (Windows-style separators are normalized to "/" first)
  Example: /private/tmp/myproject -> -private-tmp-myproject
- <session_id>: UUID like "0cb45b7d-8211-46a6-858f-b0aeea392e7f"
- <agentId>: Short ID like "acd4485" (7 chars)

2. MESSAGE TYPES IN JSONL
-------------------------

Each line is a JSON object with a "type" field:
- "queue-operation": Internal queue management (skip these)
- "user": User/tool-result messages
- "assistant": Claude's responses
- "system": System events (init, etc.) - usually not in JSONL

3. KEY FIELDS FOR LINKING
-------------------------

All messages have:
- uuid: Unique identifier for this message
- parentUuid: UUID of the message this responds to (builds conversation tree)
- sessionId: Session this belongs to
- timestamp: ISO-8601 timestamp
- isSidechain: false for main conversation, true for subagents

Subagent-specific:
- agentId: Short ID identifying the subagent (e.g., "acd4485")

Assistant messages additionally have:
- message.id: Anthropic API message ID (e.g., "msg_01RJ3XzgQ2gzafb75h1q5jeP")
- message.model: Model used (e.g., "claude-sonnet-4-5-20250929")
- message.usage: Token usage stats
- requestId: API request ID

User messages with tool results have:
- toolUseResult: Rich metadata about tool execution
- toolUseResult.agentId: Links to subagent (when Task tool was used)
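
The sketch below reads one JSONL file and keeps only the entry types that carry these
linking fields. It assumes one JSON object per line. It skips "queue-operation" and
other types the same way _JsonlReader does. load_entries is a hypothetical helper, and
the sample entries are made up for illustration.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def load_entries(path: Path) -> list[dict[str, Any]]:
    # Keep only user/assistant entries; skip blank, malformed, or non-object lines.
    entries = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(data, dict) and data.get("type") in ("user", "assistant"):
                entries.append(data)
    return entries


sample = [
    {"type": "queue-operation"},
    {"type": "user", "uuid": "A", "parentUuid": None, "isSidechain": False},
    {"type": "assistant", "uuid": "B", "parentUuid": "A", "isSidechain": False,
     "message": {"id": "msg_1", "model": "claude-sonnet-4-5-20250929", "usage": {}}},
]
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "session.jsonl"
    with open(path, "w", encoding="utf-8") as f:
        for item in sample:
            print(json.dumps(item), file=f)  # one JSON object per line
    entries = load_entries(path)

for e in entries:
    print(e["type"], e["uuid"], "parent:", e.get("parentUuid"))
```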

4. SUBAGENT LINKING MECHANISM
-----------------------------

When the main agent uses the Task tool to spawn a subagent:

MAIN SESSION (parent):
    Entry 1: assistant with tool_use for the "Task" tool
    Entry 2: user with tool_result containing:
        {
          "toolUseResult": {
            "agentId": "acd4485",     <- LINK TO SUBAGENT
            "status": "completed",
            "prompt": "original prompt..."
          }
        }

SUBAGENT SESSION (child):
    File: <session_id>/subagents/agent-acd4485.jsonl
    Every entry has:
    - sessionId: SAME as parent (shared!)
    - agentId: "acd4485"
    - isSidechain: true

Linking logic:
    Parent -> Child: toolUseResult.agentId matches the child's agentId
    Child -> Parent: same sessionId, and isSidechain=true marks the entry as subordinate
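
Both directions of the linking logic fit in two small predicates. This is a sketch over
already-parsed entries. linked_agent_ids and belongs_to are hypothetical names, and the
entries are modeled on the example above. The isinstance check guards against any
toolUseResult that is not an object.

```python
from pathlib import Path
from typing import Any


def linked_agent_ids(parent_entries: list[dict[str, Any]]) -> list[str]:
    # Parent -> Child: user entries whose toolUseResult carries an agentId.
    ids = []
    for entry in parent_entries:
        result = entry.get("toolUseResult")
        if entry.get("type") == "user" and isinstance(result, dict) and result.get("agentId"):
            ids.append(result["agentId"])
    return ids


def belongs_to(child_entry: dict[str, Any], session_id: str, agent_id: str) -> bool:
    # Child -> Parent: shared sessionId, matching agentId, and isSidechain == true.
    return (
        child_entry.get("sessionId") == session_id
        and child_entry.get("agentId") == agent_id
        and child_entry.get("isSidechain") is True
    )


session_id = "0cb45b7d-8211-46a6-858f-b0aeea392e7f"
parent_entries = [
    {"type": "assistant", "uuid": "B"},
    {"type": "user", "uuid": "C",
     "toolUseResult": {"agentId": "acd4485", "status": "completed", "prompt": "original prompt..."}},
]
child_entry = {"type": "user", "uuid": "X", "sessionId": session_id,
               "agentId": "acd4485", "isSidechain": True}

for agent_id in linked_agent_ids(parent_entries):
    # Path relative to ~/.claude/projects/<mangled-cwd>/
    print(Path(session_id) / "subagents" / f"agent-{agent_id}.jsonl",
          belongs_to(child_entry, session_id, agent_id))
```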

5. EXAMPLE CONVERSATION TREE
----------------------------

Main session JSONL:
    user (uuid=A, parentUuid=null)                                        <- Initial prompt
    └── assistant (uuid=B, parentUuid=A)                                  <- Uses Task tool
        └── user (uuid=C, parentUuid=B, toolUseResult.agentId="xyz123")   <- Task result
            └── assistant (uuid=D, parentUuid=C)                          <- Final response

Subagent JSONL (agent-xyz123.jsonl):
    user (uuid=X, parentUuid=null, agentId="xyz123", isSidechain=true)    <- Subagent prompt
    └── assistant (uuid=Y, parentUuid=X, agentId="xyz123")                <- Subagent uses Glob
        └── user (uuid=Z, parentUuid=Y, agentId="xyz123")                 <- Glob result
            └── assistant (uuid=W, parentUuid=Z, agentId="xyz123")        <- Subagent response
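
A tree like this can be rebuilt from parentUuid alone by indexing entries under their
parent and then walking from the roots (parentUuid == null). The sketch below uses an
iterative depth-first walk, so long linear conversations do not hit Python's recursion
limit. render_tree is a hypothetical helper.

```python
from collections import defaultdict
from typing import Any


def render_tree(entries: list[dict[str, Any]]) -> list[str]:
    # Group entries by parentUuid; roots have parentUuid == None (null in JSON).
    children: dict[Any, list[dict[str, Any]]] = defaultdict(list)
    for entry in entries:
        children[entry.get("parentUuid")].append(entry)
    lines = []
    # Iterative depth-first walk; reversed() keeps file order when popping from the stack.
    stack = [(root, 0) for root in reversed(children.get(None, []))]
    while stack:
        entry, depth = stack.pop()
        lines.append("    " * depth + f"{entry['type']} (uuid={entry['uuid']})")
        for child in reversed(children.get(entry["uuid"], [])):
            stack.append((child, depth + 1))
    return lines


main_session = [
    {"type": "user", "uuid": "A", "parentUuid": None},
    {"type": "assistant", "uuid": "B", "parentUuid": "A"},
    {"type": "user", "uuid": "C", "parentUuid": "B"},
    {"type": "assistant", "uuid": "D", "parentUuid": "C"},
]
for row in render_tree(main_session):
    print(row)
```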

6. WHY THIS MATTERS
-------------------

The SDK's parsed messages lose most of this metadata. This patch preserves it via .raw:

    msg.raw.get("parentUuid")                  # Build conversation trees
    msg.raw.get("timestamp")                   # Precise timing
    msg.raw.get("message", {}).get("usage")    # Token usage per message
    msg.raw.get("toolUseResult")               # Rich tool execution data
    msg.raw.get("agentId")                     # Identify subagent messages
    msg.raw.get("isSidechain")                 # Distinguish main vs subagent

For subagent support, the client watches assistant messages for Task tool_use blocks.
While any Task is pending, a message that does not match the main JSONL is matched
against every agent-*.jsonl file in the session's subagents directory.
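
.raw is a plain dict whose shape depends on its source. User and assistant messages get
a JSONL entry, while result and system messages get CLI output. Nested lookups are
therefore safest with a defaulting helper. raw_get below is a hypothetical helper, and
SimpleNamespace objects stand in for real SDK messages.

```python
from types import SimpleNamespace
from typing import Any


def raw_get(msg: Any, *keys: str, default: Any = None) -> Any:
    # Walk nested keys in msg.raw; return `default` if .raw or any level is missing.
    value = getattr(msg, "raw", None)
    for key in keys:
        if not isinstance(value, dict) or key not in value:
            return default
        value = value[key]
    return value


# Stand-ins for SDK messages; real ones come from client.receive_response().
assistant_msg = SimpleNamespace(raw={"parentUuid": "A", "isSidechain": False,
                                     "message": {"usage": {"output_tokens": 12}}})
result_msg = SimpleNamespace(raw={"type": "result"})

print(raw_get(assistant_msg, "message", "usage", "output_tokens"))  # 12
print(raw_get(result_msg, "message", "usage"))                      # None
```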

7. SDK VISIBILITY INTO SUBAGENTS (FOREGROUND vs BACKGROUND)
------------------------------------------------------------

The Task tool can run subagents in foreground (default) or background mode.
This significantly affects which messages the SDK exposes.

FOREGROUND (run_in_background=false, default):
    The message stream includes subagent internals:
    1. SystemMessage init
    2. AssistantMessage (main, sonnet) - decides to use Task
    3. AssistantMessage (main) - ToolUseBlock name=Task
    4. UserMessage - initial Task ack
    5. AssistantMessage (subagent, haiku) - ToolUseBlock name=Bash    <- SUBAGENT VISIBLE
    6. UserMessage - ToolResultBlock from Bash                        <- SUBAGENT VISIBLE
    7. UserMessage - Task final result with toolUseResult.agentId
    8. AssistantMessage (main) - summarizes
    9. ResultMessage

    You can identify subagent messages by:
    - A different model (e.g., haiku when main is sonnet)
    - msg.raw.get("isSidechain") == True
    - msg.raw.get("agentId") is set

BACKGROUND (run_in_background=true):
    The subagent runs asynchronously; its internals are NOT in the message stream:
    1. SystemMessage init
    2. AssistantMessage (main) - decides to use Task
    3. AssistantMessage (main) - ToolUseBlock name=Task
    4. UserMessage - "Async agent launched. agentId: xxx"    <- IMMEDIATE RETURN
    5. AssistantMessage (main) - may poll output file
    ...
    N. ResultMessage

    To get subagent data in background mode:
    - Read the subagent JSONL directly: <session_id>/subagents/agent-<agentId>.jsonl
    - Or use the TaskOutput tool to retrieve results

COMPARISON TABLE:

    | Aspect                        | Foreground | Background |
    |-------------------------------|------------|------------|
    | Subagent tool calls visible   | Yes        | No         |
    | Subagent tool results visible | Yes        | No         |
    | Blocking                      | Yes        | No         |
    | Can identify via model        | Yes        | N/A        |
    | JSONL has full history        | Yes        | Yes        |

Use foreground when: you need full visibility into subagent work.
Use background when: tasks are long, run in parallel, or don't need live updates.

8. DISTRIBUTED / MULTI-MACHINE RESUME
--------------------------------------

This patch supports distributed session resume, where follow-up questions may be
handled by different machines (with JSONL files synced/copied between them).

SCENARIO:
    Machine A: User asks question 1 → session abc-123 created
               Subagents spawn, JSONL files written
               Session ends, client disconnects
    Machine B: User asks follow-up question 2
               Synced JSONL files from Machine A available
               Resumes session abc-123 with same session_id

HOW IT WORKS:
- Machine B opens JSONL files at position 0 (no saved file position)
- Re-reads ALL old entries from Machine A into the queue
- Old entries have old UUIDs/signatures → won't match new CLI messages
- If the queue exceeds its cap (_MAX_QUEUE_SIZE), FIFO eviction drops the oldest entries
- New entries (from Machine B's CLI) appear at the queue tail
- New CLI messages match against new entries correctly

PERFORMANCE CHARACTERISTICS:
    Typical session: ~100-300 main entries, ~10-50 per subagent, ~500KB total

    On machine switch:
    - Disk read: ~10-50ms (SSD)
    - JSON parse: ~10-50ms
    - Total startup overhead: ~50-100ms (acceptable)

    Per-message matching:
    - Queue: ~200 old entries + new entries
    - Each old entry: UUID/signature mismatch check is O(1)
    - Total: ~200 microseconds per message (negligible)

WHEN THIS WOULD BE A PROBLEM (not typical):
- Sessions with 10,000+ entries
- Machine switches on every request
- Multi-MB content per entry

POTENTIAL OPTIMIZATION (not implemented, tracked for future):
If distributed resume becomes a bottleneck, we could add file position tracking:

    class ClaudeSDKClientWithRaw:
        def __init__(self, options, file_positions: dict[str, int] | None = None):
            # file_positions from a shared store (Redis, DB)
            # On resume, seek to the saved position instead of re-reading
            ...

For now, the ~50-100ms overhead is acceptable for production use.
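
The optimization above could look roughly like the following sketch, which is not
implemented in this module. The file is read in binary mode so the saved value is a
plain byte offset. Only complete lines advance the offset, so a partially written line
is re-read next time. read_new_entries and the dict-based store are hypothetical. The
store is keyed by file name so offsets survive a different home directory on another
machine; this assumes one store per session, because agent-<id>.jsonl names are only
unique within a session.

```python
import json
import tempfile
from pathlib import Path
from typing import Any

NEWLINE = 10  # byte value of the line-feed character


def read_new_entries(path: Path, positions: dict[str, int]) -> list[dict[str, Any]]:
    # Resume from the saved byte offset instead of re-reading from position 0.
    key = path.name
    offset = positions.get(key, 0)
    entries = []
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            line = f.readline()
            if not line or line[-1] != NEWLINE:
                break  # EOF or a partially written line: re-read it next time
            offset += len(line)
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                pass  # skip blank or malformed lines
    positions[key] = offset
    return entries


# Hypothetical shared store; a plain dict stands in for Redis or a DB here.
file_positions: dict[str, int] = {}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "abc-123.jsonl"
    with open(path, "w", encoding="utf-8") as f:
        print(json.dumps({"uuid": "A"}), file=f)
    first = read_new_entries(path, file_positions)   # e.g. Machine A's run
    with open(path, "a", encoding="utf-8") as f:
        print(json.dumps({"uuid": "B"}), file=f)
    second = read_new_entries(path, file_positions)  # resume: only the new entry
```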

9. TESTED VERSIONS
------------------

This patch was developed and tested with:
- Claude Code CLI: 2.1.5
- claude-agent-sdk (Python): 0.1.19

File format and linking behavior may differ in other versions.
"""

import asyncio
import hashlib
import json
import logging
import unicodedata
from pathlib import Path
from typing import AsyncIterator, Any, TextIO

from claude_agent_sdk import (
    ClaudeAgentOptions,
    Message,
    UserMessage,
    AssistantMessage,
    SystemMessage,
    ResultMessage,
    ClaudeSDKClient,
)
from claude_agent_sdk._internal.message_parser import parse_message
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessCLITransport

logger = logging.getLogger(__name__)

# Queue holds unmatched JSONL entries during correlation.
# Matched entries are popped immediately, so the queue stays small in normal operation.
# Only unmatched user/assistant entries (e.g., the initial prompt) accumulate;
# queue-operation and system entries are never queued.
# The 500 cap is a safety net: if exceeded, the oldest entries are dropped (FIFO eviction).
_MAX_QUEUE_SIZE = 500

# Retry settings for JSONL matching.
# CLI stdout may arrive before the JSONL is flushed to disk, especially under I/O load.
# 20 retries × 100ms = 2 second window to handle slow disks, network file systems, and high load.
#
# REVIEW NOTE: A 2 second window is tight under heavy I/O pressure (NFS, slow storage).
# Monitor for "Failed to match ... after 20 retries" in debug logs.
# If frequent, consider increasing _MATCH_RETRIES or _MATCH_RETRY_DELAY.
_MATCH_RETRIES = 20
_MATCH_RETRY_DELAY = 0.100


def _find_jsonl_path(session_id: str, cwd: str) -> Path:
    """Get path to the main session JSONL file."""
    normalized = cwd.replace("\\", "/")
    mangled = normalized.replace("/", "-")
    return Path.home() / ".claude" / "projects" / mangled / f"{session_id}.jsonl"


def _find_subagents_dir(session_id: str, cwd: str) -> Path:
    """Get path to the subagents directory for a session."""
    normalized = cwd.replace("\\", "/")
    mangled = normalized.replace("/", "-")
    return Path.home() / ".claude" / "projects" / mangled / session_id / "subagents"


def _get_task_tool_use_id(content: list) -> str | None:
    """Return the id of the first Task tool_use block in content, if any.

    REVIEW NOTE: Case-sensitive match on "Task". If the CLI changes the tool name
    to "task" or "TASK", detection will break. Monitor for CLI updates.
    """
    for block in content:
        if isinstance(block, dict) and block.get("type") == "tool_use" and block.get("name") == "Task":
            return block.get("id")
    return None


def _get_tool_result_ids(content: list) -> set[str]:
    """Get all tool_use_ids from tool_result blocks in content."""
    ids = set()
    for block in content:
        if isinstance(block, dict) and block.get("type") == "tool_result":
            tool_use_id = block.get("tool_use_id")
            if tool_use_id:
                ids.add(tool_use_id)
    return ids


def _stable_hash(s: str) -> str:
    """Short hash that is stable across processes (unlike built-in hash())."""
    return hashlib.sha256(s.encode('utf-8')).hexdigest()[:16]


def _content_signature_raw(content: Any) -> str | None:
    """Create a content signature for matching assistant messages.

    Text and thinking blocks contribute a SHA-256 hash of their first 200
    NFC-normalized characters; tool_use/tool_result blocks contribute their IDs directly.
    """
    if not isinstance(content, list):
        return None
    parts = []
    for block in content:
        if not isinstance(block, dict):
            continue
        btype = block.get("type")
        if btype == "text":
            text = block.get('text', '')
            if text:
                parts.append(f"text:{_stable_hash(unicodedata.normalize('NFC', str(text))[:200])}")
        elif btype == "tool_use":
            if block.get('id'):
                parts.append(f"tool_use:{block.get('id')}")
        elif btype == "thinking":
            thinking = block.get('thinking', '')
            if thinking:
                parts.append(f"thinking:{_stable_hash(unicodedata.normalize('NFC', str(thinking))[:200])}")
        elif btype == "tool_result":
            if block.get('tool_use_id'):
                parts.append(f"tool_result:{block.get('tool_use_id')}")
    return "|".join(parts) if parts else None


class _JsonlReader:
    """Reads and queues JSONL entries for correlation matching.

    REVIEW NOTE: Uses synchronous file I/O (open, readline) in an async context.
    For typical sessions (100-300 entries), this blocks the event loop for ~1-10ms.
    Acceptable for current usage. For high-throughput scenarios (10k+ entries),
    consider wrapping with anyio.to_thread.run_sync() or using aiofiles.

    Memory footprint: ~725KB for 30 subagent readers with empty queues.
    """

    def __init__(self, path: Path):
        self.path = path
        self.file: TextIO | None = None
        self.queue: list[dict[str, Any]] = []
        self._incomplete_line: str = ""

    def open(self) -> bool:
        try:
            # REVIEW NOTE: Sync I/O - blocks event loop briefly
            self.file = open(self.path, encoding='utf-8')
            logger.debug(f"Opened JSONL reader: {self.path}")
            return True
        except OSError as e:  # includes FileNotFoundError and PermissionError
            logger.debug(f"Failed to open JSONL: {self.path} ({e})")
            return False

    def close(self):
        if self.file:
            try:
                self.file.close()
            except Exception:
                pass
            self.file = None

    def read_pending(self):
        """Queue any complete user/assistant lines appended since the last call."""
        if not self.file:
            return
        try:
            while True:
                line = self.file.readline()
                if not line:
                    break
                if not line.endswith('\n'):
                    # Partial line (writer still flushing): buffer it, finish on a later call
                    self._incomplete_line += line
                    break
                if self._incomplete_line:
                    line = self._incomplete_line + line
                    self._incomplete_line = ""
                line = line.strip()
                if line:
                    try:
                        data = json.loads(line)
                        # Only queue types we can match (user/assistant).
                        # Skip queue-operation, system, etc. to avoid accumulation.
                        if isinstance(data, dict) and data.get("type") in ("user", "assistant"):
                            self.queue.append(data)
                    except json.JSONDecodeError:
                        pass
        except OSError:
            # Close the handle rather than just dropping the reference (avoids a leak)
            self.close()
        if len(self.queue) > _MAX_QUEUE_SIZE:
            keep = _MAX_QUEUE_SIZE // 2
            dropped = len(self.queue) - keep
            logger.warning(f"JSONL queue overflow, dropping {dropped} oldest entries")
            # FIFO eviction: keep only the newest `keep` entries
            self.queue = self.queue[-keep:]

    async def match_user(self, uuid: str | None) -> dict[str, Any] | None:
        """Pop the queued user entry with this uuid, polling the file between retries."""
        if not uuid:
            return None
        for _ in range(_MATCH_RETRIES):
            self.read_pending()
            for i, raw in enumerate(self.queue):
                if raw.get("type") == "user" and str(raw.get("uuid")) == str(uuid):
                    logger.debug(f"Matched user message uuid={uuid[:8]}...")
                    return self.queue.pop(i)
            await asyncio.sleep(_MATCH_RETRY_DELAY)
        logger.debug(f"Failed to match user message uuid={uuid[:8]}... after {_MATCH_RETRIES} retries")
        return None

    async def match_assistant(self, cli_content: list | None) -> dict[str, Any] | None:
        """Pop the queued assistant entry whose content signature matches cli_content."""
        if not cli_content:
            return None
        target_sig = _content_signature_raw(cli_content)
        if not target_sig:
            logger.debug("Cannot create signature for assistant content (empty or invalid)")
            return None
        for _ in range(_MATCH_RETRIES):
            self.read_pending()
            for i, raw in enumerate(self.queue):
                if raw.get("type") != "assistant":
                    continue
                jsonl_content = raw.get("message", {}).get("content", [])
                if _content_signature_raw(jsonl_content) == target_sig:
                    logger.debug(f"Matched assistant message sig={target_sig[:30]}...")
                    return self.queue.pop(i)
            await asyncio.sleep(_MATCH_RETRY_DELAY)
        logger.debug(f"Failed to match assistant message sig={target_sig[:30]}... after {_MATCH_RETRIES} retries")
        return None


def _attach_raw(msg: Message, raw: dict[str, Any] | None) -> Message:
    """Attach raw as msg.raw (best effort: skipped if the object rejects new attributes)."""
    try:
        msg.raw = raw  # type: ignore[attr-defined]
    except AttributeError:
        pass
    return msg


class ClaudeSDKClientWithRaw(ClaudeSDKClient):
    """
    Drop-in replacement for ClaudeSDKClient with a .raw field on messages.

    Supports subagent messages: when the Task tool spawns a subagent in foreground mode,
    subagent messages are correlated with their JSONL for full metadata (agentId,
    isSidechain, parentUuid, etc.).

    Usage:
        async with ClaudeSDKClientWithRaw(options) as client:
            await client.query("Hello")
            async for msg in client.receive_response():
                # Normal SDK fields
                if isinstance(msg, AssistantMessage):
                    print(msg.model)
                # Rich .raw data (works for both main and subagent messages)
                print(msg.raw.get("parentUuid"))
                print(msg.raw.get("timestamp"))
                print(msg.raw.get("agentId"))      # Set for subagent messages
                print(msg.raw.get("isSidechain"))  # True for subagent messages
    """

    def __init__(self, options: ClaudeAgentOptions | None = None):
        super().__init__(options)
        self._jsonl_reader: _JsonlReader | None = None
        self._subagent_readers: dict[str, _JsonlReader] = {}  # filename -> reader
        self._session_id: str | None = None
        self._cwd: str | None = None
        self._in_subagent: bool = False
        self._pending_task_ids: set[str] = set()  # Track active Task tool_use_ids

    async def _open_new_subagent_readers(self) -> None:
        """Open any new subagent JSONL files that haven't been opened yet."""
        if not self._session_id or not self._cwd:
            return
        subagents_dir = _find_subagents_dir(self._session_id, self._cwd)
        if not subagents_dir.exists():
            return
        # Find all subagent JSONL files
        for jsonl_path in subagents_dir.glob("agent-*.jsonl"):
            filename = jsonl_path.name
            if filename not in self._subagent_readers:
                reader = _JsonlReader(jsonl_path)
                if reader.open():
                    self._subagent_readers[filename] = reader
                    logger.debug(f"Opened subagent reader: {filename}")

    def _close_all_subagent_readers(self) -> None:
        """Close all subagent readers."""
        if self._subagent_readers:
            logger.debug(f"Closing {len(self._subagent_readers)} subagent reader(s)")
        for reader in self._subagent_readers.values():
            reader.close()
        self._subagent_readers.clear()

    async def _match_with_fallback(
        self, msg_type: str, raw_cli: dict[str, Any]
    ) -> dict[str, Any] | None:
        """Try the main reader; fall back to all subagent readers if in subagent mode."""
        jsonl_raw = None
        if msg_type == 'user':
            uuid = raw_cli.get('uuid')
            # Try main first
            if self._jsonl_reader:
                jsonl_raw = await self._jsonl_reader.match_user(uuid)
            # If in subagent mode and no match, try all subagent readers
            if not jsonl_raw and self._in_subagent:
                await self._open_new_subagent_readers()
                for reader in self._subagent_readers.values():
                    jsonl_raw = await reader.match_user(uuid)
                    if jsonl_raw:
                        break
        elif msg_type == 'assistant':
            cli_content = raw_cli.get('message', {}).get('content', [])
            # Try main first
            if self._jsonl_reader:
                jsonl_raw = await self._jsonl_reader.match_assistant(cli_content)
            # If in subagent mode and no match, try all subagent readers
            if not jsonl_raw and self._in_subagent:
                await self._open_new_subagent_readers()
                for reader in self._subagent_readers.values():
                    jsonl_raw = await reader.match_assistant(cli_content)
                    if jsonl_raw:
                        break
        return jsonl_raw

    async def receive_messages(self) -> AsyncIterator[Message]:
        """Receive messages with .raw attached (including subagent messages)."""
        if not self._query:
            from claude_agent_sdk._errors import CLIConnectionError
            raise CLIConnectionError("Not connected. Call connect() first.")

        async for raw_cli in self._query.receive_messages():
            msg_type = raw_cli.get('type')

            # Parse to SDK type
            try:
                sdk_msg = parse_message(raw_cli)
            except Exception:
                continue

            # Get session info from system init
            if msg_type == 'system' and raw_cli.get('subtype') == 'init':
                self._session_id = raw_cli.get('session_id')
                self._cwd = raw_cli.get('cwd')
                if self._session_id and self._cwd:
                    jsonl_path = _find_jsonl_path(self._session_id, self._cwd)
                    current = self._jsonl_reader
                    # If init arrives again for the same open file, keep the existing
                    # reader (and its file position); otherwise swap in a new reader.
                    if not (current and current.file and current.path == jsonl_path):
                        if current:
                            current.close()
                            self._jsonl_reader = None
                        # Wait up to 2.5s (50 × 50ms) for the CLI to create the file
                        for _ in range(50):
                            if jsonl_path.exists():
                                break
                            await asyncio.sleep(0.05)
                        reader = _JsonlReader(jsonl_path)
                        # Keep the reader only if the file opened; otherwise every match
                        # would wait through all retries before falling back to raw_cli.
                        if reader.open():
                            self._jsonl_reader = reader
                        else:
                            logger.warning(f"JSONL not available, .raw falls back to CLI output: {jsonl_path}")
                _attach_raw(sdk_msg, raw_cli)

            elif msg_type == 'result':
                # Result: use CLI output (has modelUsage, errors)
                _attach_raw(sdk_msg, raw_cli)

            elif msg_type == 'assistant':
                cli_content = raw_cli.get('message', {}).get('content', [])
                # Check if this assistant message spawns a Task (enter subagent mode)
                task_id = _get_task_tool_use_id(cli_content)
                if task_id:
                    self._pending_task_ids.add(task_id)
                    self._in_subagent = True
                    logger.debug(f"Entering subagent mode, task_id={task_id[:12]}...")
                jsonl_raw = await self._match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)

            elif msg_type == 'user':
                cli_content = raw_cli.get('message', {}).get('content', [])
                jsonl_raw = await self._match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)

                # Track Task completion for subagent mode state.
                # NOTE: We intentionally do NOT close subagent readers on task completion.
                # Reason: Subagents support "resume" - the main agent can ask follow-up
                # questions to a completed subagent. When resumed, new entries are appended
                # to the same agent-<id>.jsonl file. If we closed the reader, we'd lose
                # the file position and re-read old entries on reopen.
                #
                # Cost of keeping readers open:
                # - File handles: ~30 for a long session (cheap, ulimit typically 1024+)
                # - Memory: completed subagents have empty queues (O(1) per reader)
                # - Scan time: empty queues short-circuit quickly
                #
                # All readers are closed on session disconnect.
                if isinstance(cli_content, list):
                    result_ids = _get_tool_result_ids(cli_content)
                    completed = self._pending_task_ids & result_ids
                    if completed:
                        self._pending_task_ids -= completed
                        logger.debug(f"Task completed: {completed}")
                        if not self._pending_task_ids:
                            logger.debug("Exiting subagent mode (readers kept open for resume)")
                            self._in_subagent = False
                            # Don't close readers here - subagents may be resumed later

            else:
                _attach_raw(sdk_msg, raw_cli)

            yield sdk_msg

    async def disconnect(self) -> None:
        """Disconnect and clean up."""
        if self._jsonl_reader:
            self._jsonl_reader.close()
            self._jsonl_reader = None
        self._close_all_subagent_readers()
        await super().disconnect()


# Also provide query_with_raw for simple one-shot usage
#
# REVIEW NOTE: Code duplication between ClaudeSDKClientWithRaw and query_with_raw.
# Both implement similar logic for:
# - _match_with_fallback / match_with_fallback
# - _open_new_subagent_readers / open_new_subagent_readers
# - Task tool_use tracking and subagent state machine
#
# This duplication means bug fixes must be applied twice. Future refactoring could
# extract common logic to a shared _RawCorrelator helper class. Tracked for later.
#
async def query_with_raw(
    prompt: str,
    options: ClaudeAgentOptions | None = None,
) -> AsyncIterator[Message]:
    """
    One-shot query with .raw attached to messages.

    Drop-in replacement for claude_agent_sdk.query() for string prompts.
    Supports subagent messages with full metadata.
    """
    options = options or ClaudeAgentOptions()
    transport = SubprocessCLITransport(prompt, options)
    await transport.connect()

    session_id: str | None = None
    cwd: str | None = None
    reader: _JsonlReader | None = None
    subagent_readers: dict[str, _JsonlReader] = {}
    in_subagent: bool = False
    pending_task_ids: set[str] = set()

    async def open_new_subagent_readers() -> None:
        """Open any new subagent JSONL files that haven't been opened yet."""
        nonlocal subagent_readers
        if not session_id or not cwd:
            return
        subagents_dir = _find_subagents_dir(session_id, cwd)
        if not subagents_dir.exists():
            return
        for jsonl_path in subagents_dir.glob("agent-*.jsonl"):
            filename = jsonl_path.name
            if filename not in subagent_readers:
                new_reader = _JsonlReader(jsonl_path)
                if new_reader.open():
                    subagent_readers[filename] = new_reader
                    logger.debug(f"Opened subagent reader: {filename}")

    def close_all_subagent_readers() -> None:
        """Close all subagent readers."""
        nonlocal subagent_readers
        if subagent_readers:
            logger.debug(f"Closing {len(subagent_readers)} subagent reader(s)")
        for r in subagent_readers.values():
            r.close()
        subagent_readers = {}

    async def match_with_fallback(msg_type: str, raw_cli: dict) -> dict | None:
        """Try the main reader; fall back to all subagent readers if in subagent mode."""
        jsonl_raw = None
        if msg_type == 'user':
            uuid = raw_cli.get('uuid')
            if reader:
                jsonl_raw = await reader.match_user(uuid)
            if not jsonl_raw and in_subagent:
                await open_new_subagent_readers()
                for r in subagent_readers.values():
                    jsonl_raw = await r.match_user(uuid)
                    if jsonl_raw:
                        break
        elif msg_type == 'assistant':
            cli_content = raw_cli.get('message', {}).get('content', [])
            if reader:
                jsonl_raw = await reader.match_assistant(cli_content)
            if not jsonl_raw and in_subagent:
                await open_new_subagent_readers()
                for r in subagent_readers.values():
                    jsonl_raw = await r.match_assistant(cli_content)
                    if jsonl_raw:
                        break
        return jsonl_raw

    try:
        async for raw_cli in transport.read_messages():
            msg_type = raw_cli.get('type')
            try:
                sdk_msg = parse_message(raw_cli)
            except Exception:
                continue

            if msg_type == 'system' and raw_cli.get('subtype') == 'init':
                session_id = raw_cli.get('session_id')
                cwd = raw_cli.get('cwd')
                if session_id and cwd:
                    jsonl_path = _find_jsonl_path(session_id, cwd)
                    # Wait up to 2.5s (50 × 50ms) for the CLI to create the file
                    for _ in range(50):
                        if jsonl_path.exists():
                            break
                        await asyncio.sleep(0.05)
                    main_reader = _JsonlReader(jsonl_path)
                    # Keep the reader only if the file opened; otherwise every match
                    # would wait through all retries before falling back to raw_cli.
                    if main_reader.open():
                        reader = main_reader
                    else:
                        logger.warning(f"JSONL not available, .raw falls back to CLI output: {jsonl_path}")
                _attach_raw(sdk_msg, raw_cli)

            elif msg_type == 'result':
                _attach_raw(sdk_msg, raw_cli)

            elif msg_type == 'assistant':
                cli_content = raw_cli.get('message', {}).get('content', [])
                # Check if this assistant message spawns a Task
                task_id = _get_task_tool_use_id(cli_content)
                if task_id:
                    pending_task_ids.add(task_id)
                    in_subagent = True
                    logger.debug(f"Entering subagent mode, task_id={task_id[:12]}...")
                jsonl_raw = await match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)

            elif msg_type == 'user':
                cli_content = raw_cli.get('message', {}).get('content', [])
                jsonl_raw = await match_with_fallback(msg_type, raw_cli)
                _attach_raw(sdk_msg, jsonl_raw if jsonl_raw else raw_cli)

                # Track Task completion for subagent mode state.
                # NOTE: We do NOT close subagent readers on task completion.
                # Subagents support "resume" - if closed, we'd lose the file position
                # and re-read old entries. Keeping readers open is cheap:
                # - File handles: ~30 (ulimit typically 1024+)
                # - Memory: completed subagents have empty queues
                # All readers are closed in the finally block on session end.
                if isinstance(cli_content, list):
                    result_ids = _get_tool_result_ids(cli_content)
                    completed = pending_task_ids & result_ids
                    if completed:
                        pending_task_ids -= completed
                        logger.debug(f"Task completed: {completed}")
                        if not pending_task_ids:
                            logger.debug("Exiting subagent mode (readers kept open for resume)")
                            in_subagent = False
                            # Don't close readers - subagents may be resumed

            else:
                _attach_raw(sdk_msg, raw_cli)

            yield sdk_msg
    finally:
        if reader:
            reader.close()
        close_all_subagent_readers()
        await transport.close()


__all__ = [
    "ClaudeSDKClientWithRaw",
    "query_with_raw",
    "ClaudeAgentOptions",
    "Message",
    "UserMessage",
    "AssistantMessage",
    "SystemMessage",
    "ResultMessage",
]