Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created April 1, 2026 20:45
Show Gist options
  • Select an option

  • Save kausmeows/71f6bd8070a94c2e9d0b93f218a12ce0 to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/71f6bd8070a94c2e9d0b93f218a12ce0 to your computer and use it in GitHub Desktop.
"""Test Team HITL API flow with streaming everywhere.
Tests:
1. Team run with streaming - pauses when member tool requires confirmation
2. Continue run with streaming - member agent streams after continue (fix for #7003)
3. Events are stored in DB (store_events=True auto-enabled by router)
4. Member responses are stored (store_member_responses=True auto-enabled by router)
"""
import json
from fastapi.testclient import TestClient
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.team.team import Team
from agno.tools import tool
@tool(requires_confirmation=True)
def deploy_to_production(app_name: str, version: str) -> str:
"""Deploy an application to production."""
return f"Successfully deployed {app_name} v{version} to production"
# Setup DB and Team
db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")
deploy_agent = Agent(
name="Deploy Agent",
model=OpenAIChat(id="gpt-4o"),
tools=[deploy_to_production],
)
team = Team(
id="devops-team",
name="DevOps Team",
members=[deploy_agent],
model=OpenAIChat(id="gpt-4o"),
db=db,
# Note: store_member_responses and store_events are auto-enabled by the router for API flows
)
# Create AgentOS and test client
agent_os = AgentOS(teams=[team])
app = agent_os.get_app()
client = TestClient(app)
if __name__ == "__main__":
print("Testing Team HITL API (fully streaming):")
print("- store_events auto-enabled in router")
print("- store_member_responses auto-enabled in router")
print("- stream=True propagated to member acontinue_run (#7003)")
print("=" * 60)
# Step 1: Create run with streaming - should pause
print("\n--- Step 1: Creating team run (streaming) ---")
run_events = []
paused_data = None
with client.stream(
"POST",
"/teams/devops-team/runs",
data={"message": "Deploy the payments app version 2.1 to production", "stream": "true", "store_events": "true"},
) as stream_response:
print(f"Stream response status: {stream_response.status_code}")
for line in stream_response.iter_lines():
if line.startswith("data: "):
try:
event_data = json.loads(line[6:])
event_type = event_data.get("event")
run_events.append(event_type)
print(f" Event: {event_type}")
# Capture the paused event data
if event_type == "TeamRunPaused":
paused_data = event_data
except json.JSONDecodeError:
pass
print(f"\n--- Run events received: {len(run_events)} ---")
if paused_data is None:
print("FAIL: Did not receive TeamRunPaused event")
exit(1)
print("PASS: Team paused with streaming!")
run_id = paused_data.get("run_id")
session_id = paused_data.get("session_id")
requirements = paused_data.get("requirements", [])
print(f"Run ID: {run_id}")
print(f"Session ID: {session_id}")
print(f"Requirements: {len(requirements)}")
# Step 2: Confirm requirements
print("\n--- Step 2: Confirming requirements ---")
for req in requirements:
if req.get("tool_execution"):
tool_exec = req["tool_execution"]
print(f" Confirming: {tool_exec.get('tool_name')}({tool_exec.get('tool_args')})")
tool_exec["confirmed"] = True
tool_exec["confirmation_note"] = "Approved via streaming test"
# Step 3: Continue run with streaming
print("\n--- Step 3: Continuing run (streaming) ---")
continue_events = []
final_data = None
with client.stream(
"POST",
f"/teams/devops-team/runs/{run_id}/continue",
data={
"requirements": json.dumps(requirements),
"session_id": session_id,
"stream": "true",
},
) as stream_response:
print(f"Stream response status: {stream_response.status_code}")
for line in stream_response.iter_lines():
if line.startswith("data: "):
try:
event_data = json.loads(line[6:])
event_type = event_data.get("event")
continue_events.append(event_type)
print(f" Event: {event_type}")
# Capture final event
if event_type in ("TeamRunCompleted", "TeamRunError"):
final_data = event_data
except json.JSONDecodeError:
pass
print(f"\n--- Continue events received: {len(continue_events)} ---")
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
print(f"Run events: {len(run_events)}")
print(f"Continue events: {len(continue_events)}")
print(f"Total events streamed: {len(run_events) + len(continue_events)}")
if "TeamRunPaused" in run_events:
print("PASS: Run paused correctly")
else:
print("FAIL: Run did not pause")
if "TeamRunContinued" in continue_events:
print("PASS: Continue event received")
else:
print("WARN: TeamRunContinued not in events")
if "TeamRunCompleted" in continue_events:
print("PASS: Run completed successfully")
elif "TeamRunError" in continue_events:
print("WARN: Run completed with error (may be TestClient event loop issue)")
else:
print("WARN: No completion event (may be TestClient event loop issue)")
# Check for content events (member streaming after continue - #7003 fix)
content_events = [e for e in continue_events if e and "Content" in e]
print(f"\nContent events after continue: {len(content_events)}")
if content_events:
print("PASS: Member agent streamed content after continue (#7003 fix working)")
else:
print("WARN: No content events (may need real server to test streaming)")
print("\n--- DONE ---")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment