Created
April 1, 2026 08:26
-
-
Save kausmeows/8890ea1c36d73530b6f1b816c4dca161 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Executor HITL via /continue API (Streaming) | |
| ============================================ | |
| Tests executor-level HITL when an agent inside a Step has a tool | |
| with requires_confirmation=True, using the AgentOS /continue API endpoint | |
| with streaming enabled. | |
| Flow: | |
| gather_data -> detailed_analysis (agent with HITL tool) -> report | |
| Usage: | |
| python libs/agno/agno/test.py | |
| """ | |
| import json | |
| from fastapi.testclient import TestClient | |
| from agno.agent import Agent | |
| from agno.db.postgres import PostgresDb | |
| from agno.models.openai import OpenAIChat | |
| from agno.os import AgentOS | |
| from agno.tools import tool | |
| from agno.workflow.condition import Condition | |
| from agno.workflow.step import Step | |
| from agno.workflow.types import StepInput, StepOutput | |
| from agno.workflow.workflow import Workflow | |
# NOTE(review): the docstring below is kept word-for-word; tool decorators
# commonly expose it as the tool description -- confirm before rewording it.
@tool(requires_confirmation=True)
def run_detailed_analysis(topic: str) -> str:
    """Run a detailed analysis on the given topic. This is an expensive operation.

    Args:
        topic: The topic to analyze in detail.
    """
    # The "analysis" is a fixed, deterministic report; only the headline
    # line depends on the requested topic.
    report_lines = [
        f"Detailed analysis for '{topic}':",
        "- Comprehensive data review completed",
        "- All edge cases examined",
        "- 47 data points processed",
    ]
    return "\n".join(report_lines)
# Database handle shared by the agent below and by the workflow.
db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

# Agent whose single tool is declared with requires_confirmation=True, so a
# run that reaches the tool call needs an explicit confirmation to proceed.
analysis_agent = Agent(
    name="AnalysisAgent",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[run_detailed_analysis],
    instructions=(
        "You perform detailed data analysis. Always use the "
        "run_detailed_analysis tool with the user's topic."
    ),
    db=db,
    telemetry=False,
)
def gather_data(step_input: StepInput) -> StepOutput:
    """Report that data was gathered for the workflow's input topic.

    Args:
        step_input: Step input; its ``input`` attribute is used as the topic.

    Returns:
        A ``StepOutput`` whose content names the topic, falling back to
        "general data" when the input is empty or missing.
    """
    topic = step_input.input
    if not topic:
        topic = "general data"
    return StepOutput(content=f"Data gathered for: {topic}")
def quick_summary(step_input: StepInput) -> StepOutput:
    """Return a fixed quick-summary result; ``step_input`` is not read.

    Only referenced from the commented-out ``Condition`` variant of the
    workflow in this file.
    """
    summary_text = "Quick summary: basic metrics computed in 1 minute"
    return StepOutput(content=summary_text)
def generate_report(step_input: StepInput) -> StepOutput:
    """Wrap the previous step's content in a final-report banner.

    Args:
        step_input: Step input; ``previous_step_content`` supplies the body.

    Returns:
        A ``StepOutput`` with the banner, the previous content (or
        "No analysis" when that content is empty or missing) and a footer.
    """
    analysis = step_input.previous_step_content
    if not analysis:
        analysis = "No analysis"
    return StepOutput(content=f"=== FINAL REPORT ===\n\n{analysis}\n\nReport complete.")
# Three sequential steps: gather_data -> detailed_analysis -> report.
# The middle step runs analysis_agent, whose tool requires confirmation.
workflow = Workflow(
    name="ConditionExecutorHITL",
    id="condition-executor-hitl",
    db=db,
    steps=[
        Step(name="gather_data", executor=gather_data),
        Step(name="detailed_analysis", agent=analysis_agent),
        # Alternative wiring (disabled): put the analysis step behind a
        # Condition, with quick_summary as the else branch.
        # Condition(
        #     name="analysis_decision",
        #     evaluator=True,
        #     steps=[Step(name="detailed_analysis", agent=analysis_agent)],
        #     else_steps=[Step(name="quick_summary", executor=quick_summary)],
        # ),
        Step(name="report", executor=generate_report),
    ],
    telemetry=False,
)

# Serve the workflow through AgentOS and drive it with an in-process
# FastAPI test client.
agent_os = AgentOS(workflows=[workflow])
app = agent_os.get_app()
client = TestClient(app)
def parse_sse_events(response):
    """Collect the JSON payloads carried by ``data`` lines of an SSE stream.

    Args:
        response: Any object with an ``iter_lines()`` method yielding the
            stream's lines as ``str`` or UTF-8 encoded ``bytes``.

    Returns:
        A list with one decoded JSON value per ``data`` line, in stream
        order. Lines that are not ``data`` fields, and ``data`` lines whose
        payload is not valid JSON, are skipped.
    """
    field_prefix = "data:"
    events = []
    for raw_line in response.iter_lines():
        line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line
        if not line.startswith(field_prefix):
            continue
        payload = line[len(field_prefix):]
        # In the SSE format a single space after the colon is optional and
        # is not part of the value, so accept both "data: {...}" and
        # "data:{...}".
        if payload.startswith(" "):
            payload = payload[1:]
        try:
            events.append(json.loads(payload))
        except json.JSONDecodeError:
            # Deliberate best-effort: non-JSON payloads are ignored rather
            # than aborting the whole parse.
            continue
    return events
if __name__ == "__main__":
    # Step 1: start a streaming run; it is expected to pause when the
    # agent's confirmation-gated tool is reached.
    print("--- Step 1: Creating workflow run via API (streaming, expects executor pause) ---")
    with client.stream(
        "POST",
        "/workflows/condition-executor-hitl/runs",
        data={"message": "Q4 sales performance", "stream": "true"},
    ) as response:
        print(f"Response status code: {response.status_code}")
        events = parse_sse_events(response)

    print(f"Received {len(events)} SSE events")
    for idx, evt in enumerate(events):
        print(f" Event {idx}: {evt.get('event', 'unknown')}")

    # First StepExecutorPaused event in the stream, if there is one.
    paused_event = next(
        (evt for evt in events if evt.get("event") == "StepExecutorPaused"),
        None,
    )

    if paused_event is not None:
        run_id = paused_event.get("run_id")
        session_id = paused_event.get("session_id")
        print(f"Run ID: {run_id}")
        print(f"Session ID: {session_id}")
        print(f"Step: {paused_event.get('step_name')} (index {paused_event.get('step_index')})")
        print(f"Executor: {paused_event.get('executor_agent_name')} ({paused_event.get('executor_type')})")
        print("Workflow paused as expected!")

        # Step 2: mark every pending tool execution as confirmed. The dicts
        # are mutated in place, so executor_reqs carries the confirmations.
        print("\n--- Step 2: Processing executor requirements ---")
        executor_reqs = paused_event.get("executor_requirements", [])
        print(f"Executor requirements: {len(executor_reqs)}")
        for requirement in executor_reqs:
            tool_exec = requirement.get("tool_execution", {})
            if not tool_exec:
                continue
            print(f" Tool: {tool_exec.get('tool_name')}({tool_exec.get('tool_args')})")
            tool_exec["confirmed"] = True
            tool_exec["confirmation_note"] = "Approved via API test"

        # Rebuild the step_requirements payload sent to /continue from the
        # fields of the paused event.
        copied_fields = (
            "executor_agent_id",
            "executor_agent_name",
            "executor_run_id",
            "executor_type",
        )
        requirement_entry = {
            "step_name": paused_event.get("step_name"),
            "step_index": paused_event.get("step_index"),
            "step_id": paused_event.get("step_id"),
            "requires_executor_input": True,
        }
        for field in copied_fields:
            requirement_entry[field] = paused_event.get(field)
        requirement_entry["executor_requirements"] = executor_reqs
        step_requirements = [requirement_entry]

        # Step 3: resume the paused run through the streaming /continue
        # endpoint.
        print("\n--- Step 3: Continuing run via /continue API (streaming) ---")
        with client.stream(
            "POST",
            f"/workflows/condition-executor-hitl/runs/{run_id}/continue",
            data={
                "step_requirements": json.dumps(step_requirements),
                "session_id": session_id,
                "stream": "true",
            },
        ) as continue_response:
            print(f"Continue response status code: {continue_response.status_code}")
            continue_events = parse_sse_events(continue_response)

        print(f"Received {len(continue_events)} SSE events")
        for idx, evt in enumerate(continue_events):
            print(f" Event {idx}: {evt.get('event', 'unknown')}")

        completed_event = next(
            (evt for evt in continue_events if evt.get("event") == "WorkflowCompleted"),
            None,
        )
        if completed_event:
            print(f"\nFinal content: {str(completed_event.get('content', ''))[:300]}")
            print("\n--- SUCCESS: Workflow executor HITL streaming continue via API works! ---")
        else:
            # No completion event: report an explicit error event if present,
            # otherwise show the last event received for diagnosis.
            error_event = next(
                (evt for evt in continue_events if evt.get("event") == "WorkflowError"),
                None,
            )
            if error_event:
                print(f"\nERROR: {error_event.get('error')}")
            else:
                print("\nNo WorkflowCompleted event found")
                if continue_events:
                    print(f"Last event: {json.dumps(continue_events[-1], indent=2)}")
    else:
        print("WARNING: No StepExecutorPaused event found!")
        print(f"Last event: {json.dumps(events[-1] if events else {}, indent=2)}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment