Skip to content

Instantly share code, notes, and snippets.

@nerdalert
Created June 26, 2026 21:40
Show Gist options
  • Select an option

  • Save nerdalert/b7ec35b6b5b5fcc7a56699c7fd8ffb01 to your computer and use it in GitHub Desktop.

Select an option

Save nerdalert/b7ec35b6b5b5fcc7a56699c7fd8ffb01 to your computer and use it in GitHub Desktop.
```diff
diff --git a/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs b/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs
index 5e60a3f..8dc4ced 100644
--- a/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs
+++ b/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs
@@ -27,6 +27,7 @@ use crate::{
filter::{HttpFilter, HttpFilterContext},
};
+// -----------------------------------------------------------------------------
// Constants
// -----------------------------------------------------------------------------
@@ -184,10 +185,7 @@ async fn validate_previous_response(
return Ok(action);
}
- extract_mcp_tools(ctx, &record);
- extract_previous_usage(ctx, &record);
-
- ctx.extensions.insert(build_state(parsed_body, record.messages));
+ populate_state_and_metadata(ctx, parsed_body, &record);
debug!(previous_response_id = %prev_id, "previous response validated, state populated");
ctx.set_metadata("responses.previous_response_id", prev_id);
@@ -195,18 +193,89 @@ async fn validate_previous_response(
Ok(FilterAction::Release)
}
+/// Promote previous-response data into request metadata and insert the
+/// rehydrated [`ResponsesState`] into `ctx.extensions`.
+///
+/// MCP tool listings recovered from `record` are written as a compact
+/// metadata signal, while the full listings are handed to `build_state`.
+/// The previous `usage` object is read from `record.response_object`; an
+/// explicit JSON `null` is treated the same as a missing field, so neither
+/// usage metadata nor `previous_usage` is populated in that case.
+fn populate_state_and_metadata(ctx: &mut HttpFilterContext<'_>, parsed_body: Value, record: &ResponseRecord) {
+ let previous_tools = collect_mcp_tool_listings(record);
+ write_mcp_tools_metadata(ctx, &previous_tools);
+
+ // Drop an explicit `null` so it is not mistaken for recorded usage.
+ let previous_usage = record.response_object.get("usage").filter(|usage| !usage.is_null());
+ write_previous_usage_metadata(ctx, previous_usage);
+
+ ctx.extensions.insert(build_state(
+ parsed_body,
+ record,
+ previous_tools,
+ previous_usage.cloned(),
+ ));
+}
+
/// Build [`ResponsesState`] by prepending stored messages before the current input.
+///
+/// Stored history comes from [`stored_messages_for_rehydrate`]. The
+/// recovered MCP tool listings and previous usage are attached to the
+/// state as `previous_tools` and `previous_usage`.
// TODO(#697): enforce a max rehydrated history size.
-fn build_state(parsed_body: Value, messages: Value) -> ResponsesState {
+fn build_state(
+ parsed_body: Value,
+ record: &ResponseRecord,
+ previous_tools: Vec<Value>,
+ previous_usage: Option<Value>,
+) -> ResponsesState {
let mut state = ResponsesState::from_request_body(parsed_body);
- let stored = match messages {
- Value::Array(arr) => arr,
- _ => Vec::new(),
- };
+ let stored = stored_messages_for_rehydrate(record);
+ // Splicing at 0..0 inserts the stored history ahead of the current input.
state.messages.splice(0..0, stored);
+ state.previous_tools = previous_tools;
+ state.previous_usage = previous_usage;
state
}
+/// Return stored history, reconstructing from public fields for
+/// records created before hidden messages were persisted.
+///
+/// A non-empty `record.messages` array is cloned as-is. An empty array,
+/// `null`, or any other non-array value falls back to
+/// [`reconstruct_messages_from_public_response`].
+fn stored_messages_for_rehydrate(record: &ResponseRecord) -> Vec<Value> {
+ if let Some(messages) = record.messages.as_array().filter(|messages| !messages.is_empty()) {
+ return messages.clone();
+ }
+
+ reconstruct_messages_from_public_response(record)
+}
+
+/// Reconstruct previous input/output items from public stored fields.
+///
+/// Items derived from `record.input` come first, followed by the items of
+/// `record.response_object["output"]`. A missing or `null` output
+/// contributes nothing.
+fn reconstruct_messages_from_public_response(record: &ResponseRecord) -> Vec<Value> {
+ let mut messages = Vec::new();
+
+ append_stored_input_items(&mut messages, record.input.clone());
+
+ if let Some(output) = record.response_object.get("output").filter(|output| !output.is_null()) {
+ append_stored_output_items(&mut messages, output.clone());
+ }
+
+ messages
+}
+
+/// Append stored response input as Responses API item params.
+///
+/// Each kind of `input` is handled as follows:
+/// - `null` adds nothing.
+/// - A string becomes a single user message item.
+/// - An array is appended item by item.
+/// - Any other value is pushed unchanged as a single item.
+fn append_stored_input_items(messages: &mut Vec<Value>, input: Value) {
+ match input {
+ Value::Null => {},
+ Value::String(text) => messages.push(user_message_item(&text)),
+ Value::Array(items) => messages.extend(items),
+ // NOTE(review): this also forwards bare numbers/booleans, which are
+ // presumably not valid input items; confirm whether to drop them.
+ other => messages.push(other),
+ }
+}
+
+/// Append stored response output items.
+///
+/// An array `output` is appended item by item; any other value is pushed
+/// as a single item.
+fn append_stored_output_items(messages: &mut Vec<Value>, output: Value) {
+ if let Value::Array(items) = output {
+ messages.extend(items);
+ } else {
+ messages.push(output);
+ }
+}
+
+/// Build a Responses API user message item from string input.
+///
+/// `content` is set to the plain string rather than an array of content
+/// parts.
+fn user_message_item(text: &str) -> Value {
+ serde_json::json!({
+ "type": "message",
+ "role": "user",
+ "content": text,
+ })
+}
+
/// Parse the request body and extract `previous_response_id`.
///
/// Returns the parsed body alongside the optional ID so callers
@@ -278,22 +347,16 @@ fn validate_response_status(record: &ResponseRecord) -> Result<(), FilterAction>
// MCP Tool & Usage Extraction
// -----------------------------------------------------------------------------
-/// Extract MCP tool listings from stored history or previous
-/// response output and set a compact metadata signal for downstream
-/// filters.
-///
-/// Scans stored message history and `record.response_object["output"]`
-/// for items with `"type": "mcp_list_tools"` and builds a compact
-/// JSON array of `{"server_label": "x", "tools": ["name1", "name2"]}`.
+/// Set a compact MCP tool metadata signal for downstream filters.
///
/// If the serialized value exceeds [`MAX_METADATA_VALUE_BYTES`],
/// a boolean `"true"` is set instead.
-fn extract_mcp_tools(ctx: &mut HttpFilterContext<'_>, record: &ResponseRecord) {
- let summaries = collect_mcp_tool_summaries(record);
- if summaries.is_empty() {
+fn write_mcp_tools_metadata(ctx: &mut HttpFilterContext<'_>, listings: &[Value]) {
+ if listings.is_empty() {
return;
}
+ let summaries = compact_mcp_tool_summaries(listings);
let compact = Value::Array(summaries);
match serde_json::to_string(&compact) {
Ok(s) if s.len() <= MAX_METADATA_VALUE_BYTES => {
@@ -310,66 +373,83 @@ fn extract_mcp_tools(ctx: &mut HttpFilterContext<'_>, record: &ResponseRecord) {
}
}
-/// Build compact summaries from `mcp_list_tools` output items.
-///
-/// Each summary contains the `server_label` and an array of
-/// tool names (without schemas or descriptions).
-fn collect_mcp_tool_summaries(record: &ResponseRecord) -> Vec<Value> {
- let mut summaries = Vec::new();
+/// Recover MCP tool listings from stored history and response output.
+///
+/// `record.messages` is scanned before `record.response_object["output"]`,
+/// and both scans share one `seen` set. When the same server/tool-name set
+/// appears in both, only the copy from stored history is returned.
+fn collect_mcp_tool_listings(record: &ResponseRecord) -> Vec<Value> {
+ let mut listings = Vec::new();
let mut seen = HashSet::new();
if let Some(messages) = record.messages.as_array() {
- collect_mcp_tool_summaries_from_items(messages, &mut seen, &mut summaries);
+ collect_mcp_tool_listings_from_items(messages, &mut seen, &mut listings);
}
if let Some(output) = record.response_object.get("output").and_then(Value::as_array) {
- collect_mcp_tool_summaries_from_items(output, &mut seen, &mut summaries);
+ collect_mcp_tool_listings_from_items(output, &mut seen, &mut listings);
}
- summaries
+ listings
}
-/// Append compact MCP tool summaries from a sequence of response items.
-fn collect_mcp_tool_summaries_from_items(
+/// Append MCP tool listings from a sequence of response items.
+///
+/// Only `mcp_list_tools` items with a string `server_label` and a `tools`
+/// array are kept. Each kept listing preserves the full tool definitions.
+///
+/// Listings are deduplicated on the server label plus the sorted,
+/// deduplicated tool names. The same tools advertised in a different order
+/// are therefore recorded once, and the first occurrence wins.
+fn collect_mcp_tool_listings_from_items(
items: &[Value],
seen: &mut HashSet<(String, Vec<String>)>,
- summaries: &mut Vec<Value>,
+ listings: &mut Vec<Value>,
) {
- summaries.extend(
- items
- .iter()
- .filter(|item| item.get("type").and_then(Value::as_str) == Some("mcp_list_tools"))
- .filter_map(|item| {
- let label = item.get("server_label").and_then(Value::as_str)?;
- let names: Vec<String> = item
- .get("tools")
- .and_then(Value::as_array)
- .map(|tools| {
- tools
- .iter()
- .filter_map(|t| t.get("name").and_then(Value::as_str).map(ToOwned::to_owned))
- .collect()
- })
- .unwrap_or_default();
- if !seen.insert((label.to_owned(), names.clone())) {
- return None;
- }
- Some(serde_json::json!({
- "server_label": label,
- "tools": names,
- }))
- }),
- );
+ listings.extend(items.iter().filter_map(|item| {
+ if item.get("type").and_then(Value::as_str) != Some("mcp_list_tools") {
+ return None;
+ }
+
+ let label = item.get("server_label").and_then(Value::as_str)?;
+ let tools = item.get("tools").and_then(Value::as_array)?;
+
+ // The names only feed the order-insensitive dedupe key, so sort and
+ // dedupe them in place instead of cloning a separate copy.
+ let mut dedupe_names = mcp_tool_names(tools);
+ dedupe_names.sort_unstable();
+ dedupe_names.dedup();
+
+ if !seen.insert((label.to_owned(), dedupe_names)) {
+ return None;
+ }
+
+ Some(serde_json::json!({
+ "server_label": label,
+ "tools": tools,
+ }))
+ }));
+}
+
+/// Build compact summaries from recovered MCP listings.
+///
+/// Each summary has the shape `{"server_label": <label>, "tools": [<names>]}`.
+/// It carries tool names only, without descriptions or schemas. Listings
+/// missing a string `server_label` or a `tools` array are skipped.
+fn compact_mcp_tool_summaries(listings: &[Value]) -> Vec<Value> {
+ listings
+ .iter()
+ .filter_map(|listing| {
+ let label = listing.get("server_label").and_then(Value::as_str)?;
+ let tools = listing.get("tools").and_then(Value::as_array)?;
+ let names = mcp_tool_names(tools);
+
+ Some(serde_json::json!({
+ "server_label": label,
+ "tools": names,
+ }))
+ })
+ .collect()
+}
+
+/// Extract tool names from MCP tool definitions.
+///
+/// Names are returned in input order. Tools without a string `name` are
+/// skipped.
+fn mcp_tool_names(tools: &[Value]) -> Vec<String> {
+ tools
+ .iter()
+ .filter_map(|tool| tool.get("name").and_then(Value::as_str).map(ToOwned::to_owned))
+ .collect()
}
/// Extract token usage from the previous response and set
/// metadata keys for downstream auto-compaction.
///
-/// Reads `record.response_object["usage"]` and writes
-/// `input_tokens`, `output_tokens`, and `total_tokens` as
-/// individual string metadata values.
-fn extract_previous_usage(ctx: &mut HttpFilterContext<'_>, record: &ResponseRecord) {
- let Some(usage) = record.response_object.get("usage") else {
+/// Writes `input_tokens`, `output_tokens`, and `total_tokens` as
+/// individual string metadata values when present.
+fn write_previous_usage_metadata(ctx: &mut HttpFilterContext<'_>, usage: Option<&Value>) {
+ let Some(usage) = usage else {
return;
};
diff --git a/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs b/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs
index 79e43e8..e7c8cef 100644
--- a/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs
+++ b/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs
@@ -516,6 +516,16 @@ async fn extracts_mcp_tools_from_previous_response() {
assert_eq!(tools.len(), 2, "should have two tool names");
assert_eq!(tools[0], "get_weather", "first tool name should match");
assert_eq!(tools[1], "search", "second tool name should match");
+
+ let state = ctx
+ .extensions
+ .get::<ResponsesState>()
+ .expect("ResponsesState should be populated");
+ assert_eq!(state.previous_tools.len(), 1, "should store full previous tool listing");
+ assert_eq!(
+ state.previous_tools[0]["tools"][0]["description"], "Get weather",
+ "ResponsesState should preserve full tool definitions"
+ );
}
#[tokio::test]
@@ -596,6 +606,83 @@ async fn extracts_mcp_tools_from_multiple_servers() {
);
}
+/// Stored history and the previous output both list `shared-server` with
+/// the same tools in a different order. The two listings must collapse
+/// into one entry, both in the compact metadata and in `ResponsesState`.
+#[tokio::test]
+async fn deduplicates_mcp_tools_independent_of_tool_order() {
+ let mut records = std::collections::HashMap::new();
+ records.insert(
+ "resp_dedupe".to_owned(),
+ ResponseRecord {
+ id: "resp_dedupe".to_owned(),
+ tenant_id: "default".to_owned(),
+ created_at: 1000,
+ model: "gpt-4.1".to_owned(),
+ response_object: json!({
+ "id": "resp_dedupe",
+ "status": "completed",
+ "output": [{
+ "id": "mcpl_output",
+ "type": "mcp_list_tools",
+ "server_label": "shared-server",
+ "tools": [
+ {"name": "beta", "description": "d", "input_schema": {}},
+ {"name": "alpha", "description": "d", "input_schema": {}}
+ ]
+ }]
+ }),
+ input: json!("Hello"),
+ // Same server and tool names as the output listing, reversed order.
+ messages: json!([
+ {
+ "id": "mcpl_history",
+ "type": "mcp_list_tools",
+ "server_label": "shared-server",
+ "tools": [
+ {"name": "alpha", "description": "d", "input_schema": {}},
+ {"name": "beta", "description": "d", "input_schema": {}}
+ ]
+ }
+ ]),
+ },
+ );
+ let store = MockStore {
+ records,
+ should_fail: false,
+ };
+ let registry = setup_registry(store);
+
+ let filter = RehydrateFilter;
+ let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses");
+ let mut ctx = crate::test_utils::make_filter_context(&req);
+ ctx.extensions.insert(registry.clone());
+ ctx.set_metadata("openai_responses_format.format", "openai_responses");
+ let mut body = Some(Bytes::from(r#"{"input":"Next","previous_response_id":"resp_dedupe"}"#));
+
+ let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap();
+ assert!(
+ matches!(action, FilterAction::Release),
+ "should release after rehydration"
+ );
+
+ let mcp_meta = ctx
+ .get_metadata("responses.previous_mcp_tools")
+ .expect("should set MCP tools metadata");
+ let parsed: Value = serde_json::from_str(mcp_meta).unwrap();
+ assert_eq!(
+ parsed.as_array().unwrap().len(),
+ 1,
+ "same server/tools should dedupe even when order differs"
+ );
+
+ let state = ctx
+ .extensions
+ .get::<ResponsesState>()
+ .expect("ResponsesState should be populated");
+ assert_eq!(
+ state.previous_tools.len(),
+ 1,
+ "ResponsesState should not retain duplicate MCP listings"
+ );
+}
+
#[tokio::test]
async fn extracts_mcp_tools_from_stored_history_when_latest_output_has_none() {
let mut records = std::collections::HashMap::new();
@@ -690,6 +777,16 @@ async fn mcp_tools_overflow_sets_boolean_flag() {
Some("true"),
"should fall back to boolean flag when compact JSON exceeds 256 bytes"
);
+
+ let state = ctx
+ .extensions
+ .get::<ResponsesState>()
+ .expect("ResponsesState should be populated");
+ assert_eq!(
+ state.previous_tools.len(),
+ 1,
+ "metadata overflow should not drop full previous tools from state"
+ );
}
// -----------------------------------------------------------------------------
@@ -700,7 +797,7 @@ async fn mcp_tools_overflow_sets_boolean_flag() {
async fn extracts_usage_from_previous_response() {
let output = json!([{"type": "message", "content": [{"type": "output_text", "text": "Hi"}]}]);
let usage = json!({"input_tokens": 500, "output_tokens": 200, "total_tokens": 700});
- let store = MockStore::with_output_and_usage("resp_usage", output, usage);
+ let store = MockStore::with_output_and_usage("resp_usage", output, usage.clone());
let registry = setup_registry(store);
let filter = RehydrateFilter;
@@ -730,6 +827,16 @@ async fn extracts_usage_from_previous_response() {
Some("700"),
"total tokens"
);
+
+ let state = ctx
+ .extensions
+ .get::<ResponsesState>()
+ .expect("ResponsesState should be populated");
+ assert_eq!(
+ state.previous_usage.as_ref(),
+ Some(&usage),
+ "previous usage should be stored in ResponsesState"
+ );
}
#[tokio::test]
@@ -762,6 +869,15 @@ async fn no_usage_metadata_when_usage_missing() {
ctx.get_metadata("responses.previous_usage_total_tokens").is_none(),
"should not set total tokens"
);
+
+ let state = ctx
+ .extensions
+ .get::<ResponsesState>()
+ .expect("ResponsesState should be populated");
+ assert!(
+ state.previous_usage.is_none(),
+ "missing usage should not populate previous_usage"
+ );
}
#[tokio::test]
@@ -852,6 +968,27 @@ async fn fallback_reconstruction_includes_mcp_list_tools() {
ctx.get_metadata("responses.previous_mcp_tools").is_some(),
"should set MCP tools metadata even with empty messages"
);
+
+ let state = ctx
+ .extensions
+ .get::<ResponsesState>()
+ .expect("ResponsesState should be populated");
+ assert_eq!(
+ state.messages.len(),
+ 4,
+ "fallback should reconstruct previous input/output before current input"
+ );
+ assert_eq!(state.messages[0]["content"], "Hello", "previous input should be first");
+ assert_eq!(
+ state.messages[1]["type"], "mcp_list_tools",
+ "previous output items should be preserved"
+ );
+ assert_eq!(
+ state.messages[2]["type"], "message",
+ "previous message output should follow"
+ );
+ assert_eq!(state.messages[3]["content"], "Next", "current input should be last");
+ assert_eq!(state.previous_tools.len(), 1, "fallback should populate previous tools");
}
// -----------------------------------------------------------------------------
@@ -890,6 +1027,17 @@ impl MockStore {
fn with_output_and_usage(id: &str, output: Value, usage: Value) -> Self {
let mut records = std::collections::HashMap::new();
+ let mut response_object = json!({
+ "id": id,
+ "status": "completed",
+ "output": output,
+ });
+ if !usage.is_null() {
+ response_object
+ .as_object_mut()
+ .expect("response_object should be an object")
+ .insert("usage".to_owned(), usage);
+ }
records.insert(
id.to_owned(),
ResponseRecord {
@@ -897,12 +1045,7 @@ impl MockStore {
tenant_id: "default".to_owned(),
created_at: 1000,
model: "gpt-4.1".to_owned(),
- response_object: json!({
- "id": id,
- "status": "completed",
- "output": output,
- "usage": usage,
- }),
+ response_object,
input: json!("Hello"),
messages: json!([
{"role": "user", "content": "Hello"},
diff --git a/filter/src/builtins/http/ai/openai/responses/state.rs b/filter/src/builtins/http/ai/openai/responses/state.rs
index b7bdc1c..bfc704c 100644
--- a/filter/src/builtins/http/ai/openai/responses/state.rs
+++ b/filter/src/builtins/http/ai/openai/responses/state.rs
@@ -78,6 +78,12 @@ pub(crate) struct ResponsesState {
/// history for this response and prepends it to `messages`.
pub previous_response_id: Option<String>,
+ /// MCP tool listings recovered from the previous response.
+ pub previous_tools: Vec<serde_json::Value>,
+
+ /// Token usage reported by the previous response.
+ pub previous_usage: Option<serde_json::Value>,
+
/// Parsed request body as received from the client.
pub request_body: serde_json::Value,
@@ -114,6 +120,7 @@ impl ResponsesState {
.get("tool_choice")
.cloned()
.unwrap_or_else(|| serde_json::Value::String("auto".to_owned()));
+ let tools = extract_array_field(&body, "tools");
Self {
context_management: body.get("context_management").cloned(),
@@ -126,12 +133,14 @@ impl ResponsesState {
output_items: Vec::new(),
parallel_tool_calls: extract_bool_or(&body, "parallel_tool_calls", true),
previous_response_id: extract_string(&body, "previous_response_id"),
+ previous_tools: Vec::new(),
+ previous_usage: None,
+ request_body: body,
response_object: serde_json::Value::Null,
tool_calls: Vec::new(),
tool_choice,
- tools: extract_array_field(&body, "tools"),
+ tools,
usage: serde_json::Value::Null,
- request_body: body,
}
}
}
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment