Created
April 1, 2026 20:45
-
-
Save kausmeows/71f6bd8070a94c2e9d0b93f218a12ce0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Test Team HITL API flow with streaming everywhere. | |
| Tests: | |
| 1. Team run with streaming - pauses when member tool requires confirmation | |
| 2. Continue run with streaming - member agent streams after continue (fix for #7003) | |
| 3. Events are stored in DB (store_events=True auto-enabled by router) | |
| 4. Member responses are stored (store_member_responses=True auto-enabled by router) | |
| """ | |
| import json | |
| from fastapi.testclient import TestClient | |
| from agno.agent import Agent | |
| from agno.db.postgres import PostgresDb | |
| from agno.models.openai import OpenAIChat | |
| from agno.os import AgentOS | |
| from agno.team.team import Team | |
| from agno.tools import tool | |
# requires_confirmation=True is what makes the team run pause (see the module
# docstring, test 1) until the caller approves this tool call.
@tool(requires_confirmation=True)
def deploy_to_production(app_name: str, version: str) -> str:
    """Deploy an application to production."""
    # NOTE(review): the docstring above is presumably surfaced to the model as
    # the tool description, so it is deliberately left short -- confirm before
    # expanding it.
    # This is a stub: nothing is deployed, only a success message is built.
    release_label = f"{app_name} v{version}"
    return f"Successfully deployed {release_label} to production"
# ---------------------------------------------------------------------------
# Shared fixtures: database, member agent, team, AgentOS app and test client.
# ---------------------------------------------------------------------------

# Connection string for a Postgres instance on localhost:5532 (user/password/db
# are all "ai").
_DB_URL = "postgresql+psycopg://ai:ai@localhost:5532/ai"
# Both the team leader and its member use the same OpenAI model id.
_MODEL_ID = "gpt-4o"

db = PostgresDb(db_url=_DB_URL)

# The only team member; it owns the confirmation-gated deployment tool.
deploy_agent = Agent(
    name="Deploy Agent",
    model=OpenAIChat(id=_MODEL_ID),
    tools=[deploy_to_production],
)

# Note: store_member_responses and store_events are auto-enabled by the router
# for API flows, so they are intentionally not set here.
team = Team(
    id="devops-team",
    name="DevOps Team",
    members=[deploy_agent],
    model=OpenAIChat(id=_MODEL_ID),
    db=db,
)

# Expose the team through AgentOS and drive the resulting app in-process with
# FastAPI's TestClient.
agent_os = AgentOS(teams=[team])
app = agent_os.get_app()
client = TestClient(app)
def iter_sse_events(lines):
    """Yield the JSON object carried by each ``data: `` line of an SSE stream.

    Args:
        lines: Iterable of text lines, e.g. ``response.iter_lines()``.

    Yields:
        One ``dict`` per ``data: `` line whose payload decodes to a JSON
        object. Every other line is skipped.
    """
    prefix = "data: "
    for line in lines:
        if not line.startswith(prefix):
            continue
        try:
            payload = json.loads(line[len(prefix):])
        except json.JSONDecodeError:
            # Best-effort parsing: a data line that is not JSON is ignored
            # instead of aborting the whole stream.
            continue
        # Valid JSON that is not an object (number, list, ...) has no "event"
        # key to look up; skip it rather than fail on ``.get`` later.
        if isinstance(payload, dict):
            yield payload


def stream_run(path, form, capture=()):
    """POST ``form`` to ``path`` with the module-level ``client`` and read the stream.

    Each event name is printed as it arrives.

    Args:
        path: URL path of the endpoint to call.
        form: Form fields sent as the request body.
        capture: Event names whose payload should be remembered.

    Returns:
        A ``(event_types, captured)`` tuple: the event names in arrival order,
        and the payload of the last event whose name is in ``capture`` (or
        ``None`` when no such event arrived).
    """
    event_types = []
    captured = None
    with client.stream("POST", path, data=form) as stream_response:
        print(f"Stream response status: {stream_response.status_code}")
        for event_data in iter_sse_events(stream_response.iter_lines()):
            event_type = event_data.get("event")
            event_types.append(event_type)
            print(f" Event: {event_type}")
            if event_type in capture:
                captured = event_data
    return event_types, captured


def confirm_requirements(requirements, note="Approved via streaming test") -> int:
    """Mark every tool-execution requirement as confirmed, in place.

    Args:
        requirements: List of requirement dicts taken from the paused event.
        note: Text stored as ``confirmation_note`` on each confirmed tool call.

    Returns:
        The number of requirements that were confirmed.
    """
    confirmed = 0
    for req in requirements:
        tool_exec = req.get("tool_execution")
        if not tool_exec:
            continue
        print(f" Confirming: {tool_exec.get('tool_name')}({tool_exec.get('tool_args')})")
        tool_exec["confirmed"] = True
        tool_exec["confirmation_note"] = note
        confirmed += 1
    return confirmed


def main() -> int:
    """Run the streaming pause -> confirm -> continue flow and print a report.

    Returns:
        Process exit status: ``1`` when the run never paused or the paused
        event carries no ``run_id``, otherwise ``0``. Problems after the
        continue call are reported as ``WARN`` lines and do not change the
        exit status.

    NOTE: this script only inspects the streamed events; it does not query the
    database, so the storage claims (3 and 4) in the module docstring are not
    verified here.
    """
    print("Testing Team HITL API (fully streaming):")
    print("- store_events auto-enabled in router")
    print("- store_member_responses auto-enabled in router")
    print("- stream=True propagated to member acontinue_run (#7003)")
    print("=" * 60)

    # Step 1: Create run with streaming - should pause
    print("\n--- Step 1: Creating team run (streaming) ---")
    run_events, paused_data = stream_run(
        "/teams/devops-team/runs",
        {
            "message": "Deploy the payments app version 2.1 to production",
            "stream": "true",
            "store_events": "true",
        },
        capture=("TeamRunPaused",),
    )
    print(f"\n--- Run events received: {len(run_events)} ---")
    if paused_data is None:
        print("FAIL: Did not receive TeamRunPaused event")
        return 1
    print("PASS: Team paused with streaming!")

    run_id = paused_data.get("run_id")
    session_id = paused_data.get("session_id")
    # ``or []`` also covers an explicit null in the payload.
    requirements = paused_data.get("requirements") or []
    print(f"Run ID: {run_id}")
    print(f"Session ID: {session_id}")
    print(f"Requirements: {len(requirements)}")
    if not run_id:
        # Without a run id the continue URL would be ".../runs/None/continue".
        print("FAIL: TeamRunPaused event did not include a run_id")
        return 1

    # Step 2: Confirm requirements
    print("\n--- Step 2: Confirming requirements ---")
    confirm_requirements(requirements)

    # Step 3: Continue run with streaming
    print("\n--- Step 3: Continuing run (streaming) ---")
    continue_events, _ = stream_run(
        f"/teams/devops-team/runs/{run_id}/continue",
        {
            "requirements": json.dumps(requirements),
            "session_id": session_id,
            "stream": "true",
        },
    )
    print(f"\n--- Continue events received: {len(continue_events)} ---")

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Run events: {len(run_events)}")
    print(f"Continue events: {len(continue_events)}")
    print(f"Total events streamed: {len(run_events) + len(continue_events)}")

    # Reaching this point implies a TeamRunPaused event was captured above.
    print("PASS: Run paused correctly")

    if "TeamRunContinued" in continue_events:
        print("PASS: Continue event received")
    else:
        print("WARN: TeamRunContinued not in events")

    if "TeamRunCompleted" in continue_events:
        print("PASS: Run completed successfully")
    elif "TeamRunError" in continue_events:
        print("WARN: Run completed with error (may be TestClient event loop issue)")
    else:
        print("WARN: No completion event (may be TestClient event loop issue)")

    # Check for content events (member streaming after continue - #7003 fix).
    # Event names can be None when a payload has no "event" key.
    content_events = [e for e in continue_events if e and "Content" in e]
    print(f"\nContent events after continue: {len(content_events)}")
    if content_events:
        print("PASS: Member agent streamed content after continue (#7003 fix working)")
    else:
        print("WARN: No content events (may need real server to test streaming)")

    print("\n--- DONE ---")
    return 0


if __name__ == "__main__":
    # Imported here because only the script entry point needs it; the bare
    # ``exit`` helper comes from the ``site`` module and is not always present.
    import sys

    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment