Skip to content

Instantly share code, notes, and snippets.

@cablehead
Created May 20, 2025 14:23
Show Gist options
  • Save cablehead/9243819b8816d0cfa91c01b126a6fd7d to your computer and use it in GitHub Desktop.
Save cablehead/9243819b8816d0cfa91c01b126a6fd7d to your computer and use it in GitHub Desktop.

Spec: Command API Change – Use .response Frame with Collected Pipeline

Objective

  • Change command event output: Instead of streaming each output Value as a .recv event, collect the full pipeline, then append a single frame with topic <name>.response.
  • Apply return_options: The new .response event must respect the suffix and ttl from the command's return_options.
  • Remove .complete event: No .complete frame is emitted for these commands.
  • Payload: The collected result should be a serialized array (JSON list) of all values the pipeline produced (empty if there were none).

Changes Required

1. Command Execution Logic

  • Current: Each Value in the command’s output pipeline is sent as a <name>.recv frame. At the end, a <name>.complete event is written.

  • New:

    • Collect the pipeline (using collect or equivalent logic).

    • Create one frame:

      • topic: <name><suffix> (default suffix is .response, can be overridden via return_options.suffix)

      • hash: The CAS hash of the JSON array of collected outputs (even if empty).

      • meta:

        • Must include "command_id" and "frame_id"
        • May merge in any extra meta required
      • ttl: As given by return_options.ttl, or None (permanent) if not set.

    • No .complete frame.

2. Frame Format

  • topic: "{command_name}{suffix}" (e.g., echo.response, or custom suffix)

  • hash: CAS hash of serde_json::to_string(&collected_values)

  • meta:

    {
      "command_id": "...",
      "frame_id": "..."
    }
  • ttl: as described above.

3. Error Handling

  • On error, emit an <name>.error frame as before (no change).

Pseudocode/Implementation Guidance

// After running pipeline, instead of streaming values:

// 1. Collect the pipeline fully (in blocking or async thread):
let values: Vec<Value> = pipeline_data.into_iter().collect();

// 2. Serialize as JSON (as a List/Array):
let json_list = serde_json::to_string(&values).unwrap();
let hash = store.cas_insert_sync(json_list)?;

// 3. Build the .response frame:
let response_topic = format!(
    "{}{}",
    frame.topic.strip_suffix(".call").unwrap(),
    command.return_options.as_ref()
        .and_then(|opts| opts.suffix.as_deref())
        .unwrap_or(".response")
);

let _ = store.append(
    Frame::builder(response_topic, frame.context_id)
        .maybe_ttl(command.return_options.as_ref().and_then(|opts| opts.ttl.clone()))
        .hash(hash)
        .meta(json!({
            "command_id": command.id.to_string(),
            "frame_id": frame.id.to_string(),
        }))
        .build()
);

// 4. Do not emit .complete.

Test Case Example

Given a command:

{
  run: {|frame|
    [1, 2, 3] | each {|x| $x * 2 }
  },
  return_options: { suffix: ".response", ttl: "time:1000" }
}

When you call it, Then it emits one frame:

  • topic: cmd.response
  • hash: points to "[2,4,6]" (as a JSON array)
  • meta: contains command_id and frame_id
  • ttl: 1000 ms

Clarifications

  • If the command returns nothing, the JSON should be [].
  • If the pipeline throws, error reporting is unchanged.
  • This does not affect direct .append calls within the user’s script; only the main command output changes.

Summary Table

Behavior Before After
Output Event N x .recv, 1 x .complete 1 x .response (default suffix, overridable)
Output Value(s) One per .recv All as array in .response
return_options.suffix Applied to .recv Applied to .response
return_options.ttl Applied to .recv Applied to .response
.complete frame Emitted Not emitted
Error Handling .error as before .error as before

Migration Checklist

  • Remove .recv streaming logic
  • Remove .complete emission
  • After pipeline run, collect all output as Vec<Value>
  • Serialize as JSON array, put in CAS
  • Emit one frame with topic <name>{suffix or ".response"} using meta and ttl from return_options
  • Tests updated to expect single .response frame with array content
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment