- Change command event output: Instead of streaming each output `Value` as a `.recv` event, collect the full pipeline, then append a single frame with topic `<name>.response`.
- Apply `return_options`: The new `.response` event must respect the `suffix` and `ttl` from the command's `return_options`.
- Remove `.complete` event: No `.complete` frame is emitted for these commands.
- Payload: The collected result should be a serialized array (JSON list) of all values the pipeline produced (empty if there were none).

Behavior, current versus new:

- Current: Each `Value` in the command's output pipeline is sent as a `<name>.recv` frame. At the end, a `<name>.complete` event is written.
- New:
  - Collect the pipeline (using `collect` or equivalent logic).
  - Create one frame:
    - topic: `<name><suffix>` (default suffix is `.response`, can be overridden via `return_options.suffix`)
    - hash: The CAS hash of the JSON array of collected outputs (even if empty).
    - meta:
      - Must include `"command_id"` and `"frame_id"`
      - May merge in any extra meta required
    - ttl: As given by `return_options.ttl`, or `None` (permanent) if not set.
  - No `.complete` frame.

Frame format:

- topic: `"{command_name}{suffix}"` (e.g., `echo.response`, or custom suffix)
- hash: CAS hash of `serde_json::to_string(&collected_values)`
- meta: `{ "command_id": "...", "frame_id": "..." }`
- ttl: as described above.
- On error, emit an `<name>.error` frame as before (no change).

Implementation sketch:

    // After running pipeline, instead of streaming values:
    // 1. Collect the pipeline fully (in blocking or async thread):
    let values: Vec<Value> = pipeline_data.into_iter().collect();

    // 2. Serialize as JSON (as a List/Array):
    let json_list = serde_json::to_string(&values).unwrap();
    let hash = store.cas_insert_sync(json_list)?;

    // 3. Build the .response frame:
    let response_topic = format!(
        "{}{}",
        frame.topic.strip_suffix(".call").unwrap(),
        command
            .return_options
            .as_ref()
            .and_then(|opts| opts.suffix.as_deref())
            .unwrap_or(".response")
    );
    let _ = store.append(
        Frame::builder(response_topic, frame.context_id)
            .maybe_ttl(command.return_options.as_ref().and_then(|opts| opts.ttl.clone()))
            .hash(hash)
            .meta(json!({
                "command_id": command.id.to_string(),
                "frame_id": frame.id.to_string(),
            }))
            .build(),
    );

    // 4. Do not emit .complete.

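
The topic derivation in step 3 calls `strip_suffix(".call").unwrap()`, which would panic if the triggering frame's topic did not end in `.call`. A minimal non-panicking sketch of the same derivation follows; `response_topic` and `DEFAULT_SUFFIX` are hypothetical names introduced only for illustration and are not part of the existing code.

```rust
/// Default suffix named in the proposal.
const DEFAULT_SUFFIX: &str = ".response";

/// Hypothetical helper: derive the response topic from the topic of the
/// triggering `<name>.call` frame. Returns `None` when the topic does not
/// end in `.call`, rather than panicking.
fn response_topic(call_topic: &str, suffix: Option<&str>) -> Option<String> {
    // `strip_suffix` yields `None` if the suffix is absent; `?` propagates it.
    let name = call_topic.strip_suffix(".call")?;
    Some(format!("{}{}", name, suffix.unwrap_or(DEFAULT_SUFFIX)))
}
```

With this shape, `response_topic("echo.call", None)` gives `Some("echo.response")`, and the caller decides what to do with a `None` (for example, emit the existing `.error` frame).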
Given a command:

    {
      run: {|frame|
        [1, 2, 3] | each {|x| $x * 2 }
      },
      return_options: { suffix: ".response", ttl: "time:1000" }
    }

When you call it, it emits one frame:

- topic: `cmd.response`
- hash: points to `"[2,4,6]"` (as a JSON array)
- meta: contains `command_id` and `frame_id`
- ttl: 1000 ms

Notes:

- If the command returns nothing, the JSON should be `[]`.
- If the pipeline throws, error reporting is unchanged.
- This does not affect direct `.append` calls within the user's script; only the main command output changes.
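
The example maps `ttl: "time:1000"` to a TTL of 1000 ms, and an unset `ttl` to a permanent frame. The sketch below illustrates only that mapping. `parse_time_ttl` is a hypothetical helper, it handles only the `time:<milliseconds>` form shown in the example, and the store's actual TTL type may look different.

```rust
use std::time::Duration;

/// Hypothetical parser for the `time:<milliseconds>` TTL form from the
/// example. `Ok(None)` means no TTL was set, i.e. the frame is permanent.
/// Any other TTL form is out of scope for this sketch and yields an error.
fn parse_time_ttl(ttl: Option<&str>) -> Result<Option<Duration>, String> {
    match ttl {
        None => Ok(None),
        Some(s) => {
            let ms = s
                .strip_prefix("time:")
                .ok_or_else(|| format!("unsupported ttl: {}", s))?
                .parse::<u64>()
                .map_err(|e| format!("invalid ttl {:?}: {}", s, e))?;
            Ok(Some(Duration::from_millis(ms)))
        }
    }
}
```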

| Behavior | Before | After |
|---|---|---|
| Output Event | N x `.recv`, 1 x `.complete` | 1 x `.response` (default suffix, overridable) |
| Output Value(s) | One per `.recv` | All as array in `.response` |
| `return_options.suffix` | Applied to `.recv` | Applied to `.response` |
| `return_options.ttl` | Applied to `.recv` | Applied to `.response` |
| `.complete` frame | Emitted | Not emitted |
| Error Handling | `.error` as before | `.error` as before |

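
To make the before/after difference concrete, here is a small self-contained model of the frames each behavior would produce. It is a sketch under loud assumptions: `ModelFrame`, `frames_before`, and `frames_after` are hypothetical names, payloads are stored inline rather than as CAS hashes, values are plain integers, `return_options` is ignored in the "before" case for brevity, and the JSON array is joined by hand where the proposal uses `serde_json::to_string`.

```rust
/// Simplified stand-in for a stored frame: a topic plus an inline payload.
/// A real frame carries a CAS hash, meta, and ttl instead of the payload.
#[derive(Debug, PartialEq)]
struct ModelFrame {
    topic: String,
    payload: Option<String>,
}

/// Previous behavior: one `.recv` frame per value, then a `.complete` frame.
fn frames_before(name: &str, values: &[i64]) -> Vec<ModelFrame> {
    let mut frames: Vec<ModelFrame> = values
        .iter()
        .map(|v| ModelFrame {
            topic: format!("{}.recv", name),
            payload: Some(v.to_string()),
        })
        .collect();
    frames.push(ModelFrame {
        topic: format!("{}.complete", name),
        payload: None,
    });
    frames
}

/// New behavior: a single frame whose payload is a JSON array of all values.
/// An empty input still produces one frame, with payload `[]`.
fn frames_after(name: &str, suffix: Option<&str>, values: &[i64]) -> Vec<ModelFrame> {
    let items: Vec<String> = values.iter().map(|v| v.to_string()).collect();
    vec![ModelFrame {
        topic: format!("{}{}", name, suffix.unwrap_or(".response")),
        payload: Some(format!("[{}]", items.join(","))),
    }]
}
```

For the `[2, 4, 6]` example, `frames_before` yields three `cmd.recv` frames plus `cmd.complete`, while `frames_after` yields a single `cmd.response` frame carrying `[2,4,6]`.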
- Remove `.recv` streaming logic
- Remove `.complete` emission
- After pipeline run, collect all output as `Vec<Value>`
- Serialize as JSON array, put in CAS
- Emit one frame with topic `<name>{suffix or ".response"}`, with `command_id` and `frame_id` in meta and the ttl from `return_options`
- Tests updated to expect single `.response` frame with array content