Based on my comprehensive exploration of the codebase, here's how this SDK handles multi-agent systems:
The SDK supports several powerful composition patterns:
- Agents can transfer control to other agents through the `handoffs` mechanism
- Handoffs appear as tools to the LLM, allowing dynamic routing based on context
- Example: A triage agent routing to language-specific agents (src/agents/handoffs.py:58-268)
- Handoffs can include input filters to transform conversation history before passing control
- Any agent can be exposed as a tool to other agents via `agent.as_tool()`
- This enables hierarchical agent structures where specialized agents are invoked for specific tasks
- Example: Financial research agent using fundamentals and risk analysis agents as tools (examples/financial_research_agent/manager.py:102-112)
- Multiple agents can run concurrently using `asyncio.gather()`
- Results can be aggregated by a coordinator agent
- Example: Running multiple translation agents in parallel and selecting the best result (examples/agent_patterns/parallelization.py:26-39)
- Agents communicate through structured message items (`RunItem`)
- Messages preserve full conversation history including tool calls and responses
- Each agent receives the complete context when invoked
- Shared mutable context via `RunContextWrapper[TContext]` (src/agents/run_context.py:11-27)
- Context is passed to all tools, guardrails, and hooks
- Enables dependency injection and state sharing across agent components
- Real-time communication through event streams
- Events include agent starts/ends, handoffs, tool calls, and custom events
- Enables reactive UI updates and monitoring
- Agent loop runs until final output is produced (src/agents/run.py:372-498)
- Turn-based execution with configurable max_turns limit
- Each turn: LLM call → Tool execution → Handoff/Continue/Finish
- Tools within a single agent run concurrently by default
- Implemented using `asyncio.gather()` for parallel tool calls
- Results are collected and passed back to the LLM
- Non-blocking execution with event streaming (src/agents/run.py:526-594)
- Allows real-time UI updates while agents process
- Maintains same execution semantics as non-streaming mode
- Built-in conversation persistence via the `Session` interface
- SQLite implementation provided, custom backends supported
- Automatic history management across agent runs
- Agents are stateless by design - state lives in context and messages
- Each agent run creates new execution context
- State persistence handled through sessions and context objects
- Tracks which tools agents have used to prevent infinite loops
- Configurable tool choice reset behavior after tool calls
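The turn-based loop described above (one model call per turn, followed by tool execution, a handoff, or a final answer, all bounded by max_turns) can be sketched with the standard library alone. This is a toy model under stated assumptions, not the SDK's implementation: `Step`, `ToyAgent`, `run_loop`, and the local `MaxTurnsExceeded` are hypothetical names, and `decide` stands in for the real LLM call.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable


@dataclass
class Step:
    """One model decision (hypothetical stand-in, not an SDK type)."""
    kind: str          # "tool", "handoff", or "final"
    payload: str = ""  # tool name, target agent name, or final text


@dataclass
class ToyAgent:
    name: str
    decide: Callable[[list], Step]  # stands in for the LLM call


class MaxTurnsExceeded(RuntimeError):
    """Raised when the loop reaches max_turns without a final output."""


async def run_loop(agents: dict, start: str, user_input: str,
                   tools: dict, max_turns: int = 10) -> str:
    history = [f"user: {user_input}"]
    current = agents[start]
    for _ in range(max_turns):
        step = current.decide(history)      # "LLM call" for this turn
        if step.kind == "final":            # finish: return the final output
            return step.payload
        if step.kind == "handoff":          # switch agents, keep the history
            history.append(f"handoff: {current.name} -> {step.payload}")
            current = agents[step.payload]
        elif step.kind == "tool":           # run the tool, then continue
            result = await tools[step.payload]()
            history.append(f"tool {step.payload}: {result}")
        else:
            raise ValueError(f"unknown step kind: {step.kind!r}")
    raise MaxTurnsExceeded(f"no final output after {max_turns} turns")


def demo() -> str:
    async def add() -> str:
        return "4"

    def math_decide(history: list) -> Step:
        # Call the tool once, then answer using its recorded result
        if any(item.startswith("tool add") for item in history):
            return Step("final", "The answer is 4")
        return Step("tool", "add")

    agents = {
        "triage": ToyAgent("triage", lambda history: Step("handoff", "math")),
        "math": ToyAgent("math", math_decide),
    }
    return asyncio.run(run_loop(agents, "triage", "What is 2 + 2?", {"add": add}))
```

In this sketch a handoff costs one turn and leaves the history intact, which mirrors the point that each agent sees the full conversation when it takes over.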
The SDK provides comprehensive context management capabilities for complex multi-agent workflows:
RunContextWrapper[TContext]
@dataclass
class RunContextWrapper(Generic[TContext]):
    context: TContext  # Your custom context object
    usage: Usage  # Tracks LLM usage across the run
- The generic type parameter `TContext` ensures type safety
- Context is mutable and passed by reference
- All components receive the same context instance
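To make "mutable and passed by reference" concrete, here is a minimal sketch using only the standard library. `ToyWrapper` and `AppContext` are hypothetical stand-ins for `RunContextWrapper` and your own context class; the point is only that every component holding the wrapper sees the same underlying object.

```python
from dataclasses import dataclass, field
from typing import Generic, TypeVar

TContext = TypeVar("TContext")


@dataclass
class ToyWrapper(Generic[TContext]):
    """Hypothetical stand-in for RunContextWrapper: holds a reference only."""
    context: TContext


@dataclass
class AppContext:
    user: str
    processed: list = field(default_factory=list)


def tool(wrapper: ToyWrapper, item: str) -> None:
    # A "tool" mutates the shared context in place
    wrapper.context.processed.append(item)


def hook(wrapper: ToyWrapper) -> int:
    # A "hook" called later observes the tool's mutations
    return len(wrapper.context.processed)


def demo():
    app = AppContext(user="ada")
    wrapper = ToyWrapper(context=app)
    tool(wrapper, "a")
    tool(wrapper, "b")
    return app, hook(wrapper)
```

Because nothing is copied, the caller's original `app` object reflects the changes after the run, which is what makes the context usable for dependency injection and result collection.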
1. Tool Functions
@function_tool
async def my_tool(context: RunContextWrapper[MyContext], param: str) -> str:
    # Access custom context
    db = context.context.database_connection
    user = context.context.current_user
    # Modify context
    context.context.processed_items.append(param)
    return await db.query(param)
2. Guardrails
@input_guardrail
async def admin_only_guardrail(
    context: RunContextWrapper[MyContext], agent: Agent, input: str | list
) -> GuardrailFunctionOutput:
    # Trip the guardrail (halting the run) unless the caller is an admin
    is_admin = context.context.user_role == "admin"
    return GuardrailFunctionOutput(output_info=None, tripwire_triggered=not is_admin)
3. Lifecycle Hooks
class MyHooks(AgentHooks[MyContext]):
    async def on_start(self, context: RunContextWrapper[MyContext], agent: Agent):
        # Initialize agent-specific resources
        context.context.agent_resources[agent.name] = ResourcePool()
4. Handoff Filters
def context_transformer(handoff_data: HandoffInputData) -> HandoffInputData:
    # Transform the conversation passed to the next agent
    # Can modify input history, items, etc.
    return handoff_data

# Use a distinct variable name so the handoff() helper is not shadowed
filtered_handoff = handoff(
    target_agent,
    input_filter=context_transformer,
)
1. External State Store Integration
@dataclass
class PersistentContext:
    session_id: str
    redis_client: Redis  # assumed to be redis.asyncio.Redis
    postgres_conn: asyncpg.Connection
    state_cache: dict[str, Any]

    async def persist(self) -> None:
        # Save to external stores, using the same key that restore() reads
        await self.redis_client.set(
            f"context_{self.session_id}", json.dumps(self.state_cache)
        )
        await self.postgres_conn.execute("INSERT INTO context_history ...")

    @classmethod
    async def restore(cls, session_id: str) -> "PersistentContext":
        # Restore from external stores
        redis = Redis.from_url(...)
        postgres = await asyncpg.connect(...)
        state = await redis.get(f"context_{session_id}")
        # Fall back to an empty cache when nothing was persisted yet
        return cls(session_id, redis, postgres, json.loads(state) if state else {})
2. Context Checkpointing
class CheckpointingHooks(RunHooks[PersistentContext]):
    async def on_agent_end(self, context: RunContextWrapper[PersistentContext],
                           agent: Agent, result: Any):
        # Checkpoint after each agent completes
        await context.context.persist()
1. Context Enrichment Pattern
# Each agent enriches context with its results
@function_tool
async def save_research(context: RunContextWrapper, data: str) -> str:
    context.context.research_data.append(data)
    return "Saved"

@function_tool
async def analyze(context: RunContextWrapper) -> str:
    # Access enriched context from previous agent
    research = context.context.research_data
    analysis = perform_analysis(research)
    context.context.analysis_results = analysis
    return analysis

# Decorators cannot appear inside a list literal, so the tools are defined
# first and then passed by name
research_agent = Agent(name="Researcher", tools=[save_research])
analysis_agent = Agent(name="Analyst", tools=[analyze])
2. Context Isolation Between Agents
class IsolatedContext:
    def __init__(self):
        self.global_state = {}  # Shared across all agents
        self.agent_state = {}  # Isolated per agent

    def get_agent_context(self, agent_name: str):
        if agent_name not in self.agent_state:
            self.agent_state[agent_name] = {}
        return self.agent_state[agent_name]

# In handoff filter
# Note: HandoffInputData primarily carries conversation items. Whether the run
# context is reachable from it (and under which attribute name) depends on the
# SDK version, so treat `handoff_data.context` as an assumption to verify.
def isolate_context(handoff_data: HandoffInputData) -> HandoffInputData:
    # Clear sensitive data before handoff
    context = handoff_data.context
    context.context.agent_state.clear()
    return handoff_data
1. Connection Pooling
@dataclass
class ResourceContext:
    _db_pool: asyncpg.Pool
    _http_session: aiohttp.ClientSession
    _api_clients: dict[str, Any]

    async def get_db_connection(self) -> asyncpg.Connection:
        # Callers must return the connection with self._db_pool.release(conn)
        return await self._db_pool.acquire()

    def get_api_client(self, service: str) -> Any:
        # Create each API client lazily and reuse it afterwards
        if service not in self._api_clients:
            self._api_clients[service] = create_client(service)
        return self._api_clients[service]

    async def cleanup(self):
        await self._db_pool.close()
        await self._http_session.close()
2. Resource Lifecycle Management
from contextlib import asynccontextmanager
from typing import AsyncIterator

class ResourceManager:
    @classmethod
    @asynccontextmanager
    async def create_context(cls) -> AsyncIterator[ResourceContext]:
        # Initialize resources
        db_pool = await asyncpg.create_pool(...)
        http_session = aiohttp.ClientSession()
        context = ResourceContext(db_pool, http_session, {})
        try:
            yield context
        finally:
            # Cleanup resources, even if the run raised
            await context.cleanup()

# Usage (inside an async function)
async with ResourceManager.create_context() as resource_ctx:
    result = await Runner.run(
        agent,
        input="Process data",
        context=resource_ctx,
    )
1. Thread-Safe Context Updates
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class ConcurrentContext:
    # asyncio.Lock serializes coroutines on a single event loop. It does not
    # protect against access from other OS threads.
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _data: dict = field(default_factory=dict)

    async def update(self, key: str, value: Any):
        async with self._lock:
            self._data[key] = value

    async def get(self, key: str) -> Any:
        async with self._lock:
            return self._data.get(key)
2. Context Merging for Parallel Agents
import asyncio
import copy

async def run_parallel_agents(base_context: MyContext):
    # Give each parallel run its own deep copy so the runs cannot interfere.
    # Runner.run takes the plain context object and wraps it itself.
    contexts = [copy.deepcopy(base_context) for _ in range(3)]
    # Run agents in parallel
    results = await asyncio.gather(
        Runner.run(agent1, "Task 1", context=contexts[0]),
        Runner.run(agent2, "Task 2", context=contexts[1]),
        Runner.run(agent3, "Task 3", context=contexts[2]),
    )
    # Merge contexts back (merge_results is a method you define on MyContext)
    for ctx in contexts:
        base_context.merge_results(ctx)
    return results
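The copy-then-merge idea can be exercised without the SDK. The sketch below is an illustration under assumptions: `fake_agent_run` is a hypothetical placeholder for an agent run, and `merge_results` is one possible merge policy (last writer wins per key) that you would adapt to your own context type.

```python
import asyncio
import copy
from dataclasses import dataclass, field


@dataclass
class MyContext:
    findings: dict = field(default_factory=dict)

    def merge_results(self, other: "MyContext") -> None:
        # Simple policy: keys from `other` overwrite existing keys
        self.findings.update(other.findings)


async def fake_agent_run(name: str, task: str, context: MyContext) -> str:
    """Hypothetical stand-in for an agent run that writes to its context."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    context.findings[name] = f"done: {task}"
    return f"{name} finished"


async def run_parallel(base: MyContext) -> list:
    jobs = [("agent1", "Task 1"), ("agent2", "Task 2"), ("agent3", "Task 3")]
    # One isolated copy per job, so concurrent writes never collide
    contexts = [copy.deepcopy(base) for _ in jobs]
    results = await asyncio.gather(
        *(fake_agent_run(name, task, ctx)
          for (name, task), ctx in zip(jobs, contexts))
    )
    # Fold each copy's changes back into the shared context
    for ctx in contexts:
        base.merge_results(ctx)
    return list(results)
```

If two agents may write the same key, a last-writer-wins merge silently drops data, so a real `merge_results` usually needs per-field rules (append lists, sum counters, and so on).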
1. Schema Evolution
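class ContextAdapter:
    @staticmethod
    def v1_to_v2(old_context: ContextV1) -> ContextV2:
        return ContextV2(
            user_id=old_context.user_id,
            preferences=old_context.get_preferences(),
            # New fields with defaults
            feature_flags={},
            api_version="v2",
        )

# A handoff input_filter must return HandoffInputData (conversation items),
# so it cannot return a converted context. Convert the context object before
# starting the run that uses the new agent instead.
result = await Runner.run(
    new_agent,
    user_input,
    context=ContextAdapter.v1_to_v2(old_context),
)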
2. Domain-Specific Context Views
class MultiDomainContext:
    def __init__(self):
        self.financial_data = FinancialContext()
        self.user_data = UserContext()
        self.system_data = SystemContext()

    def get_view(self, domain: str):
        """Return domain-specific view of context"""
        views = {
            "financial": self.financial_data,
            "user": self.user_data,
            "system": self.system_data,
        }
        return views.get(domain)

# Agents access only their domain
@function_tool
async def analyze(context: RunContextWrapper[MultiDomainContext]):
    fin_ctx = context.context.get_view("financial")
    # Only uses financial data
    return fin_ctx.calculate_metrics()

financial_agent = Agent(
    name="FinancialAnalyst",
    tools=[analyze],  # decorated functions are passed by name
)
- Keep Context Serializable: Use dataclasses or Pydantic models for easy persistence
- Minimize Context Size: Store references/IDs rather than full objects when possible
- Use Type Hints: Leverage `Generic[TContext]` so a static type checker can verify context access
- Handle Cleanup: Implement proper resource cleanup in hooks or context managers
- Version Your Context: Plan for schema evolution from the start
- Isolate Sensitive Data: Use handoff filters to prevent data leakage between agents
- Monitor Context Growth: Track context size to prevent memory issues
- Use Async Locks: Protect shared state in concurrent scenarios
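As an illustration of the first, second, and fifth points above (serializable, small, versioned), here is a minimal sketch of a context that stores IDs instead of full objects and round-trips through JSON. The class and field names are hypothetical, and the version check is just one simple way to detect an outdated payload.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class SessionContext:
    user_id: str
    # Store document IDs, not the documents themselves, to keep the payload small
    document_ids: list = field(default_factory=list)
    schema_version: int = 1


def dump_context(ctx: SessionContext) -> str:
    """Serialize the context to a JSON string suitable for any key-value store."""
    return json.dumps(asdict(ctx), sort_keys=True)


def load_context(raw: str) -> SessionContext:
    """Rebuild the context, refusing payloads written by another schema version."""
    data = json.loads(raw)
    if data.get("schema_version") != 1:
        raise ValueError(f"unsupported schema_version: {data.get('schema_version')!r}")
    return SessionContext(**data)
```

Live resources such as database connections do not survive this round trip, which is one reason to keep them on a separate, non-persisted part of the context.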
The SDK is designed to be deployed in various ways, from simple API servers to complex distributed systems. Here's how to share your agents with the world:
1. FastAPI Integration (Recommended)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agents import Agent, Runner

# Define your agent
agent = Agent(
    name="MyAssistant",
    instructions="You are a helpful assistant.",
    tools=[...],  # Your tools
)

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None

class ChatResponse(BaseModel):
    response: str
    session_id: str

@app.post("/chat")
async def chat(request: ChatRequest) -> ChatResponse:
    try:
        result = await Runner.run(agent, request.message)
        return ChatResponse(
            response=result.final_output,
            session_id=request.session_id or "default",
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Run with: uvicorn main:app --host 0.0.0.0 --port 8000
2. Streaming API Endpoint
from fastapi import FastAPI
from starlette.responses import StreamingResponse
from agents import Agent, Runner, RawResponsesStreamEvent
import json

app = FastAPI()

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    result = Runner.run_streamed(agent, request.message)

    async def event_generator():
        # Forward raw model events as Server-Sent Events
        async for event in result.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                yield f"data: {json.dumps({'type': event.type, 'data': str(event.data)})}\n\n"
        # The final output is available once the stream is exhausted
        yield f"data: {json.dumps({'type': 'done', 'final': result.final_output})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
3. WebSocket Real-time Communication
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from agents import Runner
import json

app = FastAPI()
active_sessions = {}

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            # Run agent
            result = Runner.run_streamed(agent, message["text"])
            # Stream responses
            async for event in result.stream_events():
                await websocket.send_json({
                    "type": "stream",
                    "data": str(event),
                })
            # Send final result
            await websocket.send_json({
                "type": "final",
                "data": result.final_output,
            })
    except WebSocketDisconnect:
        # Client went away; nothing left to send
        pass
1. Containerized Deployment (Docker)
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/app/data  # For SQLite sessions
    restart: unless-stopped
2. Kubernetes Deployment
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-api
  template:
    metadata:
      labels:
        app: agent-api
    spec:
      containers:
        - name: agent-api
          image: your-registry/agent-api:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: agent-api-service
spec:
  selector:
    app: agent-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
1. AWS Lambda with Mangum
# lambda_handler.py
from mangum import Mangum
from fastapi import FastAPI
from agents import Agent, Runner

app = FastAPI()
agent = Agent(...)

@app.post("/chat")
async def chat(message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}

# Lambda handler
handler = Mangum(app)
2. Vercel/Netlify Functions
# api/chat.py
from http.server import BaseHTTPRequestHandler
import json
from agents import Agent, Runner
import asyncio

agent = Agent(...)

class handler(BaseHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        data = json.loads(post_data)
        # Run agent; asyncio.run creates and closes the event loop for us
        result = asyncio.run(Runner.run(agent, data['message']))
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({
            'response': result.final_output
        }).encode())
1. Agent Registry Pattern
from fastapi import FastAPI, HTTPException
from typing import Dict
from agents import Agent, Runner

class AgentRegistry:
    def __init__(self):
        self._agents: Dict[str, Agent] = {}

    def register(self, name: str, agent: Agent):
        self._agents[name] = agent

    def get(self, name: str) -> Agent | None:
        return self._agents.get(name)

    def list_agents(self) -> list[str]:
        return list(self._agents.keys())

# Initialize registry
registry = AgentRegistry()

# Register your agents
registry.register("customer_support", customer_support_agent)
registry.register("sales", sales_agent)
registry.register("technical", technical_agent)

app = FastAPI()

@app.get("/agents")
async def list_agents():
    return {"agents": registry.list_agents()}

@app.post("/chat/{agent_name}")
async def chat_with_agent(agent_name: str, message: str):
    agent = registry.get(agent_name)
    if not agent:
        raise HTTPException(404, f"Agent {agent_name} not found")
    result = await Runner.run(agent, message)
    return {"response": result.final_output}
2. Load Balancing Multiple Agent Instances
import asyncio
from agents import Agent, Runner

class AgentPool:
    # Despite the name, this holds one agent and caps how many runs are in
    # flight at once. It limits concurrency rather than balancing load.
    def __init__(self, agent: Agent, pool_size: int = 5):
        self.agent = agent
        self.semaphore = asyncio.Semaphore(pool_size)

    async def run(self, message: str):
        async with self.semaphore:
            return await Runner.run(self.agent, message)

# Create agent pool
agent_pool = AgentPool(agent, pool_size=10)

@app.post("/chat")
async def chat(message: str):
    # Requests beyond pool_size wait here until a slot frees up
    result = await agent_pool.run(message)
    return {"response": result.final_output}
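The effect of the semaphore can be checked in isolation. In this sketch, `BoundedRunner` is a hypothetical stand-in for the pool above, with a short sleep in place of a model call and two counters added purely for observation.

```python
import asyncio


class BoundedRunner:
    """Caps in-flight work with a semaphore and records the observed peak."""

    def __init__(self, limit: int):
        self._semaphore = asyncio.Semaphore(limit)
        self.in_flight = 0
        self.peak = 0

    async def run(self, message: str) -> str:
        async with self._semaphore:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                await asyncio.sleep(0.01)  # placeholder for the agent run
                return message.upper()
            finally:
                self.in_flight -= 1


async def demo(n_requests: int = 10, limit: int = 3):
    runner = BoundedRunner(limit)
    replies = await asyncio.gather(
        *(runner.run(f"msg {i}") for i in range(n_requests))
    )
    return runner.peak, list(replies)
```

All ten requests are accepted immediately, but no more than `limit` of them hold a slot at the same time; the rest queue on the semaphore. A limit like this is per process, so with several API replicas the effective ceiling is the limit multiplied by the replica count.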
1. API Key Authentication
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import APIKeyHeader
import redis.asyncio as redis

app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")
redis_client = redis.Redis()  # async client, so calls do not block the event loop

async def verify_api_key(api_key: str = Depends(api_key_header)) -> str:
    # Read the key from the X-API-Key header and check it in the database/cache
    if not await redis_client.exists(f"api_key:{api_key}"):
        raise HTTPException(401, "Invalid API key")
    return api_key

@app.post("/chat")
async def chat(message: str, api_key: str = Depends(verify_api_key)):
    # Track usage
    await redis_client.hincrby(f"usage:{api_key}", "requests", 1)
    result = await Runner.run(agent, message)
    return {"response": result.final_output}
2. Rate Limiting
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
# Register the handler for slowapi's exception type so it returns HTTP 429
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/chat")
@limiter.limit("10/minute")
async def chat(request: Request, message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}
1. Health Checks and Monitoring
from fastapi import FastAPI, Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
import time

app = FastAPI()

# Metrics
request_count = Counter('agent_requests_total', 'Total requests')
request_duration = Histogram('agent_request_duration_seconds', 'Request duration')

@app.get("/health")
async def health_check():
    # Check agent availability. Note that every probe triggers a real model
    # call, so keep the probe interval long or use a cheaper liveness check.
    try:
        result = await Runner.run(agent, "test")
        return {"status": "healthy", "agent": "responsive"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

@app.middleware("http")
async def track_metrics(request: Request, call_next):
    if request.url.path == "/chat":
        request_count.inc()
    start = time.time()
    response = await call_next(request)
    if request.url.path == "/chat":
        request_duration.observe(time.time() - start)
    return response
2. Graceful Shutdown
import signal
import asyncio

shutdown_event = asyncio.Event()

def signal_handler(sig, frame):
    shutdown_event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

@app.on_event("shutdown")
async def shutdown():
    # Clean up resources
    await session_cleanup()
    await close_db_connections()
3. Session Persistence for Distributed Deployment
from agents import Runner
from agents.memory import Session
import redis.asyncio as redis
import json

class RedisSession(Session):
    def __init__(self, session_id: str, redis_client: redis.Redis):
        self.session_id = session_id
        self.redis = redis_client
        self.key = f"session:{session_id}"

    async def get_items(self, limit: int | None = None) -> list[dict]:
        # Fetch only the newest `limit` items when a limit is given
        start = -limit if limit else 0
        raw = await self.redis.lrange(self.key, start, -1)
        return [json.loads(item) for item in raw]

    async def add_items(self, items: list[dict]) -> None:
        if items:
            await self.redis.rpush(self.key, *(json.dumps(item) for item in items))
        # Set expiration (24 hours)
        await self.redis.expire(self.key, 86400)

    async def pop_item(self) -> dict | None:
        # Remove and return the most recent item
        raw = await self.redis.rpop(self.key)
        return json.loads(raw) if raw else None

    async def clear_session(self) -> None:
        await self.redis.delete(self.key)

# Use in API
redis_client = redis.Redis()

@app.post("/chat")
async def chat(message: str, session_id: str):
    session = RedisSession(session_id, redis_client)
    result = await Runner.run(agent, message, session=session)
    return {"response": result.final_output}
- Horizontal Scaling: Deploy multiple API instances behind a load balancer
- Caching: Cache common queries using Redis/Memcached
- Queue-based Processing: Use Celery/RQ for async processing of long-running tasks
- Database Optimization: Use connection pooling and read replicas
- CDN Integration: Serve static agent responses from edge locations
- Auto-scaling: Configure based on CPU/memory usage or request rate
- Set up environment variables (API keys, configuration)
- Configure logging and monitoring
- Implement health checks
- Set up rate limiting and authentication
- Configure CORS if needed
- Set up SSL/TLS certificates
- Configure database/session persistence
- Set up backup and disaster recovery
- Implement graceful shutdown
- Configure auto-scaling policies
- Set up CI/CD pipeline
- Document API endpoints
- Load test the deployment
- Set up error tracking (Sentry, etc.)
- Configure observability (traces, metrics, logs)
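The "Caching" item in the scaling list above can be sketched in-process before reaching for Redis or Memcached. This is an illustrative sketch, not part of the SDK: `TTLCache`, `normalize`, and `cached_answer` are hypothetical names, and `compute` stands for whatever coroutine actually runs the agent.

```python
import time
from typing import Awaitable, Callable, Optional


class TTLCache:
    """Tiny in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: dict = {}

    def get(self, key: str) -> Optional[str]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # drop the stale entry
            return None
        return value

    def set(self, key: str, value: str) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)


def normalize(query: str) -> str:
    # Treat queries that differ only in case or spacing as the same key
    return " ".join(query.lower().split())


async def cached_answer(cache: TTLCache, query: str,
                        compute: Callable[[str], Awaitable[str]]) -> str:
    key = normalize(query)
    hit = cache.get(key)
    if hit is not None:
        return hit
    value = await compute(query)  # cache miss: do the expensive work once
    cache.set(key, value)
    return value
```

Caching by normalized text only makes sense for requests whose answer does not depend on session history or user-specific context; for anything else the cache key would need to include that state.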
The SDK's stateless design and async-first architecture make it ideal for modern cloud deployments, whether you're serving a few users or scaling to millions.
- Stateless agent design enables horizontal scaling
- Async-first implementation for high concurrency
- Session backends can be distributed (Redis, etc.)
- Comprehensive error handling with custom exceptions
- Guardrails for input/output validation
- Configurable timeouts and retry mechanisms
- Rich tracing data with customizable metadata
- Usage tracking for cost management
- Event streaming for real-time monitoring
- Guardrail system for content filtering
- Configurable sensitive data exclusion from traces
- Tool permission management through context
- Manager agents coordinating specialist agents
- Dynamic agent selection based on task requirements
- Result aggregation and synthesis
- Complex multi-step workflows with conditional logic
- State machines implemented through handoffs
- Dynamic workflow adaptation based on results
- Shared model clients across agents
- Connection pooling for external services
- Efficient message passing without duplication
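For the "timeouts and retry mechanisms" item in the list above, one common application-level shape is a wrapper that bounds each attempt with a timeout and backs off between attempts. The sketch below is an assumption about how you might wrap a run yourself, not the SDK's built-in mechanism; `call_with_retry` and its parameters are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    operation: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    timeout: float = 5.0,
    base_delay: float = 0.1,
) -> T:
    """Run operation() with a per-attempt timeout and exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            # wait_for cancels the attempt if it exceeds the timeout
            return await asyncio.wait_for(operation(), timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt + 1 != attempts:
                # Back off: base_delay, then 2x, 4x, ... between attempts
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error
```

Only timeouts and connection errors are retried here; errors that indicate a bad request or a tripped guardrail should usually propagate immediately, since repeating the call would produce the same result and cost more tokens.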
The SDK's design philosophy emphasizes:
- Composability: Small, focused agents that combine into complex systems
- Flexibility: Multiple patterns for different use cases
- Observability: Deep insights into agent behavior
- Production-readiness: Built for scale, monitoring, and reliability
This architecture enables building sophisticated multi-agent systems that can handle complex, real-world tasks while maintaining clarity and maintainability.