- GitHub Issue: #2 - Streaming tool calls?
- Requested by: @hbmartin
- Date: January 2025
The user is requesting support for streaming tool calls in streamText and streamObject, similar to the functionality planned for the Claude Code provider. This would enable building UIs that show tool calls and results as they happen.
Codex CLI's `--json` flag fundamentally changes the output behavior:
- In JSON mode, `AgentMessageDelta` events are suppressed, so there is no streaming text
- The output begins with non-JSON config summary and prompt lines from `EventProcessorWithJsonOutput`
- Tool events are still emitted as JSON events
- The `doGenerate` method cannot access streaming events (only the final `task_complete`)
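Because this output is line-oriented but a Node.js `data` chunk can end in the middle of a line, a reader generally needs to buffer the trailing partial line before parsing. The sketch below shows one way to do that while also dropping preamble lines. `JsonLineBuffer`, `parseEventLine`, and the `CodexEvent` shape are hypothetical names for illustration, and the assumption that every real event carries a string `id` and a `msg.type` is taken from this plan rather than verified against Codex CLI.

```typescript
// Assumed event envelope: { id, msg: { type, ... } } as described in this plan.
type CodexEvent = { id: string; msg: { type: string; [key: string]: unknown } };

// Returns an event only for JSON objects that carry both `id` and `msg.type`;
// preamble lines, blank lines, and malformed JSON yield undefined.
function parseEventLine(line: string): CodexEvent | undefined {
  if (!line.trim()) return undefined;
  try {
    const parsed: unknown = JSON.parse(line);
    if (typeof parsed !== 'object' || parsed === null) return undefined;
    const { id, msg } = parsed as { id?: unknown; msg?: { type?: unknown } };
    if (typeof id !== 'string' || typeof msg !== 'object' || msg === null) return undefined;
    if (typeof msg.type !== 'string') return undefined;
    return parsed as CodexEvent;
  } catch {
    return undefined;
  }
}

// Hypothetical helper: turns arbitrarily chunked stdout text into complete events.
class JsonLineBuffer {
  private pending = '';

  // Feed one stdout chunk; returns the events found in the lines completed so far.
  push(chunk: string): CodexEvent[] {
    this.pending += chunk;
    const lines = this.pending.split(/\r?\n/);
    // The last element is an incomplete line ('' if the chunk ended with a newline).
    this.pending = lines.pop() ?? '';
    return lines.flatMap((line) => {
      const evt = parseEventLine(line);
      return evt ? [evt] : [];
    });
  }

  // Call when the process exits to parse a final line that had no trailing newline.
  flush(): CodexEvent[] {
    const rest = this.pending;
    this.pending = '';
    const evt = parseEventLine(rest);
    return evt ? [evt] : [];
  }
}
```

In a handler, `push` would be called from `child.stdout.on('data', ...)` and `flush` from the `close` event, so an event split across two chunks is parsed exactly once.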
The provider successfully implements streaming for:
- Session configuration (`session_configured`)
- Task completion (`task_complete`)
- Response metadata and session information
- Final text output (non-streaming, sent all at once)
Tool-related streaming events are not captured or emitted, even though Codex CLI is emitting them. The user only sees:
- `stream-start`
- `response-metadata` (when session configured)
- `text-delta` (final text only)
- `finish`

Missing events that should be emitted:
- `tool-input-start`
- `tool-input-delta`
- `tool-input-end`
- `tool-call`
- `tool-result`
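For a single tool call, the first four of these parts would be emitted together when the begin event arrives, and `tool-result` follows once the matching end event is seen. The sketch below builds that opening sequence in order. `toolCallParts` and `ToolStreamPart` are hypothetical names, the part shapes are the subset used later in this plan, and `providerExecuted: true` reflects the plan's requirement that Codex CLI runs its own tools.

```typescript
// Subset of the AI SDK stream parts used in this plan (shapes as listed in the plan).
type ToolStreamPart =
  | { type: 'tool-input-start'; id: string; toolName: string }
  | { type: 'tool-input-delta'; id: string; delta: string }
  | { type: 'tool-input-end'; id: string }
  | { type: 'tool-call'; toolCallId: string; toolName: string; input: string; providerExecuted: true };

// Hypothetical helper: builds the parts announcing one tool call, in emission order.
// `callId` must be the call_id supplied by Codex, never a generated value.
function toolCallParts(callId: string, toolName: string, input: unknown): ToolStreamPart[] {
  // The whole input is known up front, so it is sent as a single delta.
  const inputString = JSON.stringify(input ?? {});
  return [
    { type: 'tool-input-start', id: callId, toolName },
    { type: 'tool-input-delta', id: callId, delta: inputString },
    { type: 'tool-input-end', id: callId },
    { type: 'tool-call', toolCallId: callId, toolName, input: inputString, providerExecuted: true },
  ];
}
```

A handler could then enqueue the parts with `for (const part of toolCallParts(id, name, input)) controller.enqueue(part);`, which keeps the begin path and the deferred web search path from duplicating the same four calls.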
The Codex CLI provides comprehensive event streaming through its EventMsg enum. Important: Events are serialized with snake_case names due to #[strum(serialize_all = "snake_case")]:
// Event names in JSON will be: mcp_tool_call_begin, exec_command_begin, etc.
pub enum EventMsg {
// Tool-related events
McpToolCallBegin(McpToolCallBeginEvent),
McpToolCallEnd(McpToolCallEndEvent),
ExecCommandBegin(ExecCommandBeginEvent),
ExecCommandOutputDelta(ExecCommandOutputDeltaEvent),
ExecCommandEnd(ExecCommandEndEvent),
WebSearchBegin(WebSearchBeginEvent),
WebSearchEnd(WebSearchEndEvent),
PatchApplyBegin(PatchApplyBeginEvent),
PatchApplyEnd(PatchApplyEndEvent),
ExecApprovalRequest(ExecApprovalRequestEvent),
ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent),
// ... and more
}

Each event has specific payload structures:
- All events include `call_id`: Required for correlating begin/end events (never generate random IDs!)
- McpToolCallBegin: Contains `invocation: {server, tool, arguments}`
- ExecCommandBegin: Contains `command`, `cwd`, `parsed_cmd`
- WebSearchBegin: Only has `call_id` (query comes in WebSearchEnd)
- PatchApplyBegin: Contains `auto_approved`, `changes` (not `file_changes`)
- ExecCommandOutputDelta: Contains Base64-encoded `chunk` and `stream` enum
Looking at src/codex-cli-language-model.ts:
// Current implementation only captures 2 events:
child.stdout.on('data', (chunk: string) => {
const lines = chunk.split(/\r?\n/).filter(Boolean);
for (const line of lines) {
const evt = this.parseJsonLine(line);
if (!evt) continue;
const msg = evt.msg;
const type = msg?.type;
if (type === 'session_configured' && msg) {
this.sessionId = msg.session_id;
} else if (type === 'task_complete' && msg) {
const last = msg.last_agent_message;
if (typeof last === 'string') text = last;
}
// ALL OTHER EVENTS ARE IGNORED!
}
});

The AI SDK expects these LanguageModelV2StreamPart types for tools:
type ToolStreamParts =
| { type: 'tool-input-start'; id: string; toolName: string; }
| { type: 'tool-input-delta'; id: string; delta: string; }
| { type: 'tool-input-end'; id: string; }
| { type: 'tool-call'; toolCallId: string; toolName: string; input: string; providerExecuted?: boolean; }
| { type: 'tool-result'; toolCallId: string; result: unknown; }

IMPORTANT: Codex CLI's tools (exec, patch, web search, MCP tools) are executed by Codex CLI itself, NOT by the AI SDK's tool runner. We MUST set `providerExecuted: true` on all tool-call events to prevent the AI SDK from attempting to re-execute these tools. This is identical to the Claude Code provider's requirement.
Codex CLI has different tool types than Claude Code:
| Codex Event Type (JSON) | Tool Name | Description |
|---|---|---|
| mcp_tool_call_begin/end | invocation.tool | MCP server tools |
| exec_command_begin/end | "exec" | Command execution |
| web_search_begin/end | "web_search" | Web search |
| patch_apply_begin/end | "patch" | Code modifications |
| exec_approval_request | "exec_approval" | Approval requests |
| apply_patch_approval_request | "patch_approval" | Patch approval |
File: src/codex-cli-language-model.ts
// Add near line 29
interface CodexExecCommandBeginEvent {
call_id: string;
command: string[];
cwd: string;
parsed_cmd: any[];
}
interface CodexExecCommandOutputDeltaEvent {
call_id: string;
stream: 'stdout' | 'stderr';
chunk: string; // Base64-encoded bytes
}
interface CodexPatchApplyBeginEvent {
call_id: string;
auto_approved: boolean;
changes: Record<string, unknown>; // Note: 'changes' not 'file_changes'
}
interface CodexMcpToolCallBeginEvent {
call_id: string;
invocation: {
server: string;
tool: string;
arguments?: unknown;
};
}
interface CodexWebSearchEndEvent {
call_id: string;
query: string;
}
// Helper to skip JSON preamble lines
private isPreambleLine(line: string): boolean {
// Config summary and prompt lines from EventProcessorWithJsonOutput
try {
const parsed = JSON.parse(line);
return !parsed.id || !parsed.msg; // Not an event if missing id/msg
} catch {
return true; // Non-JSON lines are preamble
}
}

// Add around line 160
private extractToolName(eventType: string, event: any): string {
switch(eventType) {
case 'mcp_tool_call_begin':
return event.invocation?.tool || 'mcp_tool';
case 'exec_command_begin':
return 'exec';
case 'web_search_begin':
case 'web_search_end':
return 'web_search';
case 'patch_apply_begin':
return 'patch';
default:
return eventType.replace(/_begin$|_end$/, '').replace(/_/g, '-');
}
}
private isToolBeginEvent(type: string): boolean {
return ['mcp_tool_call_begin', 'exec_command_begin', 'web_search_begin', 'patch_apply_begin'].includes(type);
}
private isToolEndEvent(type: string): boolean {
return ['mcp_tool_call_end', 'exec_command_end', 'web_search_end', 'patch_apply_end'].includes(type);
}
private decodeBase64Chunk(chunk: string): string {
try {
return Buffer.from(chunk, 'base64').toString('utf-8');
} catch {
return ''; // Invalid base64, return empty string
}
}

// Replace the current stdout.on('data') handler with:
const pendingWebSearches = new Map<string, any>(); // Track web searches waiting for query
child.stdout.on('data', (chunk: string) => {
const lines = chunk.split(/\r?\n/).filter(Boolean);
for (const line of lines) {
// Skip preamble lines (config summary, prompt)
if (this.isPreambleLine(line)) continue;
const evt = this.parseJsonLine(line);
if (!evt || !evt.msg) continue;
const msg = evt.msg;
const type = msg?.type;
if (type === 'session_configured' && msg) {
this.sessionId = msg.session_id;
controller.enqueue({
type: 'response-metadata',
id: randomUUID(),
timestamp: new Date(),
modelId: this.modelId,
});
}
else if (this.isToolBeginEvent(type) && msg) {
const toolName = this.extractToolName(type, msg);
const toolId = msg.call_id; // ALWAYS use call_id from Codex
if (!toolId) {
console.warn(`Missing call_id for ${type} event`);
continue;
}
// Special handling for web_search_begin (no query yet)
if (type === 'web_search_begin') {
pendingWebSearches.set(toolId, { toolName });
// Don't emit events yet, wait for web_search_end with query
continue;
}
// Emit tool-input-start
controller.enqueue({
type: 'tool-input-start',
id: toolId,
toolName: toolName,
});
// Prepare input data based on event type
let inputData: any = {};
if (type === 'exec_command_begin') {
inputData = {
command: msg.command,
cwd: msg.cwd,
parsed_cmd: msg.parsed_cmd
};
} else if (type === 'patch_apply_begin') {
inputData = {
changes: msg.changes, // Note: 'changes' not 'file_changes'
auto_approved: msg.auto_approved
};
} else if (type === 'mcp_tool_call_begin') {
inputData = {
server: msg.invocation?.server,
tool: msg.invocation?.tool,
arguments: msg.invocation?.arguments
};
}
const inputString = JSON.stringify(inputData);
// Emit tool-input-delta
controller.enqueue({
type: 'tool-input-delta',
id: toolId,
delta: inputString,
});
// Emit tool-input-end
controller.enqueue({
type: 'tool-input-end',
id: toolId,
});
// Emit tool-call with providerExecuted flag
controller.enqueue({
type: 'tool-call',
toolCallId: toolId,
toolName: toolName,
input: inputString,
providerExecuted: true, // CRITICAL: Codex executes its own tools
});
}
else if (type === 'exec_command_output_delta' && msg) {
const toolId = msg.call_id;
if (!toolId) continue;
// Decode Base64 chunk
const decodedOutput = this.decodeBase64Chunk(msg.chunk);
// Stream command output as partial results
controller.enqueue({
type: 'tool-result',
toolCallId: toolId,
result: {
type: 'output-delta',
stream: msg.stream, // 'stdout' or 'stderr'
output: decodedOutput
},
providerMetadata: {
'codex-cli': {
isPartial: true,
stream: msg.stream
}
}
});
}
else if (type === 'web_search_end' && msg) {
const toolId = msg.call_id;
const pending = pendingWebSearches.get(toolId);
if (pending) {
// Now we have the query, emit all events
controller.enqueue({
type: 'tool-input-start',
id: toolId,
toolName: 'web_search',
});
const inputString = JSON.stringify({ query: msg.query });
controller.enqueue({
type: 'tool-input-delta',
id: toolId,
delta: inputString,
});
controller.enqueue({
type: 'tool-input-end',
id: toolId,
});
controller.enqueue({
type: 'tool-call',
toolCallId: toolId,
toolName: 'web_search',
input: inputString,
providerExecuted: true,
});
pendingWebSearches.delete(toolId);
}
// Also emit the result
controller.enqueue({
type: 'tool-result',
toolCallId: toolId,
result: { query: msg.query, success: true },
});
}
else if (this.isToolEndEvent(type) && msg && type !== 'web_search_end') {
const toolId = msg.call_id;
if (!toolId) continue;
// Handle different result formats
let result: any = { success: true };
if (type === 'exec_command_end') {
result = {
stdout: msg.stdout,
stderr: msg.stderr,
aggregated_output: msg.aggregated_output,
exit_code: msg.exit_code
};
} else if (type === 'patch_apply_end') {
result = {
stdout: msg.stdout,
stderr: msg.stderr,
success: msg.success
};
} else if (type === 'mcp_tool_call_end' && msg.result) {
// Handle Result<CallToolResult, String> from Rust
if (msg.result.Ok) {
result = msg.result.Ok;
} else if (msg.result.Err) {
result = { error: msg.result.Err, success: false };
}
}
controller.enqueue({
type: 'tool-result',
toolCallId: toolId,
result: result,
providerMetadata: {
'codex-cli': {
eventType: type
}
}
});
}
else if (type === 'exec_approval_request' || type === 'apply_patch_approval_request') {
// Handle approval requests as metadata, not tool calls
// These share call_id with the subsequent exec_command_begin/patch_apply_begin
const toolId = msg.call_id;
if (!toolId) continue;
// Emit as response-metadata to avoid call_id collision
controller.enqueue({
type: 'response-metadata',
id: randomUUID(),
timestamp: new Date(),
modelId: this.modelId,
providerMetadata: {
'codex-cli': {
approvalRequest: {
type: type,
call_id: toolId,
details: msg
}
}
}
});
}
else if (type === 'agent_message_delta' && msg) {
// Note: Suppressed in JSON mode
const text = msg.content || msg.text || '';
if (text) {
if (!textPartId) {
textPartId = randomUUID();
controller.enqueue({ type: 'text-start', id: textPartId });
}
controller.enqueue({
type: 'text-delta',
id: textPartId,
delta: text
});
}
}
else if (type === 'task_complete' && msg) {
const last = msg.last_agent_message;
if (typeof last === 'string') {
accumulatedText = last;
}
}
}
});

Important limitation: in non-streaming mode (`doGenerate`), the provider reads only the final `task_complete` event from the `exec --json` output, so intermediate tool events are not surfaced to the caller. Tool information would need to be extracted from the final output or response metadata if available. Consider:
// In doGenerate method
// Tool events are NOT available in non-streaming mode
// Only the final task_complete message is available
// Consider documenting this limitation or always using streaming internally

Command output streaming is already implemented above with proper Base64 decoding:
- `exec_command_output_delta` events contain Base64-encoded chunks
- The `stream` field indicates whether it's stdout or stderr
- Decoded output is sent as partial tool-result events
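One caveat with decoding each chunk independently: if a multi-byte UTF-8 character is split across two `exec_command_output_delta` events, calling `toString('utf-8')` on each half yields replacement characters. The sketch below keeps one Node.js `StringDecoder` per call and stream so incomplete bytes are held back until the next chunk. `OutputDeltaDecoder` is a hypothetical helper, and whether Codex CLI actually splits characters across chunks is an assumption that has not been verified here.

```typescript
import { StringDecoder } from 'node:string_decoder';

// Hypothetical helper: stateful Base64 -> UTF-8 decoding per (call_id, stream) pair.
class OutputDeltaDecoder {
  private decoders = new Map<string, StringDecoder>();

  // Decode one Base64 chunk; bytes of an unfinished character are buffered internally.
  decode(callId: string, stream: 'stdout' | 'stderr', base64Chunk: string): string {
    const key = `${callId}:${stream}`;
    let decoder = this.decoders.get(key);
    if (!decoder) {
      decoder = new StringDecoder('utf8');
      this.decoders.set(key, decoder);
    }
    return decoder.write(Buffer.from(base64Chunk, 'base64'));
  }

  // Call on exec_command_end: flushes any leftover bytes and frees both decoders.
  end(callId: string): string {
    let rest = '';
    for (const stream of ['stdout', 'stderr'] as const) {
      const key = `${callId}:${stream}`;
      rest += this.decoders.get(key)?.end() ?? '';
      this.decoders.delete(key);
    }
    return rest;
  }
}
```

The stateless `decodeBase64Chunk` helper is simpler and may be sufficient if output is known to be ASCII; the stateful version trades a small amount of bookkeeping for correct handling of non-ASCII output.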
else if (this.isToolEndEvent(type) && msg && msg.error) {
controller.enqueue({
type: 'tool-result',
toolCallId: msg.call_id,
result: {
type: 'error',
error: msg.error,
message: msg.error_message || 'Tool execution failed'
},
providerMetadata: {
'codex-cli': {
isError: true,
eventType: type
}
}
});
}

Retry loops may emit multiple begin/end pairs for the same logical action:
// Track retry attempts
const retryCounters = new Map<string, number>();
if (this.isToolBeginEvent(type)) {
const count = retryCounters.get(toolId) || 0;
retryCounters.set(toolId, count + 1);
// Include retry count in metadata
}

The first lines of output are the config summary and prompt lines from EventProcessorWithJsonOutput. They are not events (they lack the `id` and `msg` fields) and must be skipped:
// Example preamble lines (not valid events):
// {"model":"gpt-5-codex","temperature":"0.5"}
// {"prompt":"List files in current directory"}
// Followed by actual events with id and msg fields

// examples/streaming-with-tools.ts
import { streamText } from 'ai';
import { codexCli } from '../dist/index.js';
async function main() {
const result = streamText({
model: codexCli('gpt-5-codex'),
prompt: 'List the files in the current directory and show their sizes',
});
const toolCalls: any[] = [];
for await (const part of result.fullStream) {
switch(part.type) {
case 'tool-input-start':
console.log(`🔧 Starting tool: ${part.toolName}`);
break;
case 'tool-input-delta':
console.log(` Input: ${part.delta}`);
break;
case 'tool-call':
console.log(`✅ Tool called: ${part.toolName} (${part.toolCallId})`);
toolCalls.push({ name: part.toolName, id: part.toolCallId });
break;
case 'tool-result':
console.log(`Tool result for ${part.toolCallId}:`,
JSON.stringify(part.result).substring(0, 100));
break;
case 'text-delta':
process.stdout.write(part.delta);
break;
}
}
console.log('\n\nTool calls made:', toolCalls);
}
main().catch(console.error);

// examples/streaming-multiple-tool-types.ts
async function testMultipleTools() {
const result = streamText({
model: codexCli('gpt-5-codex'),
prompt: 'Search for information about TypeScript, then create a hello.ts file with a simple example',
});
const toolsByType: Record<string, number> = {};
for await (const part of result.fullStream) {
if (part.type === 'tool-call') {
toolsByType[part.toolName] = (toolsByType[part.toolName] || 0) + 1;
}
}
console.log('\nTool usage summary:', toolsByType);
// Expected: { web_search: 1, patch: 1 }
}

// examples/streaming-exec-output.ts
async function testCommandOutput() {
const result = streamText({
model: codexCli('gpt-5-codex'),
prompt: 'Run npm test and show me the results',
});
let currentToolCall: string | undefined;
for await (const part of result.fullStream) {
if (part.type === 'tool-call' && part.toolName === 'exec') {
currentToolCall = part.toolCallId;
console.log('Executing command...');
}
if (part.type === 'tool-result' && part.toolCallId === currentToolCall) {
// Stream command output - note the correct structure
if (part.result?.type === 'output-delta') {
// Output is in part.result.output, not stdout/stderr
const output = part.result.output || '';
const stream = part.result.stream; // 'stdout' or 'stderr'
if (stream === 'stderr') {
process.stderr.write(output);
} else {
process.stdout.write(output);
}
}
}
}
}

- Handle missing call_id: Log warning and skip the malformed event (never generate fake IDs)
- Handle tool execution errors: Properly emit error results
- Handle interrupted streams: Clean up partial tool calls
- Handle unknown tool types: Gracefully ignore or log
- Prevent double execution: MUST set `providerExecuted: true` on all tool calls
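The interrupted-stream case has no code elsewhere in this plan, so here is a minimal sketch of one possible approach: remember every `call_id` that has begun, forget it when its end event arrives, and on process exit or abort emit an error result for whatever is still open. `OpenToolCalls` and `ToolErrorResult` are hypothetical names, and the result shape mirrors the error result used earlier in this plan rather than a confirmed AI SDK contract.

```typescript
// Error result shape modelled on the plan's error-handling branch (an assumption).
type ToolErrorResult = {
  type: 'tool-result';
  toolCallId: string;
  result: { type: 'error'; toolName: string; message: string };
  providerMetadata: { 'codex-cli': { isError: true } };
};

// Hypothetical tracker for tool calls that have begun but not yet ended.
class OpenToolCalls {
  private open = new Map<string, string>(); // call_id -> toolName

  // Record a begin event.
  begin(callId: string, toolName: string): void {
    this.open.set(callId, toolName);
  }

  // Record an end event; returns false when the call_id was never opened.
  end(callId: string): boolean {
    return this.open.delete(callId);
  }

  // On exit or abort: build one error result per unfinished call, then reset.
  drain(message: string): ToolErrorResult[] {
    const results = Array.from(this.open, ([toolCallId, toolName]): ToolErrorResult => ({
      type: 'tool-result',
      toolCallId,
      result: { type: 'error', toolName, message },
      providerMetadata: { 'codex-cli': { isError: true } },
    }));
    this.open.clear();
    return results;
  }
}
```

The results from `drain` would be enqueued before the `finish` part so that a UI never shows a tool call that stays pending forever. A `false` return from `end` is also a convenient place to log an unmatched end event.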
Based on ChatGPT's review, the following corrections were critical:
- Event names are snake_case, not PascalCase (due to `#[strum(serialize_all = "snake_case")]`)
- MCP tool structure: `invocation.server`, `invocation.tool`, `invocation.arguments` (not flat structure)
- WebSearch query timing: Query only available in `web_search_end`, not `web_search_begin`
- call_id is required: Never generate random IDs - Codex always provides call_id
- Base64 decoding required: `exec_command_output_delta` chunks are Base64-encoded
- JSON mode limitations: No `agent_message_delta` events, preamble lines need handling
- Correct field names: `changes` not `file_changes`, `parsed_cmd` included in exec events
Update README.md with:
## Tool Streaming Support
The provider now supports streaming of tool calls, enabling real-time UI updates as Codex executes commands, searches, and modifies files:
```typescript
const result = streamText({
model: codexCli('gpt-5-codex'),
prompt: 'Analyze and fix any TypeScript errors in the project',
});
for await (const part of result.fullStream) {
switch(part.type) {
case 'tool-input-start':
// Tool execution starting
console.log(`Starting: ${part.toolName}`);
break;
case 'tool-call':
// Tool has been invoked
console.log(`Executing: ${part.toolName}`);
break;
case 'tool-result':
// Tool execution completed
console.log(`Result:`, part.result);
break;
// ... handle other stream parts
}
}
```

Supported tool types:
- exec: Command execution in the workspace
- patch: File modifications and code changes
- web_search: Web searches for information
- MCP tools: Any tools provided by MCP servers
All tools are executed by Codex CLI internally - the AI SDK does not re-execute them.
## Implementation Timeline
1. **Phase 1** (Day 1-2): Implement core tool event detection and streaming
2. **Phase 2** (Day 3): Add enhanced features like output streaming
3. **Phase 3** (Day 4): Comprehensive testing with all tool types
4. **Phase 4** (Day 5): Edge cases and error handling
5. **Phase 5** (Day 6): Documentation and examples
## Backwards Compatibility
This implementation is **fully backwards compatible**:
- Existing code continues to work unchanged
- Tool streaming events are additive only
- No breaking changes to existing APIs
- Users opt-in to tool streaming by using `fullStream`
## Benefits
1. **Enhanced UI/UX**: Applications can show tool execution progress in real-time
2. **Better Debugging**: Developers can see exactly what tools Codex is calling
3. **Feature Parity**: Matches capabilities of other AI SDK providers
4. **Command Output Streaming**: See command output as it happens (unique to Codex)
5. **Transparency**: Users can see file changes, searches, and command execution live
## Potential Risks & Mitigations
| Risk | Mitigation |
|------|------------|
| Large event volume overwhelming stream | Implement event batching/throttling |
| Breaking existing streams | Additive changes only, no modifications to existing events |
| Incomplete tool results | Track call_ids and emit warnings for unclosed tools |
| Unknown event types from Codex updates | Graceful handling with logging, forward compatibility |
| AI SDK attempting to re-execute tools | Always set `providerExecuted: true` on tool-call events |
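
The batching mitigation in the first row could take several forms. As one illustration, the sketch below coalesces consecutive command output deltas that belong to the same call and stream, and emits them once a size threshold is reached or the source changes. `DeltaBatcher`, `OutputDelta`, and the 4096-character default are hypothetical choices, not part of the provider or the AI SDK.

```typescript
type OutputDelta = { callId: string; stream: 'stdout' | 'stderr'; output: string };

// Hypothetical coalescer: merges consecutive deltas for the same call and stream.
class DeltaBatcher {
  private current: OutputDelta | undefined;
  private readonly emit: (delta: OutputDelta) => void;
  private readonly maxChars: number;

  constructor(emit: (delta: OutputDelta) => void, maxChars = 4096) {
    this.emit = emit;
    this.maxChars = maxChars;
  }

  add(delta: OutputDelta): void {
    const cur = this.current;
    // A different call or stream ends the current batch so ordering is preserved.
    if (cur && (cur.callId !== delta.callId || cur.stream !== delta.stream)) {
      this.flush();
    }
    if (this.current) {
      this.current.output += delta.output;
    } else {
      this.current = { ...delta };
    }
    // Emit as soon as the batch reaches the size threshold.
    if (this.current.output.length >= this.maxChars) this.flush();
  }

  // Emit whatever is buffered; call this on the end event and at stream close.
  flush(): void {
    if (this.current && this.current.output.length > 0) this.emit(this.current);
    this.current = undefined;
  }
}
```

A size threshold alone can delay slow, small outputs indefinitely, so a real implementation would likely pair it with a short timer that calls `flush`, and would always flush before emitting the final result for a call.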
## Success Criteria
- [ ] All Codex tool types emit streaming events
- [ ] Tool begin/end events are properly correlated via call_id
- [ ] Command output streams in real-time
- [ ] No performance degradation
- [ ] Backwards compatibility maintained
- [ ] Examples run successfully
- [ ] Documentation is clear and complete
## Next Steps
1. Review this plan with maintainers
2. Create feature branch `feature/tool-streaming`
3. Implement changes incrementally with tests
4. Open PR with comprehensive description
5. Address feedback and iterate
## References
- [AI SDK Streaming Documentation](https://sdk.vercel.ai/docs/reference/ai-sdk-core/stream-text)
- [Codex CLI Protocol Definition](https://github.com/openai/codex/blob/main/codex-rs/protocol/src/protocol.rs)
- [Original Issue #2](https://github.com/ben-vargas/ai-sdk-provider-codex-cli/issues/2)
- [Claude Code Tool Streaming Reference](https://github.com/ben-vargas/ai-sdk-provider-claude-code/issues/36)