@ben-vargas
Created September 19, 2025 04:05
Tool Streaming Support for AI SDK Provider Codex CLI

Issue Summary

  • GitHub Issue: #2 - Streaming tool calls?
  • Requested by: @hbmartin
  • Date: January 2025

The user is requesting support for streaming tool calls in streamText and streamObject, similar to the functionality planned for the Claude Code provider. This would enable building UIs that show tool calls and results as they happen.

Important Note on JSON Mode

Codex CLI's --json flag fundamentally changes the output behavior:

  • In JSON mode, AgentMessageDelta events are suppressed - no streaming text
  • The output begins with non-JSON config summary and prompt lines from EventProcessorWithJsonOutput
  • Tool events are still emitted as JSON events
  • The doGenerate method cannot access streaming events (only final task_complete)

Current State Analysis

What Currently Works

The provider successfully implements streaming for:

  • Session configuration (session_configured)
  • Task completion (task_complete)
  • Response metadata and session information
  • Final text output (non-streaming, sent all at once)

What's Missing

Tool-related streaming events are not captured or emitted, even though Codex CLI is emitting them. The user only sees:

  • stream-start
  • response-metadata (when session configured)
  • text-delta (final text only)
  • finish

Missing events that should be emitted:

  • tool-input-start
  • tool-input-delta
  • tool-input-end
  • tool-call
  • tool-result

Technical Investigation

Codex CLI Event System Capabilities

The Codex CLI provides comprehensive event streaming through its EventMsg enum. Important: Events are serialized with snake_case names due to #[strum(serialize_all = "snake_case")]:

// Event names in JSON will be: mcp_tool_call_begin, exec_command_begin, etc.
pub enum EventMsg {
    // Tool-related events
    McpToolCallBegin(McpToolCallBeginEvent),
    McpToolCallEnd(McpToolCallEndEvent),

    ExecCommandBegin(ExecCommandBeginEvent),
    ExecCommandOutputDelta(ExecCommandOutputDeltaEvent),
    ExecCommandEnd(ExecCommandEndEvent),

    WebSearchBegin(WebSearchBeginEvent),
    WebSearchEnd(WebSearchEndEvent),

    PatchApplyBegin(PatchApplyBeginEvent),
    PatchApplyEnd(PatchApplyEndEvent),

    ExecApprovalRequest(ExecApprovalRequestEvent),
    ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent),
    // ... and more
}

Each event has specific payload structures:

  • All events include call_id: Required for correlating begin/end events (never generate random IDs!)
  • McpToolCallBegin: Contains invocation: {server, tool, arguments}
  • ExecCommandBegin: Contains command, cwd, parsed_cmd
  • WebSearchBegin: Only has call_id (query comes in WebSearchEnd)
  • PatchApplyBegin: Contains auto_approved, changes (not file_changes)
  • ExecCommandOutputDelta: Contains Base64-encoded chunk and stream enum

Current Provider Implementation Gaps

Looking at src/codex-cli-language-model.ts:

// Current implementation only captures 2 events:
child.stdout.on('data', (chunk: string) => {
  const lines = chunk.split(/\r?\n/).filter(Boolean);
  for (const line of lines) {
    const evt = this.parseJsonLine(line);
    if (!evt) continue;
    const msg = evt.msg;
    const type = msg?.type;
    if (type === 'session_configured' && msg) {
      this.sessionId = msg.session_id;
    } else if (type === 'task_complete' && msg) {
      const last = msg.last_agent_message;
      if (typeof last === 'string') text = last;
    }
    // ALL OTHER EVENTS ARE IGNORED!
  }
});
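
One thing to watch in the handler above: a `data` chunk is not guaranteed to end on a newline, so splitting each chunk independently can cut a JSON event in half, and both halves then fail to parse. A minimal sketch of a line buffer that avoids this is shown here; `createLineBuffer` is a hypothetical helper, not something the provider currently contains.

```typescript
// Hypothetical helper: accumulates raw chunks and returns only complete lines.
function createLineBuffer() {
  let pending = '';
  return {
    // Returns every complete, non-empty line available after adding this chunk.
    push(chunk: string): string[] {
      pending += chunk;
      const parts = pending.split(/\r?\n/);
      // The last element is an unfinished line (or '' if the chunk ended on a newline).
      pending = parts.pop() ?? '';
      return parts.filter(Boolean);
    },
    // Call when the stream closes to recover a final line with no trailing newline.
    flush(): string[] {
      const rest = pending;
      pending = '';
      return rest ? [rest] : [];
    },
  };
}
```

Inside the handler, `for (const line of buffer.push(chunk))` would take the place of the direct `chunk.split(...)` call, with `buffer.flush()` processed once when stdout closes.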

AI SDK v5 Requirements

The AI SDK expects these LanguageModelV2StreamPart types for tools:

type ToolStreamParts =
  | { type: 'tool-input-start'; id: string; toolName: string; }
  | { type: 'tool-input-delta'; id: string; delta: string; }
  | { type: 'tool-input-end'; id: string; }
  | { type: 'tool-call'; toolCallId: string; toolName: string; input: string; providerExecuted?: boolean; }
  | { type: 'tool-result'; toolCallId: string; result: unknown; }

Critical Implementation Considerations

Provider-Executed Tools

IMPORTANT: Codex CLI's tools (exec, patch, web search, MCP tools) are executed by Codex CLI itself, NOT by the AI SDK's tool runner. We MUST set providerExecuted: true on all tool-call events to prevent the AI SDK from attempting to re-execute these tools. This is identical to the Claude Code provider's requirement.
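
To make the requirement concrete, the expansion of one Codex begin event into the four tool stream parts can be written as a pure function. This is a sketch under stated assumptions: `toolCallParts` is a hypothetical helper, and `ToolStreamPart` is a local copy of the shapes listed in the previous section rather than an import from the SDK.

```typescript
// Local copies of the stream part shapes listed above (not imported from the SDK).
type ToolStreamPart =
  | { type: 'tool-input-start'; id: string; toolName: string }
  | { type: 'tool-input-delta'; id: string; delta: string }
  | { type: 'tool-input-end'; id: string }
  | { type: 'tool-call'; toolCallId: string; toolName: string; input: string; providerExecuted?: boolean };

// Hypothetical helper: expands one begin event into the ordered parts to enqueue.
// callId must be the call_id supplied by Codex; it is never generated here.
function toolCallParts(callId: string, toolName: string, input: unknown): ToolStreamPart[] {
  const inputString = JSON.stringify(input ?? {});
  return [
    { type: 'tool-input-start', id: callId, toolName },
    // Codex delivers the whole input at once, so there is a single delta.
    { type: 'tool-input-delta', id: callId, delta: inputString },
    { type: 'tool-input-end', id: callId },
    // providerExecuted tells the AI SDK that Codex already ran the tool.
    { type: 'tool-call', toolCallId: callId, toolName, input: inputString, providerExecuted: true },
  ];
}
```

Keeping this mapping in one place means the flag cannot be forgotten on one code path (for example, the deferred web search case) while being set on another.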

Tool Type Mapping

Codex CLI has different tool types than Claude Code:

| Codex Event Type (JSON) | Tool Name | Description |
|------|------|------|
| mcp_tool_call_begin/end | invocation.tool | MCP server tools |
| exec_command_begin/end | "exec" | Command execution |
| web_search_begin/end | "web_search" | Web search |
| patch_apply_begin/end | "patch" | Code modifications |
| exec_approval_request | "exec_approval" | Approval requests |
| apply_patch_approval_request | "patch_approval" | Patch approval |

Implementation Plan

Phase 1: Core Tool Streaming Support

File: src/codex-cli-language-model.ts

Step 1: Add Event Type Definitions and Preamble Handler

// Add near line 29
interface CodexExecCommandBeginEvent {
  call_id: string;
  command: string[];
  cwd: string;
  parsed_cmd: any[];
}

interface CodexExecCommandOutputDeltaEvent {
  call_id: string;
  stream: 'stdout' | 'stderr';
  chunk: string; // Base64-encoded bytes
}

interface CodexPatchApplyBeginEvent {
  call_id: string;
  auto_approved: boolean;
  changes: Record<string, unknown>; // Note: 'changes' not 'file_changes'
}

interface CodexMcpToolCallBeginEvent {
  call_id: string;
  invocation: {
    server: string;
    tool: string;
    arguments?: unknown;
  };
}

interface CodexWebSearchEndEvent {
  call_id: string;
  query: string;
}

// Helper to skip JSON preamble lines
private isPreambleLine(line: string): boolean {
  // Config summary and prompt lines from EventProcessorWithJsonOutput
  try {
    const parsed = JSON.parse(line);
    return !parsed.id || !parsed.msg; // Not an event if missing id/msg
  } catch {
    return true; // Non-JSON lines are preamble
  }
}

Step 2: Add Tool Event Detection Helper

// Add around line 160
private extractToolName(eventType: string, event: any): string {
  switch(eventType) {
    case 'mcp_tool_call_begin':
      return event.invocation?.tool || 'mcp_tool';
    case 'exec_command_begin':
      return 'exec';
    case 'web_search_begin':
    case 'web_search_end':
      return 'web_search';
    case 'patch_apply_begin':
      return 'patch';
    default:
      return eventType.replace(/_begin$|_end$/, '').replace(/_/g, '-');
  }
}

private isToolBeginEvent(type: string): boolean {
  return ['mcp_tool_call_begin', 'exec_command_begin', 'web_search_begin', 'patch_apply_begin'].includes(type);
}

private isToolEndEvent(type: string): boolean {
  return ['mcp_tool_call_end', 'exec_command_end', 'web_search_end', 'patch_apply_end'].includes(type);
}

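private decodeBase64Chunk(chunk: string): string {
  try {
    // Buffer.from tolerates malformed Base64 rather than throwing,
    // so bad input yields partial or empty output, not an exception.
    return Buffer.from(chunk, 'base64').toString('utf-8');
  } catch {
    return ''; // Reached only when chunk is missing or not a string
  }
}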
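
A caveat for `decodeBase64Chunk`: decoding each chunk to UTF-8 on its own can corrupt a multi-byte character that happens to straddle two chunks. The sketch below keeps one streaming `TextDecoder` per call and output stream so that incomplete byte sequences carry over. `decodeOutputChunk` and `finishOutput` are hypothetical helpers, and whether Codex actually splits output mid-character is an assumption that has not been verified here.

```typescript
// Hypothetical helpers: one streaming decoder per (call_id, stream) pair.
const decoders = new Map<string, TextDecoder>();

function decodeOutputChunk(callId: string, stream: string, base64Chunk: string): string {
  const key = `${callId}:${stream}`;
  let decoder = decoders.get(key);
  if (!decoder) {
    decoder = new TextDecoder('utf-8');
    decoders.set(key, decoder);
  }
  // { stream: true } holds back an incomplete trailing byte sequence
  // until the next chunk for the same key arrives.
  return decoder.decode(Buffer.from(base64Chunk, 'base64'), { stream: true });
}

// Call when the matching exec_command_end arrives: returns any text still
// buffered in the decoder and releases it.
function finishOutput(callId: string, stream: string): string {
  const key = `${callId}:${stream}`;
  const decoder = decoders.get(key);
  decoders.delete(key);
  return decoder ? decoder.decode() : '';
}
```

The per-call map also bounds memory use, since each decoder is dropped as soon as its command finishes.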

Step 3: Modify Stream Processing (doStream method, lines 343-365)

// Replace the current stdout.on('data') handler with:
const pendingWebSearches = new Map<string, any>(); // Track web searches waiting for query

child.stdout.on('data', (chunk: string) => {
  const lines = chunk.split(/\r?\n/).filter(Boolean);
  for (const line of lines) {
    // Skip preamble lines (config summary, prompt)
    if (this.isPreambleLine(line)) continue;

    const evt = this.parseJsonLine(line);
    if (!evt || !evt.msg) continue;
    const msg = evt.msg;
    const type = msg?.type;

    if (type === 'session_configured' && msg) {
      this.sessionId = msg.session_id;
      controller.enqueue({
        type: 'response-metadata',
        id: randomUUID(),
        timestamp: new Date(),
        modelId: this.modelId,
      });
    }
    else if (this.isToolBeginEvent(type) && msg) {
      const toolName = this.extractToolName(type, msg);
      const toolId = msg.call_id; // ALWAYS use call_id from Codex

      if (!toolId) {
        console.warn(`Missing call_id for ${type} event`);
        continue;
      }

      // Special handling for web_search_begin (no query yet)
      if (type === 'web_search_begin') {
        pendingWebSearches.set(toolId, { toolName });
        // Don't emit events yet, wait for web_search_end with query
        continue;
      }

      // Emit tool-input-start
      controller.enqueue({
        type: 'tool-input-start',
        id: toolId,
        toolName: toolName,
      });

      // Prepare input data based on event type
      let inputData: any = {};
      if (type === 'exec_command_begin') {
        inputData = {
          command: msg.command,
          cwd: msg.cwd,
          parsed_cmd: msg.parsed_cmd
        };
      } else if (type === 'patch_apply_begin') {
        inputData = {
          changes: msg.changes, // Note: 'changes' not 'file_changes'
          auto_approved: msg.auto_approved
        };
      } else if (type === 'mcp_tool_call_begin') {
        inputData = {
          server: msg.invocation?.server,
          tool: msg.invocation?.tool,
          arguments: msg.invocation?.arguments
        };
      }

      const inputString = JSON.stringify(inputData);

      // Emit tool-input-delta
      controller.enqueue({
        type: 'tool-input-delta',
        id: toolId,
        delta: inputString,
      });

      // Emit tool-input-end
      controller.enqueue({
        type: 'tool-input-end',
        id: toolId,
      });

      // Emit tool-call with providerExecuted flag
      controller.enqueue({
        type: 'tool-call',
        toolCallId: toolId,
        toolName: toolName,
        input: inputString,
        providerExecuted: true, // CRITICAL: Codex executes its own tools
      });
    }
    else if (type === 'exec_command_output_delta' && msg) {
      const toolId = msg.call_id;
      if (!toolId) continue;

      // Decode Base64 chunk
      const decodedOutput = this.decodeBase64Chunk(msg.chunk);

      // Stream command output as partial results
      controller.enqueue({
        type: 'tool-result',
        toolCallId: toolId,
        result: {
          type: 'output-delta',
          stream: msg.stream, // 'stdout' or 'stderr'
          output: decodedOutput
        },
        providerMetadata: {
          'codex-cli': {
            isPartial: true,
            stream: msg.stream
          }
        }
      });
    }
    else if (type === 'web_search_end' && msg) {
      const toolId = msg.call_id;
      const pending = pendingWebSearches.get(toolId);

      if (pending) {
        // Now we have the query, emit all events
        controller.enqueue({
          type: 'tool-input-start',
          id: toolId,
          toolName: 'web_search',
        });

        const inputString = JSON.stringify({ query: msg.query });

        controller.enqueue({
          type: 'tool-input-delta',
          id: toolId,
          delta: inputString,
        });

        controller.enqueue({
          type: 'tool-input-end',
          id: toolId,
        });

        controller.enqueue({
          type: 'tool-call',
          toolCallId: toolId,
          toolName: 'web_search',
          input: inputString,
          providerExecuted: true,
        });

        pendingWebSearches.delete(toolId);
      }

      // Also emit the result
      controller.enqueue({
        type: 'tool-result',
        toolCallId: toolId,
        result: { query: msg.query, success: true },
      });
    }
    else if (this.isToolEndEvent(type) && msg && type !== 'web_search_end') {
      const toolId = msg.call_id;
      if (!toolId) continue;

      // Handle different result formats
      let result: any = { success: true };

      if (type === 'exec_command_end') {
        result = {
          stdout: msg.stdout,
          stderr: msg.stderr,
          aggregated_output: msg.aggregated_output,
          exit_code: msg.exit_code
        };
      } else if (type === 'patch_apply_end') {
        result = {
          stdout: msg.stdout,
          stderr: msg.stderr,
          success: msg.success
        };
      } else if (type === 'mcp_tool_call_end' && msg.result) {
        // Handle Result<CallToolResult, String> from Rust
        if (msg.result.Ok) {
          result = msg.result.Ok;
        } else if (msg.result.Err) {
          result = { error: msg.result.Err, success: false };
        }
      }

      controller.enqueue({
        type: 'tool-result',
        toolCallId: toolId,
        result: result,
        providerMetadata: {
          'codex-cli': {
            eventType: type
          }
        }
      });
    }
    else if (type === 'exec_approval_request' || type === 'apply_patch_approval_request') {
      // Handle approval requests as metadata, not tool calls
      // These share call_id with the subsequent exec_command_begin/patch_apply_begin
      const toolId = msg.call_id;
      if (!toolId) continue;

      // Emit as response-metadata to avoid call_id collision
      controller.enqueue({
        type: 'response-metadata',
        id: randomUUID(),
        timestamp: new Date(),
        modelId: this.modelId,
        providerMetadata: {
          'codex-cli': {
            approvalRequest: {
              type: type,
              call_id: toolId,
              details: msg
            }
          }
        }
      });
    }
    else if (type === 'agent_message_delta' && msg) {
      // Note: Suppressed in JSON mode
      const text = msg.content || msg.text || '';
      if (text) {
        if (!textPartId) {
          textPartId = randomUUID();
          controller.enqueue({ type: 'text-start', id: textPartId });
        }
        controller.enqueue({
          type: 'text-delta',
          id: textPartId,
          delta: text
        });
      }
    }
    else if (type === 'task_complete' && msg) {
      const last = msg.last_agent_message;
      if (typeof last === 'string') {
        accumulatedText = last;
      }
    }
  }
});

Step 4: Update doGenerate Method

Important limitation: doGenerate does not surface intermediate tool events. As noted above, it only sees the final task_complete event when running exec --json, so tool information would need to be extracted from the final output or response metadata if available. Options to consider:

// In doGenerate method
// Tool events are NOT available in non-streaming mode
// Only the final task_complete message is available
// Consider documenting this limitation or always using streaming internally
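
If the "always use streaming internally" option is chosen, doGenerate could run the same event-to-part mapping as doStream and fold the resulting parts into a single result. The following is a rough sketch of that fold only; `StreamPart` is a simplified local type, `collectParts` is a hypothetical helper, and the shape of the returned object is illustrative rather than the SDK's actual doGenerate return type.

```typescript
// Simplified local part type; the real stream carries more part kinds.
type StreamPart =
  | { type: 'text-delta'; id: string; delta: string }
  | { type: 'tool-call'; toolCallId: string; toolName: string; input: string; providerExecuted?: boolean }
  | { type: 'tool-result'; toolCallId: string; result: unknown }
  | { type: 'finish' };

interface CollectedOutput {
  text: string;
  toolCalls: Array<{ toolCallId: string; toolName: string; input: string }>;
  toolResults: Array<{ toolCallId: string; result: unknown }>;
}

// Hypothetical helper: folds already-produced stream parts into one summary.
// Partial results (such as output deltas) are kept as separate entries.
function collectParts(parts: readonly StreamPart[]): CollectedOutput {
  const out: CollectedOutput = { text: '', toolCalls: [], toolResults: [] };
  for (const part of parts) {
    switch (part.type) {
      case 'text-delta':
        out.text += part.delta;
        break;
      case 'tool-call':
        out.toolCalls.push({ toolCallId: part.toolCallId, toolName: part.toolName, input: part.input });
        break;
      case 'tool-result':
        out.toolResults.push({ toolCallId: part.toolCallId, result: part.result });
        break;
      default:
        break; // parts with no content contribution are skipped
    }
  }
  return out;
}
```

This keeps a single parsing path for both methods, at the cost of doGenerate doing slightly more work than reading only task_complete.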

Phase 2: Enhanced Features

Command Output Streaming

Command output streaming is already implemented above with proper Base64 decoding:

  • exec_command_output_delta events contain Base64-encoded chunks
  • The stream field indicates whether it's stdout or stderr
  • Decoded output is sent as partial tool-result events

Error Handling for Tool Failures

else if (this.isToolEndEvent(type) && msg && msg.error) {
  controller.enqueue({
    type: 'tool-result',
    toolCallId: msg.call_id,
    result: {
      type: 'error',
      error: msg.error,
      message: msg.error_message || 'Tool execution failed'
    },
    providerMetadata: {
      'codex-cli': {
        isError: true,
        eventType: type
      }
    }
  });
}
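
The snippet above keys off `msg.error`, but the end events described earlier in this plan report failure in different places: `exit_code` for exec, `success` for patch, and `result.Err` for MCP calls. A small sketch of a single failure check over those fields is shown here. `toolEndFailed` is a hypothetical helper, and treating a non-zero exit code as a failure is an assumption based on the usual shell convention.

```typescript
// Hypothetical helper: decides whether a tool end event represents a failure,
// using only the fields this document attributes to each end event.
function toolEndFailed(type: string, msg: Record<string, unknown>): boolean {
  switch (type) {
    case 'exec_command_end':
      // Assumption: non-zero exit code means the command failed.
      return typeof msg.exit_code === 'number' && msg.exit_code !== 0;
    case 'patch_apply_end':
      return msg.success === false;
    case 'mcp_tool_call_end': {
      // Rust Result<_, String> is serialized as { Ok: ... } or { Err: ... }.
      const result = msg.result;
      return typeof result === 'object' && result !== null && 'Err' in result;
    }
    default:
      return false;
  }
}
```

The boolean can then drive both the `isError` metadata flag and the choice of result shape, instead of duplicating per-type checks in each branch.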

Phase 3: Additional Considerations

Handling Multiple Begin/End Pairs

Retry loops may emit multiple begin/end pairs for the same logical action:

// Track retry attempts
const retryCounters = new Map<string, number>();

if (this.isToolBeginEvent(type)) {
  const count = retryCounters.get(toolId) || 0;
  retryCounters.set(toolId, count + 1);
  // Include retry count in metadata
}
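
The counter above can be wrapped in a small function that returns the attempt number for inclusion in `providerMetadata`. This sketch assumes, as the snippet does, that retries reuse the same call_id; `recordAttempt` is a hypothetical name.

```typescript
// Attempts seen so far, keyed by the call_id that Codex supplies.
const attemptsByCallId = new Map<string, number>();

// Hypothetical helper: registers one begin event and reports which attempt it is.
function recordAttempt(callId: string): { attempt: number; isRetry: boolean } {
  const attempt = (attemptsByCallId.get(callId) ?? 0) + 1;
  attemptsByCallId.set(callId, attempt);
  return { attempt, isRetry: attempt > 1 };
}
```

A consumer could use `isRetry` to update an existing UI row rather than render a second one for the same logical action.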

JSON Preamble Handling

The first lines of output are non-JSON config summary and prompt from EventProcessorWithJsonOutput:

// Example preamble lines (not valid events):
// {"model":"gpt-5-codex","temperature":"0.5"}
// {"prompt":"List files in current directory"}
// Followed by actual events with id and msg fields

Phase 4: Testing & Validation

Test Script 1: Basic Tool Streaming

// examples/streaming-with-tools.ts
import { streamText } from 'ai';
import { codexCli } from '../dist/index.js';

async function main() {
  const result = streamText({
    model: codexCli('gpt-5-codex'),
    prompt: 'List the files in the current directory and show their sizes',
  });

  const toolCalls: any[] = [];

  for await (const part of result.fullStream) {
    switch(part.type) {
      case 'tool-input-start':
        console.log(`🔧 Starting tool: ${part.toolName}`);
        break;
      case 'tool-input-delta':
        console.log(`   Input: ${part.delta}`);
        break;
      case 'tool-call':
        console.log(`✅ Tool called: ${part.toolName} (${part.toolCallId})`);
        toolCalls.push({ name: part.toolName, id: part.toolCallId });
        break;
      case 'tool-result':
        console.log(`📊 Tool result for ${part.toolCallId}:`,
          JSON.stringify(part.result).substring(0, 100));
        break;
      case 'text-delta':
        process.stdout.write(part.delta);
        break;
    }
  }

  console.log('\n\nTool calls made:', toolCalls);
}

main().catch(console.error);

Test Script 2: Multiple Tool Types

// examples/streaming-multiple-tool-types.ts
import { streamText } from 'ai';
import { codexCli } from '../dist/index.js';

async function testMultipleTools() {
  const result = streamText({
    model: codexCli('gpt-5-codex'),
    prompt: 'Search for information about TypeScript, then create a hello.ts file with a simple example',
  });

  const toolsByType: Record<string, number> = {};

  for await (const part of result.fullStream) {
    if (part.type === 'tool-call') {
      toolsByType[part.toolName] = (toolsByType[part.toolName] || 0) + 1;
    }
  }

  console.log('\nTool usage summary:', toolsByType);
  // Expected: { web_search: 1, patch: 1 }
}

testMultipleTools().catch(console.error);

Test Script 3: Command Execution with Output

// examples/streaming-exec-output.ts
import { streamText } from 'ai';
import { codexCli } from '../dist/index.js';

async function testCommandOutput() {
  const result = streamText({
    model: codexCli('gpt-5-codex'),
    prompt: 'Run npm test and show me the results',
  });

  let currentToolCall: string | undefined;

  for await (const part of result.fullStream) {
    if (part.type === 'tool-call' && part.toolName === 'exec') {
      currentToolCall = part.toolCallId;
      console.log('📟 Executing command...');
    }
    if (part.type === 'tool-result' && part.toolCallId === currentToolCall) {
      // Stream command output - note the correct structure
      if (part.result?.type === 'output-delta') {
        // Output is in part.result.output, not stdout/stderr
        const output = part.result.output || '';
        const stream = part.result.stream; // 'stdout' or 'stderr'

        if (stream === 'stderr') {
          process.stderr.write(output);
        } else {
          process.stdout.write(output);
        }
      }
    }
  }
}

testCommandOutput().catch(console.error);

Phase 4: Edge Cases & Error Handling

  1. Handle missing call_id: Log warning and skip the malformed event (never generate fake IDs)
  2. Handle tool execution errors: Properly emit error results
  3. Handle interrupted streams: Clean up partial tool calls
  4. Handle unknown tool types: Gracefully ignore or log
  5. Prevent double execution: MUST set providerExecuted: true on all tool calls
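
For item 3 (interrupted streams), one possible approach is to track which call_ids have begun but not ended, then close them out when the stream finishes. The sketch below is illustrative only: `OpenToolCalls` is a hypothetical class, and emitting a synthetic error result (rather than just logging a warning) is a design choice this plan has not settled.

```typescript
type ErrorResultPart = {
  type: 'tool-result';
  toolCallId: string;
  result: { type: 'error'; message: string };
};

// Hypothetical tracker for tool calls that have begun but not yet ended.
class OpenToolCalls {
  private readonly open = new Map<string, string>(); // call_id -> tool name

  begin(callId: string, toolName: string): void {
    this.open.set(callId, toolName);
  }

  // Returns false when the end event has no matching begin (worth logging).
  end(callId: string): boolean {
    return this.open.delete(callId);
  }

  // On stream close: produce one error result per unfinished call, then reset.
  drain(): ErrorResultPart[] {
    const parts: ErrorResultPart[] = [];
    this.open.forEach((toolName, callId) => {
      parts.push({
        type: 'tool-result',
        toolCallId: callId,
        result: { type: 'error', message: `${toolName} did not complete before the stream ended` },
      });
    });
    this.open.clear();
    return parts;
  }
}
```

The same tracker covers the "incomplete tool results" case, since `end` returning `false` flags an end event that arrived without a begin.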

Phase 5: Key Differences from Original Plan

Based on ChatGPT's review, the following corrections were critical:

  1. Event names are snake_case, not PascalCase (due to #[strum(serialize_all = "snake_case")])
  2. MCP tool structure: invocation.server, invocation.tool, invocation.arguments (not flat structure)
  3. WebSearch query timing: Query only available in web_search_end, not web_search_begin
  4. call_id is required: Never generate random IDs - Codex always provides call_id
  5. Base64 decoding required: exec_command_output_delta chunks are Base64-encoded
  6. JSON mode limitations: No agent_message_delta events, preamble lines need handling
  7. Correct field names: changes not file_changes, parsed_cmd included in exec events

Phase 6: Documentation

Update README.md with:

## Tool Streaming Support

The provider now supports streaming of tool calls, enabling real-time UI updates as Codex executes commands, searches, and modifies files:

```typescript
const result = streamText({
  model: codexCli('gpt-5-codex'),
  prompt: 'Analyze and fix any TypeScript errors in the project',
});

for await (const part of result.fullStream) {
  switch(part.type) {
    case 'tool-input-start':
      // Tool execution starting
      console.log(`Starting: ${part.toolName}`);
      break;
    case 'tool-call':
      // Tool has been invoked
      console.log(`Executing: ${part.toolName}`);
      break;
    case 'tool-result':
      // Tool execution completed
      console.log(`Result:`, part.result);
      break;
    // ... handle other stream parts
  }
}
```

### Supported Tool Types

- exec: Command execution in the workspace
- patch: File modifications and code changes
- web_search: Web searches for information
- MCP tools: Any tools provided by MCP servers

All tools are executed by Codex CLI internally - the AI SDK does not re-execute them.


## Implementation Timeline

1. **Phase 1** (Day 1-2): Implement core tool event detection and streaming
2. **Phase 2** (Day 3): Add enhanced features like output streaming
3. **Phase 3** (Day 4): Comprehensive testing with all tool types
4. **Phase 4** (Day 5): Edge cases and error handling
5. **Phase 5** (Day 6): Documentation and examples

## Backwards Compatibility

This implementation is **fully backwards compatible**:
- Existing code continues to work unchanged
- Tool streaming events are additive only
- No breaking changes to existing APIs
- Users opt-in to tool streaming by using `fullStream`

## Benefits

1. **Enhanced UI/UX**: Applications can show tool execution progress in real-time
2. **Better Debugging**: Developers can see exactly what tools Codex is calling
3. **Feature Parity**: Matches capabilities of other AI SDK providers
4. **Command Output Streaming**: See command output as it happens (unique to Codex)
5. **Transparency**: Users can see file changes, searches, and command execution live

## Potential Risks & Mitigations

| Risk | Mitigation |
|------|------------|
| Large event volume overwhelming stream | Implement event batching/throttling |
| Breaking existing streams | Additive changes only, no modifications to existing events |
| Incomplete tool results | Track call_ids and emit warnings for unclosed tools |
| Unknown event types from Codex updates | Graceful handling with logging, forward compatibility |
| AI SDK attempting to re-execute tools | Always set `providerExecuted: true` on tool-call events |

## Success Criteria

- [ ] All Codex tool types emit streaming events
- [ ] Tool begin/end events are properly correlated via call_id
- [ ] Command output streams in real-time
- [ ] No performance degradation
- [ ] Backwards compatibility maintained
- [ ] Examples run successfully
- [ ] Documentation is clear and complete

## Next Steps

1. Review this plan with maintainers
2. Create feature branch `feature/tool-streaming`
3. Implement changes incrementally with tests
4. Open PR with comprehensive description
5. Address feedback and iterate

## References

- [AI SDK Streaming Documentation](https://sdk.vercel.ai/docs/reference/ai-sdk-core/stream-text)
- [Codex CLI Protocol Definition](https://github.com/openai/codex/blob/main/codex-rs/protocol/src/protocol.rs)
- [Original Issue #2](https://github.com/ben-vargas/ai-sdk-provider-codex-cli/issues/2)
- [Claude Code Tool Streaming Reference](https://github.com/ben-vargas/ai-sdk-provider-claude-code/issues/36)