Created
June 26, 2026 21:40
-
-
Save nerdalert/b7ec35b6b5b5fcc7a56699c7fd8ffb01 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ```diff | |
| diff --git a/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs b/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs | |
| index 5e60a3f..8dc4ced 100644 | |
| --- a/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs | |
| +++ b/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs | |
| @@ -27,6 +27,7 @@ use crate::{ | |
| filter::{HttpFilter, HttpFilterContext}, | |
| }; | |
| +// ----------------------------------------------------------------------------- | |
| // Constants | |
| // ----------------------------------------------------------------------------- | |
| @@ -184,10 +185,7 @@ async fn validate_previous_response( | |
| return Ok(action); | |
| } | |
| - extract_mcp_tools(ctx, &record); | |
| - extract_previous_usage(ctx, &record); | |
| - | |
| - ctx.extensions.insert(build_state(parsed_body, record.messages)); | |
| + populate_state_and_metadata(ctx, parsed_body, &record); | |
| debug!(previous_response_id = %prev_id, "previous response validated, state populated"); | |
| ctx.set_metadata("responses.previous_response_id", prev_id); | |
| @@ -195,18 +193,89 @@ async fn validate_previous_response( | |
| Ok(FilterAction::Release) | |
| } | |
| +/// Write previous MCP tool/usage metadata when present, then insert the rehydrated [`ResponsesState`]. | |
| +fn populate_state_and_metadata(ctx: &mut HttpFilterContext<'_>, parsed_body: Value, record: &ResponseRecord) { | |
| + let previous_tools = collect_mcp_tool_listings(record); | |
| + write_mcp_tools_metadata(ctx, &previous_tools); | |
| + | |
| + let previous_usage = record.response_object.get("usage").filter(|usage| !usage.is_null()); | |
| + write_previous_usage_metadata(ctx, previous_usage); | |
| + | |
| + ctx.extensions.insert(build_state( | |
| + parsed_body, | |
| + record, | |
| + previous_tools, | |
| + previous_usage.cloned(), | |
| + )); | |
| +} | |
| + | |
| /// Build [`ResponsesState`] by prepending stored messages before the current input. | |
| // TODO(#697): enforce a max rehydrated history size. | |
| -fn build_state(parsed_body: Value, messages: Value) -> ResponsesState { | |
| +fn build_state( | |
| + parsed_body: Value, | |
| + record: &ResponseRecord, | |
| + previous_tools: Vec<Value>, | |
| + previous_usage: Option<Value>, | |
| +) -> ResponsesState { | |
| let mut state = ResponsesState::from_request_body(parsed_body); | |
| - let stored = match messages { | |
| - Value::Array(arr) => arr, | |
| - _ => Vec::new(), | |
| - }; | |
| + let stored = stored_messages_for_rehydrate(record); | |
| state.messages.splice(0..0, stored); | |
| + state.previous_tools = previous_tools; | |
| + state.previous_usage = previous_usage; | |
| state | |
| } | |
| +/// Return stored history, reconstructing from public fields for | |
| +/// records created before hidden messages were persisted. | |
| +fn stored_messages_for_rehydrate(record: &ResponseRecord) -> Vec<Value> { | |
| + if let Some(messages) = record.messages.as_array().filter(|messages| !messages.is_empty()) { | |
| + return messages.clone(); | |
| + } | |
| + | |
| + reconstruct_messages_from_public_response(record) | |
| +} | |
| + | |
| +/// Reconstruct previous input/output items from public stored fields. | |
| +fn reconstruct_messages_from_public_response(record: &ResponseRecord) -> Vec<Value> { | |
| + let mut messages = Vec::new(); | |
| + | |
| + append_stored_input_items(&mut messages, record.input.clone()); | |
| + | |
| + if let Some(output) = record.response_object.get("output").filter(|output| !output.is_null()) { | |
| + append_stored_output_items(&mut messages, output.clone()); | |
| + } | |
| + | |
| + messages | |
| +} | |
| + | |
| +/// Append stored response input as Responses API item params. | |
| +fn append_stored_input_items(messages: &mut Vec<Value>, input: Value) { | |
| + match input { | |
| + Value::Null => {}, | |
| + Value::String(text) => messages.push(user_message_item(&text)), | |
| + Value::Array(items) => messages.extend(items), | |
| + other => messages.push(other), | |
| + } | |
| +} | |
| + | |
| +/// Append stored response output items. | |
| +fn append_stored_output_items(messages: &mut Vec<Value>, output: Value) { | |
| + if let Value::Array(items) = output { | |
| + messages.extend(items); | |
| + } else { | |
| + messages.push(output); | |
| + } | |
| +} | |
| + | |
| +/// Build a Responses API user message item from string input. | |
| +fn user_message_item(text: &str) -> Value { | |
| + serde_json::json!({ | |
| + "type": "message", | |
| + "role": "user", | |
| + "content": text, | |
| + }) | |
| +} | |
| + | |
| /// Parse the request body and extract `previous_response_id`. | |
| /// | |
| /// Returns the parsed body alongside the optional ID so callers | |
| @@ -278,22 +347,16 @@ fn validate_response_status(record: &ResponseRecord) -> Result<(), FilterAction> | |
| // MCP Tool & Usage Extraction | |
| // ----------------------------------------------------------------------------- | |
| -/// Extract MCP tool listings from stored history or previous | |
| -/// response output and set a compact metadata signal for downstream | |
| -/// filters. | |
| -/// | |
| -/// Scans stored message history and `record.response_object["output"]` | |
| -/// for items with `"type": "mcp_list_tools"` and builds a compact | |
| -/// JSON array of `{"server_label": "x", "tools": ["name1", "name2"]}`. | |
| +/// Set a compact MCP tool metadata signal for downstream filters. | |
| /// | |
| /// If the serialized value exceeds [`MAX_METADATA_VALUE_BYTES`], | |
| /// a boolean `"true"` is set instead. | |
| -fn extract_mcp_tools(ctx: &mut HttpFilterContext<'_>, record: &ResponseRecord) { | |
| - let summaries = collect_mcp_tool_summaries(record); | |
| - if summaries.is_empty() { | |
| +fn write_mcp_tools_metadata(ctx: &mut HttpFilterContext<'_>, listings: &[Value]) { | |
| + if listings.is_empty() { | |
| return; | |
| } | |
| + let summaries = compact_mcp_tool_summaries(listings); | |
| let compact = Value::Array(summaries); | |
| match serde_json::to_string(&compact) { | |
| Ok(s) if s.len() <= MAX_METADATA_VALUE_BYTES => { | |
| @@ -310,66 +373,83 @@ fn extract_mcp_tools(ctx: &mut HttpFilterContext<'_>, record: &ResponseRecord) { | |
| } | |
| } | |
| -/// Build compact summaries from `mcp_list_tools` output items. | |
| -/// | |
| -/// Each summary contains the `server_label` and an array of | |
| -/// tool names (without schemas or descriptions). | |
| -fn collect_mcp_tool_summaries(record: &ResponseRecord) -> Vec<Value> { | |
| - let mut summaries = Vec::new(); | |
| +/// Recover MCP tool listings from stored history and response output. | |
| +fn collect_mcp_tool_listings(record: &ResponseRecord) -> Vec<Value> { | |
| + let mut listings = Vec::new(); | |
| let mut seen = HashSet::new(); | |
| if let Some(messages) = record.messages.as_array() { | |
| - collect_mcp_tool_summaries_from_items(messages, &mut seen, &mut summaries); | |
| + collect_mcp_tool_listings_from_items(messages, &mut seen, &mut listings); | |
| } | |
| if let Some(output) = record.response_object.get("output").and_then(Value::as_array) { | |
| - collect_mcp_tool_summaries_from_items(output, &mut seen, &mut summaries); | |
| + collect_mcp_tool_listings_from_items(output, &mut seen, &mut listings); | |
| } | |
| - summaries | |
| + listings | |
| } | |
| -/// Append compact MCP tool summaries from a sequence of response items. | |
| -fn collect_mcp_tool_summaries_from_items( | |
| +/// Append MCP tool listings from a sequence of response items. | |
| +fn collect_mcp_tool_listings_from_items( | |
| items: &[Value], | |
| seen: &mut HashSet<(String, Vec<String>)>, | |
| - summaries: &mut Vec<Value>, | |
| + listings: &mut Vec<Value>, | |
| ) { | |
| - summaries.extend( | |
| - items | |
| - .iter() | |
| - .filter(|item| item.get("type").and_then(Value::as_str) == Some("mcp_list_tools")) | |
| - .filter_map(|item| { | |
| - let label = item.get("server_label").and_then(Value::as_str)?; | |
| - let names: Vec<String> = item | |
| - .get("tools") | |
| - .and_then(Value::as_array) | |
| - .map(|tools| { | |
| - tools | |
| - .iter() | |
| - .filter_map(|t| t.get("name").and_then(Value::as_str).map(ToOwned::to_owned)) | |
| - .collect() | |
| - }) | |
| - .unwrap_or_default(); | |
| - if !seen.insert((label.to_owned(), names.clone())) { | |
| - return None; | |
| - } | |
| - Some(serde_json::json!({ | |
| - "server_label": label, | |
| - "tools": names, | |
| - })) | |
| - }), | |
| - ); | |
| + listings.extend(items.iter().filter_map(|item| { | |
| + if item.get("type").and_then(Value::as_str) != Some("mcp_list_tools") { | |
| + return None; | |
| + } | |
| + | |
| + let label = item.get("server_label").and_then(Value::as_str)?; | |
| + let tools = item.get("tools").and_then(Value::as_array)?; | |
| + let names = mcp_tool_names(tools); | |
| + let mut dedupe_names = names.clone(); | |
| + dedupe_names.sort(); | |
| + dedupe_names.dedup(); | |
| + | |
| + if !seen.insert((label.to_owned(), dedupe_names)) { | |
| + return None; | |
| + } | |
| + | |
| + Some(serde_json::json!({ | |
| + "server_label": label, | |
| + "tools": tools, | |
| + })) | |
| + })); | |
| +} | |
| + | |
| +/// Build compact summaries from recovered MCP listings. | |
| +fn compact_mcp_tool_summaries(listings: &[Value]) -> Vec<Value> { | |
| + listings | |
| + .iter() | |
| + .filter_map(|listing| { | |
| + let label = listing.get("server_label").and_then(Value::as_str)?; | |
| + let tools = listing.get("tools").and_then(Value::as_array)?; | |
| + let names = mcp_tool_names(tools); | |
| + | |
| + Some(serde_json::json!({ | |
| + "server_label": label, | |
| + "tools": names, | |
| + })) | |
| + }) | |
| + .collect() | |
| +} | |
| + | |
| +/// Extract tool names from MCP tool definitions. | |
| +fn mcp_tool_names(tools: &[Value]) -> Vec<String> { | |
| + tools | |
| + .iter() | |
| + .filter_map(|tool| tool.get("name").and_then(Value::as_str).map(ToOwned::to_owned)) | |
| + .collect() | |
| } | |
| /// Extract token usage from the previous response and set | |
| /// metadata keys for downstream auto-compaction. | |
| /// | |
| -/// Reads `record.response_object["usage"]` and writes | |
| -/// `input_tokens`, `output_tokens`, and `total_tokens` as | |
| -/// individual string metadata values. | |
| -fn extract_previous_usage(ctx: &mut HttpFilterContext<'_>, record: &ResponseRecord) { | |
| - let Some(usage) = record.response_object.get("usage") else { | |
| +/// Writes `input_tokens`, `output_tokens`, and `total_tokens` as | |
| +/// individual string metadata values when present. | |
| +fn write_previous_usage_metadata(ctx: &mut HttpFilterContext<'_>, usage: Option<&Value>) { | |
| + let Some(usage) = usage else { | |
| return; | |
| }; | |
| diff --git a/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs b/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs | |
| index 79e43e8..e7c8cef 100644 | |
| --- a/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs | |
| +++ b/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs | |
| @@ -516,6 +516,16 @@ async fn extracts_mcp_tools_from_previous_response() { | |
| assert_eq!(tools.len(), 2, "should have two tool names"); | |
| assert_eq!(tools[0], "get_weather", "first tool name should match"); | |
| assert_eq!(tools[1], "search", "second tool name should match"); | |
| + | |
| + let state = ctx | |
| + .extensions | |
| + .get::<ResponsesState>() | |
| + .expect("ResponsesState should be populated"); | |
| + assert_eq!(state.previous_tools.len(), 1, "should store full previous tool listing"); | |
| + assert_eq!( | |
| + state.previous_tools[0]["tools"][0]["description"], "Get weather", | |
| + "ResponsesState should preserve full tool definitions" | |
| + ); | |
| } | |
| #[tokio::test] | |
| @@ -596,6 +606,83 @@ async fn extracts_mcp_tools_from_multiple_servers() { | |
| ); | |
| } | |
| +#[tokio::test] | |
| +async fn deduplicates_mcp_tools_independent_of_tool_order() { | |
| + let mut records = std::collections::HashMap::new(); | |
| + records.insert( | |
| + "resp_dedupe".to_owned(), | |
| + ResponseRecord { | |
| + id: "resp_dedupe".to_owned(), | |
| + tenant_id: "default".to_owned(), | |
| + created_at: 1000, | |
| + model: "gpt-4.1".to_owned(), | |
| + response_object: json!({ | |
| + "id": "resp_dedupe", | |
| + "status": "completed", | |
| + "output": [{ | |
| + "id": "mcpl_output", | |
| + "type": "mcp_list_tools", | |
| + "server_label": "shared-server", | |
| + "tools": [ | |
| + {"name": "beta", "description": "d", "input_schema": {}}, | |
| + {"name": "alpha", "description": "d", "input_schema": {}} | |
| + ] | |
| + }] | |
| + }), | |
| + input: json!("Hello"), | |
| + messages: json!([ | |
| + { | |
| + "id": "mcpl_history", | |
| + "type": "mcp_list_tools", | |
| + "server_label": "shared-server", | |
| + "tools": [ | |
| + {"name": "alpha", "description": "d", "input_schema": {}}, | |
| + {"name": "beta", "description": "d", "input_schema": {}} | |
| + ] | |
| + } | |
| + ]), | |
| + }, | |
| + ); | |
| + let store = MockStore { | |
| + records, | |
| + should_fail: false, | |
| + }; | |
| + let registry = setup_registry(store); | |
| + | |
| + let filter = RehydrateFilter; | |
| + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); | |
| + let mut ctx = crate::test_utils::make_filter_context(&req); | |
| + ctx.extensions.insert(registry.clone()); | |
| + ctx.set_metadata("openai_responses_format.format", "openai_responses"); | |
| + let mut body = Some(Bytes::from(r#"{"input":"Next","previous_response_id":"resp_dedupe"}"#)); | |
| + | |
| + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); | |
| + assert!( | |
| + matches!(action, FilterAction::Release), | |
| + "should release after rehydration" | |
| + ); | |
| + | |
| + let mcp_meta = ctx | |
| + .get_metadata("responses.previous_mcp_tools") | |
| + .expect("should set MCP tools metadata"); | |
| + let parsed: Value = serde_json::from_str(mcp_meta).unwrap(); | |
| + assert_eq!( | |
| + parsed.as_array().unwrap().len(), | |
| + 1, | |
| + "same server/tools should dedupe even when order differs" | |
| + ); | |
| + | |
| + let state = ctx | |
| + .extensions | |
| + .get::<ResponsesState>() | |
| + .expect("ResponsesState should be populated"); | |
| + assert_eq!( | |
| + state.previous_tools.len(), | |
| + 1, | |
| + "ResponsesState should not retain duplicate MCP listings" | |
| + ); | |
| +} | |
| + | |
| #[tokio::test] | |
| async fn extracts_mcp_tools_from_stored_history_when_latest_output_has_none() { | |
| let mut records = std::collections::HashMap::new(); | |
| @@ -690,6 +777,16 @@ async fn mcp_tools_overflow_sets_boolean_flag() { | |
| Some("true"), | |
| "should fall back to boolean flag when compact JSON exceeds 256 bytes" | |
| ); | |
| + | |
| + let state = ctx | |
| + .extensions | |
| + .get::<ResponsesState>() | |
| + .expect("ResponsesState should be populated"); | |
| + assert_eq!( | |
| + state.previous_tools.len(), | |
| + 1, | |
| + "metadata overflow should not drop full previous tools from state" | |
| + ); | |
| } | |
| // ----------------------------------------------------------------------------- | |
| @@ -700,7 +797,7 @@ async fn mcp_tools_overflow_sets_boolean_flag() { | |
| async fn extracts_usage_from_previous_response() { | |
| let output = json!([{"type": "message", "content": [{"type": "output_text", "text": "Hi"}]}]); | |
| let usage = json!({"input_tokens": 500, "output_tokens": 200, "total_tokens": 700}); | |
| - let store = MockStore::with_output_and_usage("resp_usage", output, usage); | |
| + let store = MockStore::with_output_and_usage("resp_usage", output, usage.clone()); | |
| let registry = setup_registry(store); | |
| let filter = RehydrateFilter; | |
| @@ -730,6 +827,16 @@ async fn extracts_usage_from_previous_response() { | |
| Some("700"), | |
| "total tokens" | |
| ); | |
| + | |
| + let state = ctx | |
| + .extensions | |
| + .get::<ResponsesState>() | |
| + .expect("ResponsesState should be populated"); | |
| + assert_eq!( | |
| + state.previous_usage.as_ref(), | |
| + Some(&usage), | |
| + "previous usage should be stored in ResponsesState" | |
| + ); | |
| } | |
| #[tokio::test] | |
| @@ -762,6 +869,15 @@ async fn no_usage_metadata_when_usage_missing() { | |
| ctx.get_metadata("responses.previous_usage_total_tokens").is_none(), | |
| "should not set total tokens" | |
| ); | |
| + | |
| + let state = ctx | |
| + .extensions | |
| + .get::<ResponsesState>() | |
| + .expect("ResponsesState should be populated"); | |
| + assert!( | |
| + state.previous_usage.is_none(), | |
| + "missing usage should not populate previous_usage" | |
| + ); | |
| } | |
| #[tokio::test] | |
| @@ -852,6 +968,27 @@ async fn fallback_reconstruction_includes_mcp_list_tools() { | |
| ctx.get_metadata("responses.previous_mcp_tools").is_some(), | |
| "should set MCP tools metadata even with empty messages" | |
| ); | |
| + | |
| + let state = ctx | |
| + .extensions | |
| + .get::<ResponsesState>() | |
| + .expect("ResponsesState should be populated"); | |
| + assert_eq!( | |
| + state.messages.len(), | |
| + 4, | |
| + "fallback should reconstruct previous input/output before current input" | |
| + ); | |
| + assert_eq!(state.messages[0]["content"], "Hello", "previous input should be first"); | |
| + assert_eq!( | |
| + state.messages[1]["type"], "mcp_list_tools", | |
| + "previous output items should be preserved" | |
| + ); | |
| + assert_eq!( | |
| + state.messages[2]["type"], "message", | |
| + "previous message output should follow" | |
| + ); | |
| + assert_eq!(state.messages[3]["content"], "Next", "current input should be last"); | |
| + assert_eq!(state.previous_tools.len(), 1, "fallback should populate previous tools"); | |
| } | |
| // ----------------------------------------------------------------------------- | |
| @@ -890,6 +1027,17 @@ impl MockStore { | |
| fn with_output_and_usage(id: &str, output: Value, usage: Value) -> Self { | |
| let mut records = std::collections::HashMap::new(); | |
| + let mut response_object = json!({ | |
| + "id": id, | |
| + "status": "completed", | |
| + "output": output, | |
| + }); | |
| + if !usage.is_null() { | |
| + response_object | |
| + .as_object_mut() | |
| + .expect("response_object should be an object") | |
| + .insert("usage".to_owned(), usage); | |
| + } | |
| records.insert( | |
| id.to_owned(), | |
| ResponseRecord { | |
| @@ -897,12 +1045,7 @@ impl MockStore { | |
| tenant_id: "default".to_owned(), | |
| created_at: 1000, | |
| model: "gpt-4.1".to_owned(), | |
| - response_object: json!({ | |
| - "id": id, | |
| - "status": "completed", | |
| - "output": output, | |
| - "usage": usage, | |
| - }), | |
| + response_object, | |
| input: json!("Hello"), | |
| messages: json!([ | |
| {"role": "user", "content": "Hello"}, | |
| diff --git a/filter/src/builtins/http/ai/openai/responses/state.rs b/filter/src/builtins/http/ai/openai/responses/state.rs | |
| index b7bdc1c..bfc704c 100644 | |
| --- a/filter/src/builtins/http/ai/openai/responses/state.rs | |
| +++ b/filter/src/builtins/http/ai/openai/responses/state.rs | |
| @@ -78,6 +78,12 @@ pub(crate) struct ResponsesState { | |
| /// history for this response and prepends it to `messages`. | |
| pub previous_response_id: Option<String>, | |
| + /// MCP tool listings recovered from the previous response. | |
| + pub previous_tools: Vec<serde_json::Value>, | |
| + | |
| + /// Token usage reported by the previous response. | |
| + pub previous_usage: Option<serde_json::Value>, | |
| + | |
| /// Parsed request body as received from the client. | |
| pub request_body: serde_json::Value, | |
| @@ -114,6 +120,7 @@ impl ResponsesState { | |
| .get("tool_choice") | |
| .cloned() | |
| .unwrap_or_else(|| serde_json::Value::String("auto".to_owned())); | |
| + let tools = extract_array_field(&body, "tools"); | |
| Self { | |
| context_management: body.get("context_management").cloned(), | |
| @@ -126,12 +133,14 @@ impl ResponsesState { | |
| output_items: Vec::new(), | |
| parallel_tool_calls: extract_bool_or(&body, "parallel_tool_calls", true), | |
| previous_response_id: extract_string(&body, "previous_response_id"), | |
| + previous_tools: Vec::new(), | |
| + previous_usage: None, | |
| + request_body: body, | |
| response_object: serde_json::Value::Null, | |
| tool_calls: Vec::new(), | |
| tool_choice, | |
| - tools: extract_array_field(&body, "tools"), | |
| + tools, | |
| usage: serde_json::Value::Null, | |
| - request_body: body, | |
| } | |
| } | |
| } | |
| ``` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment