Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created April 1, 2026 08:26
Show Gist options
  • Select an option

  • Save kausmeows/8890ea1c36d73530b6f1b816c4dca161 to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/8890ea1c36d73530b6f1b816c4dca161 to your computer and use it in GitHub Desktop.
"""
Executor HITL via /continue API (Streaming)
============================================
Tests executor-level HITL when an agent inside a Step has a tool
with requires_confirmation=True, using the AgentOS /continue API endpoint
with streaming enabled.
Flow:
gather_data -> detailed_analysis (agent with HITL tool) -> report
Usage:
python libs/agno/agno/test.py
"""
import json
from fastapi.testclient import TestClient
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.tools import tool
from agno.workflow.condition import Condition
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
# NOTE(review): the docstring wording is kept unchanged on purpose; tool
# decorators commonly surface it to the model as the tool description —
# confirm against agno's @tool documentation before rewording it.
@tool(requires_confirmation=True)
def run_detailed_analysis(topic: str) -> str:
    """Run a detailed analysis on the given topic. This is an expensive operation.

    Args:
        topic: The topic to analyze in detail.
    """
    # Canned report: a header naming the topic followed by three fixed bullets.
    report_lines = [
        f"Detailed analysis for '{topic}':",
        "- Comprehensive data review completed",
        "- All edge cases examined",
        "- 47 data points processed",
    ]
    return "\n".join(report_lines)
# Postgres database handle shared by the agent and the workflow below.
DB_URL = "postgresql+psycopg://ai:ai@localhost:5532/ai"
db = PostgresDb(db_url=DB_URL)
# Instructions that steer the agent toward the confirmation-gated tool, so the
# run is expected to pause for human approval.
_ANALYSIS_INSTRUCTIONS = (
    "You perform detailed data analysis. "
    "Always use the run_detailed_analysis tool with the user's topic."
)

# Agent used as the executor of the "detailed_analysis" workflow step.
analysis_agent = Agent(
    name="AnalysisAgent",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[run_detailed_analysis],
    instructions=_ANALYSIS_INSTRUCTIONS,
    db=db,
    telemetry=False,
)
def gather_data(step_input: StepInput) -> StepOutput:
    """Produce the first step's output: a note naming the topic being gathered.

    Args:
        step_input: Step input whose ``input`` attribute carries the topic.

    Returns:
        A ``StepOutput`` whose content is ``"Data gathered for: <topic>"``;
        a falsy ``input`` is replaced by ``"general data"``.
    """
    subject = step_input.input
    if not subject:
        subject = "general data"
    return StepOutput(content=f"Data gathered for: {subject}")
def quick_summary(step_input: StepInput) -> StepOutput:
    """Return a fixed quick-summary output; ``step_input`` is not read."""
    summary_text = "Quick summary: basic metrics computed in 1 minute"
    return StepOutput(content=summary_text)
def generate_report(step_input: StepInput) -> StepOutput:
    """Wrap the previous step's content in a final-report banner.

    Args:
        step_input: Step input whose ``previous_step_content`` is embedded in
            the report body.

    Returns:
        A ``StepOutput`` with the banner, the previous content (or
        ``"No analysis"`` when that content is falsy), and a closing line.
    """
    analysis = step_input.previous_step_content
    if not analysis:
        analysis = "No analysis"
    report = f"=== FINAL REPORT ===\n\n{analysis}\n\nReport complete."
    return StepOutput(content=report)
# Sequential pipeline: gather_data -> detailed_analysis (agent with the
# confirmation-gated tool) -> report.
_workflow_steps = [
    Step(name="gather_data", executor=gather_data),
    Step(name="detailed_analysis", agent=analysis_agent),
    # Alternative variant kept for reference: wrap the analysis step in a
    # Condition, with quick_summary as the else-branch.
    # Condition(
    #     name="analysis_decision",
    #     evaluator=True,
    #     steps=[Step(name="detailed_analysis", agent=analysis_agent)],
    #     else_steps=[Step(name="quick_summary", executor=quick_summary)],
    # ),
    Step(name="report", executor=generate_report),
]

# The id below is the path segment used by the API calls in the script section.
workflow = Workflow(
    name="ConditionExecutorHITL",
    id="condition-executor-hitl",
    db=db,
    steps=_workflow_steps,
    telemetry=False,
)
# Register the workflow with AgentOS, then wrap the app returned by get_app()
# in a TestClient so the script can issue requests without a running server.
agent_os = AgentOS(workflows=[workflow])
app = agent_os.get_app()
client = TestClient(app=app)
def parse_sse_events(response):
    """Parse the JSON payloads of SSE ``data`` lines from a streaming response.

    Args:
        response: Object exposing ``iter_lines()`` that yields the stream's
            lines as ``str`` or ``bytes``.

    Returns:
        A list with the decoded JSON payload of every ``data`` line, in
        arrival order. Lines that are not ``data`` fields, and ``data`` lines
        whose payload is not valid JSON, are skipped.
    """
    field_prefix = "data:"
    events = []
    for raw_line in response.iter_lines():
        line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line
        if not line.startswith(field_prefix):
            continue
        # The SSE format makes the space after the colon optional, so accept
        # both "data: {...}" and "data:{...}". json.loads ignores the leading
        # whitespace, so no explicit strip is needed.
        payload = line[len(field_prefix):]
        try:
            events.append(json.loads(payload))
        except json.JSONDecodeError:
            # Deliberate best-effort: non-JSON payloads are not events this
            # script can inspect, so they are dropped rather than raised.
            continue
    return events
def _first_event(events, event_name):
    """Return the first event whose "event" field equals *event_name*, else None."""
    return next((evt for evt in events if evt.get("event") == event_name), None)


def _print_event_names(events):
    """Print how many SSE events were received, then each event's name by index."""
    print(f"Received {len(events)} SSE events")
    for i, event in enumerate(events):
        print(f" Event {i}: {event.get('event', 'unknown')}")


def main():
    """Drive the executor-HITL flow through the API and print each stage.

    Starts a streaming workflow run, looks for the ``StepExecutorPaused``
    event, marks every pending tool execution as confirmed, and resumes the
    run through the ``/continue`` endpoint. Progress, the final content, or
    the failure reason are printed; nothing is returned.
    """
    # Step 1: Create a run via API (streaming) - should pause at executor HITL
    print("--- Step 1: Creating workflow run via API (streaming, expects executor pause) ---")
    with client.stream(
        "POST",
        "/workflows/condition-executor-hitl/runs",
        data={"message": "Q4 sales performance", "stream": "true"},
    ) as response:
        print(f"Response status code: {response.status_code}")
        # The stream must be consumed while the response is still open.
        events = parse_sse_events(response)
    _print_event_names(events)

    paused_event = _first_event(events, "StepExecutorPaused")
    if paused_event is None:
        # Without a pause there is nothing to confirm or continue.
        print("WARNING: No StepExecutorPaused event found!")
        print(f"Last event: {json.dumps(events[-1] if events else {}, indent=2)}")
        return

    run_id = paused_event.get("run_id")
    session_id = paused_event.get("session_id")
    print(f"Run ID: {run_id}")
    print(f"Session ID: {session_id}")
    print(f"Step: {paused_event.get('step_name')} (index {paused_event.get('step_index')})")
    print(f"Executor: {paused_event.get('executor_agent_name')} ({paused_event.get('executor_type')})")
    print("Workflow paused as expected!")

    # Step 2: Build step_requirements from the paused event and confirm
    print("\n--- Step 2: Processing executor requirements ---")
    executor_reqs = paused_event.get("executor_requirements", [])
    print(f"Executor requirements: {len(executor_reqs)}")
    for ereq in executor_reqs:
        tool_exec = ereq.get("tool_execution", {})
        if tool_exec:
            print(f" Tool: {tool_exec.get('tool_name')}({tool_exec.get('tool_args')})")
            # Confirm in place: executor_reqs is sent back below, so the
            # approval must be recorded on these same dicts.
            tool_exec["confirmed"] = True
            tool_exec["confirmation_note"] = "Approved via API test"

    # Reconstruct step_requirements in the format the /continue endpoint expects
    step_requirements = [
        {
            "step_name": paused_event.get("step_name"),
            "step_index": paused_event.get("step_index"),
            "step_id": paused_event.get("step_id"),
            "requires_executor_input": True,
            "executor_agent_id": paused_event.get("executor_agent_id"),
            "executor_agent_name": paused_event.get("executor_agent_name"),
            "executor_run_id": paused_event.get("executor_run_id"),
            "executor_type": paused_event.get("executor_type"),
            "executor_requirements": executor_reqs,
        }
    ]

    # Step 3: Continue the run via /continue API (streaming)
    print("\n--- Step 3: Continuing run via /continue API (streaming) ---")
    with client.stream(
        "POST",
        f"/workflows/condition-executor-hitl/runs/{run_id}/continue",
        data={
            "step_requirements": json.dumps(step_requirements),
            "session_id": session_id,
            "stream": "true",
        },
    ) as continue_response:
        print(f"Continue response status code: {continue_response.status_code}")
        continue_events = parse_sse_events(continue_response)
    _print_event_names(continue_events)

    completed_event = _first_event(continue_events, "WorkflowCompleted")
    if completed_event:
        print(f"\nFinal content: {str(completed_event.get('content', ''))[:300]}")
        print("\n--- SUCCESS: Workflow executor HITL streaming continue via API works! ---")
        return

    # No completion: report an explicit error event if there is one.
    error_event = _first_event(continue_events, "WorkflowError")
    if error_event:
        print(f"\nERROR: {error_event.get('error')}")
        return

    print("\nNo WorkflowCompleted event found")
    if continue_events:
        print(f"Last event: {json.dumps(continue_events[-1], indent=2)}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment