Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created April 1, 2026 08:15
Show Gist options
  • Select an option

  • Save kausmeows/0395a8deb7d8d0d05ff74df73b8b970e to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/0395a8deb7d8d0d05ff74df73b8b970e to your computer and use it in GitHub Desktop.
"""
Condition with Executor HITL via /continue API
================================================
Tests executor-level HITL when an agent inside a Condition step has a tool
with requires_confirmation=True, using the AgentOS /continue API endpoint.
Flow:
gather_data -> Condition(evaluator=True) -> report
|
v
detailed_analysis (agent with HITL tool)
Usage:
python libs/agno/agno/test.py
"""
import json
from fastapi.testclient import TestClient
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.tools import tool
from agno.workflow.condition import Condition
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
@tool(requires_confirmation=True)
def run_detailed_analysis(topic: str) -> str:
    """Run a detailed analysis on the given topic. This is an expensive operation.

    Args:
        topic: The topic to analyze in detail.
    """
    # NOTE: docstring wording is kept as-is; the @tool decorator may surface it
    # as the tool description.
    report_lines = (
        f"Detailed analysis for '{topic}':",
        "- Comprehensive data review completed",
        "- All edge cases examined",
        "- 47 data points processed",
    )
    return "\n".join(report_lines)
# Postgres storage handle; the same object is passed to the agent and to the
# workflow defined further down.
db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

# Agent whose single tool is declared with requires_confirmation=True, which is
# what this script relies on to make the run pause.
analysis_agent = Agent(
    name="AnalysisAgent",
    instructions=(
        "You perform detailed data analysis. "
        "Always use the run_detailed_analysis tool with the user's topic."
    ),
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[run_detailed_analysis],
    db=db,
    telemetry=False,
)
def gather_data(step_input: StepInput) -> StepOutput:
    """Return a step output announcing that data was gathered for the input topic.

    Falls back to the label "general data" when the step input is empty/falsy.
    """
    subject = step_input.input
    if not subject:
        subject = "general data"
    return StepOutput(content=f"Data gathered for: {subject}")
def quick_summary(step_input: StepInput) -> StepOutput:
    """Return a fixed quick-summary output; the step input is not consulted."""
    summary_text = "Quick summary: basic metrics computed in 1 minute"
    return StepOutput(content=summary_text)
def generate_report(step_input: StepInput) -> StepOutput:
    """Wrap the previous step's content in a final-report header and footer.

    Uses the placeholder "No analysis" when the previous step produced nothing.
    """
    analysis = step_input.previous_step_content
    if not analysis:
        analysis = "No analysis"
    report_body = f"=== FINAL REPORT ===\n\n{analysis}\n\nReport complete."
    return StepOutput(content=report_body)
# Three-step workflow: gather_data -> detailed_analysis -> report.
workflow = Workflow(
    id="condition-executor-hitl",
    name="ConditionExecutorHITL",
    steps=[
        Step(name="gather_data", executor=gather_data),
        Step(name="detailed_analysis", agent=analysis_agent),
        # NOTE(review): the Condition wrapper below is disabled, so
        # detailed_analysis currently runs as a plain step. Re-enable it (and
        # drop the plain Step above) to exercise the Condition path.
        # Condition(
        #     name="analysis_decision",
        #     evaluator=True,
        #     steps=[Step(name="detailed_analysis", agent=analysis_agent)],
        #     else_steps=[Step(name="quick_summary", executor=quick_summary)],
        # ),
        Step(name="report", executor=generate_report),
    ],
    db=db,
    telemetry=False,
)
# Create AgentOS and test client.
# The workflow is registered with AgentOS, the app object it returns is wrapped
# in a TestClient, and the script below sends its requests through `client`.
agent_os = AgentOS(workflows=[workflow])
app = agent_os.get_app()
client = TestClient(app)
def confirm_tool_executions(step_requirements, note="Approved via API test"):
    """Approve, in place, every tool call awaiting executor confirmation.

    Walks the ``step_requirements`` payload of a paused run and, for each
    requirement flagged ``requires_executor_input``, sets ``confirmed`` and
    ``confirmation_note`` on every non-empty ``tool_execution`` entry.

    Args:
        step_requirements: List of step-requirement dicts taken from the run
            response. ``None`` is treated as an empty list.
        note: Text stored under ``confirmation_note`` on each approved call.

    Returns:
        The number of tool executions that were confirmed.
    """
    confirmed = 0
    # `or []` / `or {}` guard against keys that are present but null, which
    # dict.get's default argument does not cover.
    for req in step_requirements or []:
        if not req.get("requires_executor_input"):
            continue
        print(f"  Executor: {req.get('executor_agent_name')} ({req.get('executor_type')})")
        executor_reqs = req.get("executor_requirements") or []
        print(f"  Executor requirements: {len(executor_reqs)}")
        for ereq in executor_reqs:
            tool_exec = ereq.get("tool_execution") or {}
            if not tool_exec:
                continue
            print(f"    Tool: {tool_exec.get('tool_name')}({tool_exec.get('tool_args')})")
            # Confirm the tool execution (mutates the dict nested in `ereq`).
            tool_exec["confirmed"] = True
            tool_exec["confirmation_note"] = note
            confirmed += 1
    return confirmed


def main() -> None:
    """Create a run, confirm its pending tool calls, and continue it via the API.

    Prints progress for each of the three steps. Returns early (after printing
    diagnostics) when the run-creation response is not JSON or the run did not
    pause.
    """
    runs_url = "/workflows/condition-executor-hitl/runs"

    # Step 1: Create a run via API - should pause at executor HITL
    print("--- Step 1: Creating workflow run via API (expects executor pause) ---")
    response = client.post(
        runs_url,
        data={"message": "Q4 sales performance", "stream": "false"},
    )
    print(f"Response status code: {response.status_code}")
    try:
        run_data = response.json()
    except ValueError:
        # An error page or empty body is not JSON; show it instead of crashing.
        print(f"Run creation did not return JSON: {response.text}")
        return

    run_id = run_data.get("run_id")
    session_id = run_data.get("session_id")
    status = run_data.get("status")
    print(f"Run ID: {run_id}")
    print(f"Session ID: {session_id}")
    print(f"Status: {status}")
    is_paused = status == "PAUSED"
    print(f"Is paused: {is_paused}")
    if not is_paused:
        print("WARNING: Run did not pause as expected!")
        print(f"Full response: {json.dumps(run_data, indent=2)}")
        return
    print("Workflow paused as expected!")

    # Step 2: Get the step_requirements and confirm them
    print("\n--- Step 2: Processing step requirements ---")
    step_requirements = run_data.get("step_requirements") or []
    print(f"Found {len(step_requirements)} step requirement(s)")
    if confirm_tool_executions(step_requirements) == 0:
        print("WARNING: No tool executions were found to confirm.")

    # Step 3: Continue the run via /continue API
    print("\n--- Step 3: Continuing run via /continue API ---")
    continue_response = client.post(
        f"{runs_url}/{run_id}/continue",
        data={
            "step_requirements": json.dumps(step_requirements),
            "session_id": session_id,
            "stream": "false",
        },
    )
    print(f"Continue response status code: {continue_response.status_code}")
    if continue_response.status_code == 200:
        continue_data = continue_response.json()
        print(f"Final status: {continue_data.get('status')}")
        print(f"Content: {str(continue_data.get('content', ''))[:300]}")
        print("\n--- SUCCESS: Workflow executor HITL continue via API works! ---")
    else:
        print(f"Continue failed: {continue_response.text}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment