Created
April 1, 2026 08:15
-
-
Save kausmeows/0395a8deb7d8d0d05ff74df73b8b970e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
Condition with Executor HITL via /continue API
================================================

Tests executor-level HITL when an agent inside a Condition step has a tool
with requires_confirmation=True, using the AgentOS /continue API endpoint.

Flow:
    gather_data -> Condition(evaluator=True) -> report
                          |
                          v
            detailed_analysis (agent with HITL tool)

Usage:
    python libs/agno/agno/test.py
| """ | |
| import json | |
| from fastapi.testclient import TestClient | |
| from agno.agent import Agent | |
| from agno.db.postgres import PostgresDb | |
| from agno.models.openai import OpenAIChat | |
| from agno.os import AgentOS | |
| from agno.tools import tool | |
| from agno.workflow.condition import Condition | |
| from agno.workflow.step import Step | |
| from agno.workflow.types import StepInput, StepOutput | |
| from agno.workflow.workflow import Workflow | |
@tool(requires_confirmation=True)
def run_detailed_analysis(topic: str) -> str:
    """Run a detailed analysis on the given topic. This is an expensive operation.

    Args:
        topic: The topic to analyze in detail.
    """
    # NOTE(review): the docstring wording is kept verbatim on purpose; it is
    # presumably surfaced to the model as the tool description. Confirm before
    # rewording it.
    # The result is a canned, deterministic report: only the header line
    # depends on `topic`.
    report_lines = [
        f"Detailed analysis for '{topic}':",
        "- Comprehensive data review completed",
        "- All edge cases examined",
        "- 47 data points processed",
    ]
    return "\n".join(report_lines)
# Postgres storage shared by the agent below and by the workflow that uses it.
db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

# Agent whose only tool is declared with requires_confirmation=True; the
# instructions tell it to always call that tool with the user's topic.
analysis_agent = Agent(
    name="AnalysisAgent",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="You perform detailed data analysis. Always use the run_detailed_analysis tool with the user's topic.",
    tools=[run_detailed_analysis],
    db=db,
    telemetry=False,
)
def gather_data(step_input: StepInput) -> StepOutput:
    """Report which topic data was gathered for.

    Args:
        step_input: Step input whose ``input`` attribute carries the topic.

    Returns:
        A StepOutput whose content names the topic, or "general data" when
        ``step_input.input`` is empty or otherwise falsy.
    """
    subject = step_input.input
    if not subject:
        subject = "general data"
    return StepOutput(content=f"Data gathered for: {subject}")
def quick_summary(step_input: StepInput) -> StepOutput:
    """Return a fixed quick-summary message; ``step_input`` is not read."""
    summary_text = "Quick summary: basic metrics computed in 1 minute"
    return StepOutput(content=summary_text)
def generate_report(step_input: StepInput) -> StepOutput:
    """Wrap the previous step's content in a final-report banner.

    Args:
        step_input: Step input whose ``previous_step_content`` is embedded in
            the report.

    Returns:
        A StepOutput containing the banner, the previous content (or
        "No analysis" when that content is empty or otherwise falsy), and a
        closing line.
    """
    body = step_input.previous_step_content
    if not body:
        body = "No analysis"
    sections = ["=== FINAL REPORT ===", f"{body}", "Report complete."]
    return StepOutput(content="\n\n".join(sections))
# Three-step workflow: gather data, run the HITL-gated analysis agent, report.
#
# NOTE(review): the module docstring, the workflow name, and the id all refer
# to a Condition wrapping `detailed_analysis`, but that Condition is currently
# commented out and the agent step runs directly. Confirm which variant this
# script is meant to exercise.
workflow = Workflow(
    id="condition-executor-hitl",
    name="ConditionExecutorHITL",
    db=db,
    telemetry=False,
    steps=[
        Step(name="gather_data", executor=gather_data),
        Step(name="detailed_analysis", agent=analysis_agent),
        # Disabled Condition variant of the step above:
        # Condition(
        #     name="analysis_decision",
        #     evaluator=True,
        #     steps=[Step(name="detailed_analysis", agent=analysis_agent)],
        #     else_steps=[Step(name="quick_summary", executor=quick_summary)],
        # ),
        Step(name="report", executor=generate_report),
    ],
)
# Serve the workflow through AgentOS and drive the resulting app with an
# in-process test client.
agent_os = AgentOS(workflows=[workflow])
app = agent_os.get_app()
client = TestClient(app)
def _confirm_tool_executions(step_requirements: list) -> None:
    """Mark every pending executor tool call in ``step_requirements`` as confirmed.

    The dicts are mutated in place so the same list can be serialized and sent
    back to the /continue endpoint.

    Args:
        step_requirements: The ``step_requirements`` list taken from the paused
            run's JSON payload.
    """
    for req in step_requirements:
        if not req.get("requires_executor_input"):
            continue
        print(f" Executor: {req.get('executor_agent_name')} ({req.get('executor_type')})")
        # `or []` also covers a key that is present but null, which the
        # two-argument form of .get() would pass through as None.
        executor_reqs = req.get("executor_requirements") or []
        print(f" Executor requirements: {len(executor_reqs)}")
        for ereq in executor_reqs:
            tool_exec = ereq.get("tool_execution")
            if not tool_exec:
                continue
            print(f" Tool: {tool_exec.get('tool_name')}({tool_exec.get('tool_args')})")
            # Confirm the tool execution
            tool_exec["confirmed"] = True
            tool_exec["confirmation_note"] = "Approved via API test"


def main() -> None:
    """Create a workflow run, confirm its paused tool call, and continue it.

    Progress is reported with print(). The function returns early when the run
    cannot be created or does not pause.
    """
    # Step 1: Create a run via API - should pause at executor HITL
    print("--- Step 1: Creating workflow run via API (expects executor pause) ---")
    response = client.post(
        "/workflows/condition-executor-hitl/runs",
        data={"message": "Q4 sales performance", "stream": "false"},
    )
    print(f"Response status code: {response.status_code}")
    if response.status_code >= 400:
        # An error response has no run to inspect; show the raw body instead
        # of attempting to parse it as a run payload.
        print(f"Run creation failed: {response.text}")
        return

    run_data = response.json()
    run_id = run_data.get("run_id")
    session_id = run_data.get("session_id")
    status = run_data.get("status")
    print(f"Run ID: {run_id}")
    print(f"Session ID: {session_id}")
    print(f"Status: {status}")

    is_paused = status == "PAUSED"
    print(f"Is paused: {is_paused}")
    if not is_paused:
        print("WARNING: Run did not pause as expected!")
        print(f"Full response: {json.dumps(run_data, indent=2)}")
        return
    print("Workflow paused as expected!")

    # Step 2: Get the step_requirements and confirm them
    print("\n--- Step 2: Processing step requirements ---")
    step_requirements = run_data.get("step_requirements") or []
    print(f"Found {len(step_requirements)} step requirement(s)")
    _confirm_tool_executions(step_requirements)

    # Step 3: Continue the run via /continue API
    print("\n--- Step 3: Continuing run via /continue API ---")
    continue_response = client.post(
        f"/workflows/condition-executor-hitl/runs/{run_id}/continue",
        data={
            "step_requirements": json.dumps(step_requirements),
            "session_id": session_id,
            "stream": "false",
        },
    )
    print(f"Continue response status code: {continue_response.status_code}")
    if continue_response.status_code != 200:
        print(f"Continue failed: {continue_response.text}")
        return

    continue_data = continue_response.json()
    print(f"Final status: {continue_data.get('status')}")
    print(f"Content: {str(continue_data.get('content', ''))[:300]}")
    print("\n--- SUCCESS: Workflow executor HITL continue via API works! ---")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment