@tkersey
Created July 27, 2025 18:25

Deep Analysis: OpenAI Agents Python SDK

Based on an exploration of the codebase, this is how the SDK handles multi-agent systems:

1. Agent Composition Patterns

The SDK supports three main composition patterns:

a) Handoffs (Agent-to-Agent Delegation)

  • Agents can transfer control to other agents through the handoffs mechanism
  • Handoffs appear as tools to the LLM, allowing dynamic routing based on context
  • Example: A triage agent routing to language-specific agents (src/agents/handoffs.py:58-268)
  • Handoffs can include input filters to transform conversation history before passing control
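The routing mechanics can be modeled without the SDK. The sketch below is a simplified stand-in, not SDK code: `MiniAgent`, `handoff_tools`, and `route` are hypothetical names, and a scripted function plays the role of the LLM. The tool names imitate the `transfer_to_<agent_name>` style the SDK uses for default handoff tool names.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class MiniAgent:
    """Hypothetical agent stand-in: a name plus the agents it may hand off to."""
    name: str
    handoffs: list["MiniAgent"] = field(default_factory=list)


def handoff_tools(agent: MiniAgent) -> dict[str, MiniAgent]:
    # Each handoff target is advertised to the model as a tool name.
    return {f"transfer_to_{t.name.lower().replace(' ', '_')}": t for t in agent.handoffs}


# Stand-in for the LLM: returns a tool name to call, or None to answer directly.
Chooser = Callable[[MiniAgent, list[str], list[str]], Optional[str]]
HistoryFilter = Callable[[list[str]], list[str]]


def route(start: MiniAgent, history: list[str], choose_tool: Chooser,
          input_filter: Optional[HistoryFilter] = None,
          max_hops: int = 5) -> tuple[MiniAgent, list[str]]:
    agent = start
    for _ in range(max_hops):
        tools = handoff_tools(agent)
        choice = choose_tool(agent, history, list(tools))
        if choice is None:
            return agent, history  # this agent answers; control stays here
        agent = tools[choice]  # control transfers to the target agent
        if input_filter is not None:
            history = input_filter(history)  # reshape what the next agent sees
    raise RuntimeError("too many handoffs")


spanish = MiniAgent("Spanish Agent")
english = MiniAgent("English Agent")
triage = MiniAgent("Triage Agent", handoffs=[spanish, english])


def scripted_llm(agent: MiniAgent, history: list[str], tools: list[str]) -> Optional[str]:
    if agent is triage:
        return "transfer_to_spanish_agent" if "hola" in history[-1].lower() else "transfer_to_english_agent"
    return None


final_agent, seen = route(triage, ["(system note)", "Hola, ¿cómo estás?"], scripted_llm,
                          input_filter=lambda h: h[-1:])
```

The key property is that control moves: once a handoff is followed, the target agent is the one that answers, and the optional filter decides how much history it sees.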

b) Agents as Tools

  • Any agent can be exposed as a tool to other agents via agent.as_tool()
  • This enables hierarchical agent structures where specialized agents are invoked for specific tasks
  • Example: Financial research agent using fundamentals and risk analysis agents as tools (examples/financial_research_agent/manager.py:102-112)
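The contrast with handoffs is that the manager keeps control and the specialist sees only what it is given. In the SDK that is `agent.as_tool(tool_name=..., tool_description=...)`; the sketch below models the same flow with plain functions, so `as_tool`, `manager`, and the two specialist functions here are hypothetical stand-ins, not SDK calls.

```python
from typing import Callable

TextAgent = Callable[[str], str]  # hypothetical: an "agent" here is just text in, text out


def as_tool(sub_agent: TextAgent, name: str) -> tuple[str, TextAgent]:
    # The specialist receives only the string the manager passes in,
    # not the manager's conversation history.
    def tool(tool_input: str) -> str:
        return sub_agent(tool_input)
    return name, tool


def manager(question: str, tools: dict[str, TextAgent]) -> str:
    # The manager stays in control: it calls each specialist, then writes the answer itself.
    findings = {name: tool(question) for name, tool in tools.items()}
    return " | ".join(f"{name}: {text}" for name, text in sorted(findings.items()))


def fundamentals_agent(query: str) -> str:
    return f"revenue analysis for {query!r}"


def risk_agent(query: str) -> str:
    return f"risk flags for {query!r}"


tools = dict([as_tool(fundamentals_agent, "fundamentals_analysis"),
              as_tool(risk_agent, "risk_analysis")])
report = manager("ACME Q2", tools)
```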

c) Parallel Execution

  • Multiple agent runs can execute concurrently by awaiting several Runner.run() calls with asyncio.gather() (plain asyncio, not a dedicated SDK primitive)
  • Results can be aggregated by a coordinator agent
  • Example: Running multiple translation agents in parallel and selecting the best result (examples/agent_patterns/parallelization.py:26-39)
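A minimal asyncio sketch of this fan-out/fan-in shape, assuming a hypothetical `translate` coroutine in place of `Runner.run(...)` and a trivial length-based `pick_best` in place of an agent that picks the best result. `asyncio.gather` returns results in argument order, so candidates line up with the runs that produced them.

```python
import asyncio


async def translate(agent_name: str, text: str, delay: float) -> str:
    # Hypothetical stand-in for Runner.run(translation_agent, text): sleeps instead of calling a model.
    await asyncio.sleep(delay)
    return f"[{agent_name}] {text.upper()}"


def pick_best(candidates: list[str]) -> str:
    # Hypothetical judge: shortest candidate wins. A real system might ask another agent to choose.
    return min(candidates, key=len)


async def parallel_translate(text: str) -> tuple[list[str], str, float]:
    loop = asyncio.get_running_loop()
    start = loop.time()
    candidates = await asyncio.gather(
        translate("a", text, 0.2),
        translate("bb", text, 0.2),
        translate("ccc", text, 0.2),
    )
    elapsed = loop.time() - start
    return list(candidates), pick_best(list(candidates)), elapsed


candidates, best, elapsed = asyncio.run(parallel_translate("hello"))
```

Because the three runs overlap, total wall time is close to the slowest single run rather than the sum of all three.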

2. Communication Mechanisms

a) Message Passing

  • Agents communicate through structured message items (RunItem)
  • Messages preserve full conversation history including tool calls and responses
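  • After a handoff, the receiving agent sees the full conversation history by default (an input filter can trim it); an agent invoked through as_tool() receives only the input string the calling agent passes to the tool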

b) Context Objects

  • Shared mutable context via RunContextWrapper[TContext] (src/agents/run_context.py:11-27)
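  • The same context object reaches tools whose first parameter is a RunContextWrapper, guardrails, hooks, and dynamic instructions; it is never sent to the LLM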
  • Enables dependency injection and state sharing across agent components

c) Streaming Events

  • Real-time communication through event streams
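  • Three event types: RawResponsesStreamEvent (raw model deltas), RunItemStreamEvent (messages, tool calls and outputs, handoffs), and AgentUpdatedStreamEvent (the active agent changed)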
  • Enables reactive UI updates and monitoring

3. Execution Patterns

a) Sequential Execution (Default)

  • Agent loop runs until final output is produced (src/agents/run.py:372-498)
  • Turn-based execution with a configurable max_turns limit (default 10; exceeding it raises MaxTurnsExceeded)
  • Each turn: LLM call → Tool execution → Handoff/Continue/Finish
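The turn structure can be sketched as a plain loop. This is a simplified model, not the SDK's runner: `run_loop`, `ToolCall`, `Handoff`, and `Final` are hypothetical, a scripted function stands in for the LLM, and the exception only borrows the name of the SDK's `MaxTurnsExceeded`. The default of 10 turns matches the SDK's default `max_turns`.

```python
from dataclasses import dataclass
from typing import Callable, Union


class MaxTurnsExceeded(Exception):
    """Local exception; only the name is borrowed from the SDK."""


@dataclass
class ToolCall:
    name: str
    arg: str


@dataclass
class Handoff:
    target: str


@dataclass
class Final:
    output: str


Step = Union[ToolCall, Handoff, Final]
ModelFn = Callable[[str, list[str]], Step]


def run_loop(agent: str, call_model: ModelFn, tools: dict[str, Callable[[str], str]],
             max_turns: int = 10) -> tuple[str, str, int]:
    history: list[str] = []
    for turn in range(1, max_turns + 1):
        step = call_model(agent, history)            # 1. LLM call
        if isinstance(step, Final):
            return agent, step.output, turn            # finish with a final output
        if isinstance(step, Handoff):
            agent = step.target                        # handoff: next turn runs the new agent
            continue
        history.append(tools[step.name](step.arg))     # 2. tool execution, then continue
    raise MaxTurnsExceeded(f"exceeded {max_turns} turns")


def get_weather(city: str) -> str:
    return f"sunny in {city}"


tools = {"get_weather": get_weather}


def scripted_model(agent: str, history: list[str]) -> Step:
    if agent == "triage":
        return Handoff("weather")
    if not history:
        return ToolCall("get_weather", "Paris")
    return Final(f"{agent}: {history[-1]}")


agent, output, turns = run_loop("triage", scripted_model, tools)
```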

b) Concurrent Tool Execution

  • Tools within a single agent run concurrently by default
  • Implemented using asyncio.gather() for parallel tool calls
  • Results are collected and passed back to the LLM

c) Streaming Execution

  • Non-blocking execution with event streaming (src/agents/run.py:526-594)
  • Allows real-time UI updates while agents process
  • Maintains same execution semantics as non-streaming mode

4. State Management

a) Session Memory

  • Built-in conversation persistence via Session interface
  • SQLite implementation provided, custom backends supported
  • Automatic history management across agent runs
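In the SDK, a session is passed per run, for example `Runner.run(agent, "...", session=SQLiteSession("user-123"))`, and the runner loads prior items before the turn and stores the new ones afterward. The in-memory class below is a hypothetical backend whose method names follow the Session protocol (`get_items`, `add_items`, `pop_item`, `clear_session`); it is a sketch for tests or prototypes, not an SDK class.

```python
import asyncio
from typing import Any, Optional

Item = dict[str, Any]


class InMemorySession:
    """Hypothetical session backend following the Session protocol's method names."""

    def __init__(self, session_id: str) -> None:
        self.session_id = session_id
        self._items: list[Item] = []

    async def get_items(self, limit: Optional[int] = None) -> list[Item]:
        # Oldest first; with a limit, return only the most recent `limit` items.
        if limit is None:
            return list(self._items)
        return self._items[-limit:] if limit > 0 else []

    async def add_items(self, items: list[Item]) -> None:
        self._items.extend(items)

    async def pop_item(self) -> Optional[Item]:
        # Remove and return the most recent item (useful for undoing a turn).
        return self._items.pop() if self._items else None

    async def clear_session(self) -> None:
        self._items.clear()


async def demo() -> tuple[list[Item], Optional[Item], list[Item]]:
    session = InMemorySession("user-123")
    await session.add_items([{"role": "user", "content": "Hi"},
                             {"role": "assistant", "content": "Hello!"}])
    await session.add_items([{"role": "user", "content": "What did I say?"}])
    last_two = await session.get_items(limit=2)
    popped = await session.pop_item()
    remaining = await session.get_items()
    return last_two, popped, remaining


last_two, popped, remaining = asyncio.run(demo())
```

No method body contains an internal `await`, so on a single event loop each call is atomic and needs no lock.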

b) Agent State

  • Agents are stateless by design - state lives in context and messages
  • Each agent run creates new execution context
  • State persistence handled through sessions and context objects

c) Tool Use Tracking

  • Tracks which tools each agent has used during a run
  • After a tool call, tool_choice is reset by default (Agent.reset_tool_choice) so a forced tool_choice cannot trap the model in an infinite tool-call loop

5. Context Management Deep Dive

The SDK's context primitive is RunContextWrapper; most of the patterns below are application-level designs built on top of it rather than SDK features:

a) Core Context Architecture

RunContextWrapper[TContext]

@dataclass
class RunContextWrapper(Generic[TContext]):
    context: TContext  # Your custom context object
    usage: Usage       # Tracks LLM usage across the run
  • The generic parameter TContext lets static type checkers (mypy, pyright) verify context access; nothing is enforced at runtime
  • Context is mutable and passed by reference
  • All components receive the same context instance

b) Context Access Points

1. Tool Functions

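from agents import RunContextWrapper, function_tool

@function_tool
async def my_tool(context: RunContextWrapper[MyContext], param: str) -> str:
    """Look up `param` and record it as processed."""
    # Access custom context (MyContext is your own class)
    db = context.context.database_connection
    user = context.context.current_user
    
    # Modify context: later tools, hooks, and agents in the same run see the change
    context.context.processed_items.append(param)
    
    return await db.query(param)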

2. Guardrails

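from agents import Agent, GuardrailFunctionOutput, RunContextWrapper, input_guardrail

@input_guardrail
async def admin_only(ctx: RunContextWrapper[MyContext], agent: Agent,
                     input: str | list) -> GuardrailFunctionOutput:
    # tripwire_triggered=True halts the run with InputGuardrailTripwireTriggered
    is_admin = ctx.context.user_role == "admin"
    return GuardrailFunctionOutput(output_info={"is_admin": is_admin},
                                   tripwire_triggered=not is_admin)

# Attach it with Agent(..., input_guardrails=[admin_only])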

3. Lifecycle Hooks

class MyHooks(AgentHooks[MyContext]):
    async def on_start(self, context: RunContextWrapper[MyContext], agent: Agent):
        # Initialize agent-specific resources
        context.context.agent_resources[agent.name] = ResourcePool()

4. Handoff Filters

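from agents import HandoffInputData, handoff

def history_transformer(handoff_data: HandoffInputData) -> HandoffInputData:
    # Input filters reshape the conversation history the next agent receives
    # (input_history, pre_handoff_items, new_items); they do not replace the
    # run's context object
    return handoff_data

target_handoff = handoff(
    target_agent,
    input_filter=history_transformer
)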
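For common cases the SDK ships ready-made filters in `agents.extensions.handoff_filters`, such as `remove_all_tools`. The sketch below shows the same idea without the SDK: `HandoffData` is a hypothetical frozen stand-in for `HandoffInputData` that models every item as a plain dict, and `drop_tool_items` removes Responses API tool items (`function_call`, `function_call_output`) so the next agent sees the dialogue without the tool traffic.

```python
from dataclasses import dataclass, replace
from typing import Any

Item = dict[str, Any]
TOOL_ITEM_TYPES = {"function_call", "function_call_output"}


@dataclass(frozen=True)
class HandoffData:
    """Hypothetical stand-in for HandoffInputData's three history fields."""
    input_history: tuple[Item, ...]
    pre_handoff_items: tuple[Item, ...]
    new_items: tuple[Item, ...]


def _without_tools(items: tuple[Item, ...]) -> tuple[Item, ...]:
    return tuple(item for item in items if item.get("type") not in TOOL_ITEM_TYPES)


def drop_tool_items(data: HandoffData) -> HandoffData:
    # Frozen input: build a filtered copy instead of mutating in place.
    return replace(
        data,
        input_history=_without_tools(data.input_history),
        pre_handoff_items=_without_tools(data.pre_handoff_items),
        new_items=_without_tools(data.new_items),
    )


data = HandoffData(
    input_history=({"type": "message", "role": "user", "content": "Refund order 42"},),
    pre_handoff_items=({"type": "function_call", "name": "lookup_order"},
                       {"type": "function_call_output", "output": "shipped"}),
    new_items=({"type": "message", "role": "assistant", "content": "Transferring to billing"},),
)
filtered = drop_tool_items(data)
```

With the real SDK, a function of this shape would be passed as `input_filter=` to `handoff(...)`, operating on `HandoffInputData` and its item objects rather than dicts.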

c) Context Persistence Patterns

1. External State Store Integration

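from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any

import asyncpg
from redis.asyncio import Redis

@dataclass
class PersistentContext:
    session_id: str
    redis_client: Redis
    postgres_conn: asyncpg.Connection
    state_cache: dict[str, Any] = field(default_factory=dict)
    
    async def persist(self):
        # Save to external stores under a session-scoped key (the same key restore() reads)
        await self.redis_client.set(f"context_{self.session_id}", json.dumps(self.state_cache))
        await self.postgres_conn.execute("INSERT INTO context_history ...")
    
    @classmethod
    async def restore(cls, session_id: str) -> PersistentContext:
        # Restore from external stores (redis.asyncio supersedes the standalone aioredis package)
        redis = Redis.from_url(...)
        postgres = await asyncpg.connect(...)
        state = await redis.get(f"context_{session_id}")
        return cls(session_id, redis, postgres, json.loads(state) if state else {})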

2. Context Checkpointing

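from typing import Any
from agents import Agent, RunContextWrapper, RunHooks

class CheckpointingHooks(RunHooks[PersistentContext]):
    # on_agent_end fires when an agent produces the run's final output
    async def on_agent_end(self, context: RunContextWrapper[PersistentContext], agent: Agent, output: Any):
        await context.context.persist()

    # on_handoff fires at every agent-to-agent transfer, so intermediate agents are checkpointed too
    async def on_handoff(self, context: RunContextWrapper[PersistentContext], from_agent: Agent, to_agent: Agent):
        await context.context.persist()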

d) Dynamic Context Modification

1. Context Enrichment Pattern

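# Each agent enriches the shared context with its results. Both agents must
# see the same context object: one run (via handoff or as_tool), or the same
# object passed to separate Runner.run calls.
@function_tool
async def save_research(context: RunContextWrapper, data: str) -> str:
    """Store a research finding in the shared context."""
    context.context.research_data.append(data)
    return "Saved"

@function_tool
async def analyze(context: RunContextWrapper) -> str:
    """Analyze the research gathered so far."""
    # Access enriched context from the previous agent
    research = context.context.research_data
    analysis = perform_analysis(research)  # your own analysis function
    context.context.analysis_results = analysis
    return str(analysis)

research_agent = Agent(name="Researcher", tools=[save_research])
analysis_agent = Agent(name="Analyst", tools=[analyze])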

2. Context Isolation Between Agents

class IsolatedContext:
    def __init__(self):
        self.global_state = {}  # Shared across all agents
        self.agent_state = {}   # Isolated per agent
    
    def get_agent_context(self, agent_name: str):
        if agent_name not in self.agent_state:
            self.agent_state[agent_name] = {}
        return self.agent_state[agent_name]

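# Input filters are meant to reshape conversation history; on_handoff receives
# the run context directly, which makes it the natural place to clear state
def isolate_context(ctx: RunContextWrapper[IsolatedContext]) -> None:
    # Clear sensitive per-agent data before the next agent takes over
    ctx.context.agent_state.clear()

target_handoff = handoff(target_agent, on_handoff=isolate_context)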

e) External Resource Management

1. Connection Pooling

from dataclasses import dataclass
from typing import Any

import aiohttp
import asyncpg

@dataclass
class ResourceContext:
    _db_pool: asyncpg.Pool
    _http_session: aiohttp.ClientSession
    _api_clients: dict[str, Any]
    
    def db_connection(self):
        # Use as `async with ctx.db_connection() as conn:` so the connection
        # goes back to the pool on exit, even if the tool raises
        return self._db_pool.acquire()
    
    def get_api_client(self, service: str) -> Any:
        if service not in self._api_clients:
            self._api_clients[service] = create_client(service)  # your own factory
        return self._api_clients[service]
    
    async def cleanup(self):
        await self._db_pool.close()
        await self._http_session.close()

2. Resource Lifecycle Management

from contextlib import asynccontextmanager
from typing import AsyncIterator

class ResourceManager:
    @classmethod
    @asynccontextmanager
    async def create_context(cls) -> AsyncIterator[ResourceContext]:
        # Initialize resources
        db_pool = await asyncpg.create_pool(...)
        http_session = aiohttp.ClientSession()
        
        context = ResourceContext(db_pool, http_session, {})
        try:
            yield context
        finally:
            # Cleanup runs even if the agent run raises
            await context.cleanup()

# Usage (inside an async function)
async def main():
    async with ResourceManager.create_context() as resource_ctx:
        result = await Runner.run(
            agent,
            input="Process data",
            context=resource_ctx
        )
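The value of the `try`/`finally` inside an async context manager is that cleanup runs even when the agent run fails. A self-contained sketch, with a hypothetical `FakeResources` class that records lifecycle events instead of opening real pools or sessions:

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import AsyncIterator


@dataclass
class FakeResources:
    """Hypothetical stand-in for ResourceContext: records events instead of opening connections."""
    events: list[str] = field(default_factory=list)

    async def cleanup(self) -> None:
        self.events.append("cleanup")


@asynccontextmanager
async def managed_resources(log: list[str]) -> AsyncIterator[FakeResources]:
    resources = FakeResources(events=log)
    log.append("open")
    try:
        yield resources
    finally:
        await resources.cleanup()  # runs on normal exit and when the body raises


async def failing_run(resources: FakeResources) -> None:
    # Stand-in for Runner.run(...) failing partway through.
    resources.events.append("run")
    raise RuntimeError("model call failed")


async def main() -> list[str]:
    log: list[str] = []
    try:
        async with managed_resources(log) as resources:
            await failing_run(resources)
    except RuntimeError:
        log.append("handled")
    return log


log = asyncio.run(main())
```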

f) Concurrent Context Access

1. Coroutine-Safe Context Updates

import asyncio
from dataclasses import dataclass, field
from typing import Any

# asyncio.Lock coordinates coroutines on one event loop; it is not a thread lock
@dataclass
class ConcurrentContext:
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _data: dict = field(default_factory=dict)
    
    async def update(self, key: str, value: Any):
        async with self._lock:
            self._data[key] = value
    
    async def get(self, key: str) -> Any:
        async with self._lock:
            return self._data.get(key)
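The lock matters when an update spans an `await`, such as reading a value, awaiting a tool call, then writing back. The sketch below uses a hypothetical `Counter` and `asyncio.sleep(0)` in place of the awaited work, and shows updates being lost without the lock and preserved with it.

```python
import asyncio
from typing import Awaitable, Callable


class Counter:
    """Hypothetical shared context with one counter field."""

    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()


async def unsafe_increment(c: Counter) -> None:
    current = c.value
    await asyncio.sleep(0)  # yields between read and write, like an awaited tool call
    c.value = current + 1


async def safe_increment(c: Counter) -> None:
    async with c.lock:  # the whole read-modify-write runs as one unit
        current = c.value
        await asyncio.sleep(0)
        c.value = current + 1


async def run(increment: Callable[[Counter], Awaitable[None]], n: int = 100) -> int:
    c = Counter()
    await asyncio.gather(*(increment(c) for _ in range(n)))
    return c.value


unsafe_total = asyncio.run(run(unsafe_increment))
safe_total = asyncio.run(run(safe_increment))
```

A single assignment with no `await` in between is already atomic on one event loop; the lock only pays off for read-modify-write sequences that yield.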

2. Context Merging for Parallel Agents

import asyncio
import copy

async def run_parallel_agents(base_context: MyContext):
    # Give each parallel run its own deep copy so the runs cannot interfere
    # (deepcopy may fail on contexts holding connections, locks, or clients)
    contexts = [copy.deepcopy(base_context) for _ in range(3)]
    
    # Runner.run wraps each raw context in its own RunContextWrapper
    results = await asyncio.gather(
        Runner.run(agent1, "Task 1", context=contexts[0]),
        Runner.run(agent2, "Task 2", context=contexts[1]),
        Runner.run(agent3, "Task 3", context=contexts[2]),
    )
    
    # Merge contexts back (merge_results is a method you define on MyContext)
    for ctx in contexts:
        base_context.merge_results(ctx)
    
    return results
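Merging needs care because every copy starts with the base context's data, so appending a branch wholesale would duplicate it. One approach, sketched below with a hypothetical `ResearchContext` and a stand-in `fake_agent` coroutine instead of `Runner.run`, is to snapshot a baseline before forking and merge only what each branch added since then:

```python
import asyncio
import copy
from dataclasses import dataclass, field


@dataclass
class ResearchContext:
    """Hypothetical context type; every field must be deep-copyable."""
    findings: list[str] = field(default_factory=list)
    tokens_used: int = 0

    def merge_results(self, branch: "ResearchContext", baseline: "ResearchContext") -> None:
        # Take only what the branch added after the fork, so base data is not duplicated.
        self.findings.extend(branch.findings[len(baseline.findings):])
        self.tokens_used += branch.tokens_used - baseline.tokens_used


async def fake_agent(ctx: ResearchContext, topic: str) -> str:
    # Stand-in for Runner.run(agent, topic, context=ctx): mutates only its own copy.
    await asyncio.sleep(0)
    ctx.findings.append(f"note on {topic}")
    ctx.tokens_used += 10
    return f"done: {topic}"


async def run_parallel(base: ResearchContext, topics: list[str]) -> list[str]:
    baseline = copy.deepcopy(base)
    branches = [copy.deepcopy(base) for _ in topics]
    results = await asyncio.gather(*(fake_agent(b, t) for b, t in zip(branches, topics)))
    for branch in branches:  # merge in a fixed order so the result is deterministic
        base.merge_results(branch, baseline)
    return list(results)


base = ResearchContext(findings=["seed"], tokens_used=5)
results = asyncio.run(run_parallel(base, ["rates", "fx"]))
```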

g) Context Transformation Between Agents

1. Schema Evolution

class ContextAdapter:
    @staticmethod
    def v1_to_v2(old_context: ContextV1) -> ContextV2:
        return ContextV2(
            user_id=old_context.user_id,
            preferences=old_context.get_preferences(),
            # New fields with defaults
            feature_flags={},
            api_version="v2"
        )

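# A run keeps one context object from start to finish, and input filters must
# return HandoffInputData, so a handoff cannot swap ContextV1 for ContextV2.
# Migrate before starting the run instead (v1_context and user_input are yours):
v2_context = ContextAdapter.v1_to_v2(v1_context)
result = await Runner.run(new_agent, user_input, context=v2_context)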
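When context is persisted between runs, an explicit version number lets stored payloads be upgraded step by step before a context object is built for `Runner.run`. A sketch using plain dicts; the `MIGRATIONS` registry, the `migration` decorator, and the v2/v3 field changes are all hypothetical:

```python
from typing import Any, Callable

Payload = dict[str, Any]
Migration = Callable[[Payload], Payload]

CURRENT_VERSION = 3
MIGRATIONS: dict[int, Migration] = {}


def migration(from_version: int) -> Callable[[Migration], Migration]:
    # Register a function that upgrades a payload from `from_version` to the next version.
    def register(fn: Migration) -> Migration:
        MIGRATIONS[from_version] = fn
        return fn
    return register


@migration(1)
def v1_to_v2(data: Payload) -> Payload:
    # v2 adds feature_flags with an empty default.
    return {**data, "version": 2, "feature_flags": {}}


@migration(2)
def v2_to_v3(data: Payload) -> Payload:
    # v3 renames "prefs" to "preferences".
    out = {k: v for k, v in data.items() if k != "prefs"}
    out["preferences"] = data.get("prefs", {})
    out["version"] = 3
    return out


def upgrade(data: Payload) -> Payload:
    # Payloads written before versioning existed are treated as version 1.
    while data.get("version", 1) < CURRENT_VERSION:
        data = MIGRATIONS[data.get("version", 1)](data)
    return data


old = {"version": 1, "user_id": "u1", "prefs": {"lang": "es"}}
new = upgrade(old)
```

Chaining single-step migrations means each new version needs only one function, and payloads written by any older version still load.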

2. Domain-Specific Context Views

class MultiDomainContext:
    def __init__(self):
        self.financial_data = FinancialContext()
        self.user_data = UserContext()
        self.system_data = SystemContext()
    
    def get_view(self, domain: str):
        """Return domain-specific view of context"""
        views = {
            "financial": self.financial_data,
            "user": self.user_data,
            "system": self.system_data
        }
        return views.get(domain)

# Agents access only their domain (by convention: the full context is still reachable)
@function_tool
async def analyze_financials(context: RunContextWrapper[MultiDomainContext]) -> str:
    """Compute metrics from the financial view of the context."""
    fin_ctx = context.context.get_view("financial")
    return str(fin_ctx.calculate_metrics())

financial_agent = Agent(
    name="FinancialAnalyst",
    tools=[analyze_financials],
)
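If the goal is to stop tools from accidentally writing to another domain, a read-only view is a lightweight option. The sketch below uses the standard library's `types.MappingProxyType`; `MultiDomainStore` and `calculate_margin` are hypothetical, and unlike the original `get_view` it raises `KeyError` for an unknown domain instead of returning `None`.

```python
from types import MappingProxyType
from typing import Any, Mapping


class MultiDomainStore:
    """Hypothetical context holding per-domain data."""

    def __init__(self) -> None:
        self._domains: dict[str, dict[str, Any]] = {
            "financial": {"revenue": 120.0, "costs": 80.0},
            "user": {"email": "a@example.com"},
        }

    def get_view(self, domain: str) -> Mapping[str, Any]:
        # A live, read-only view: reads see current data, item assignment raises TypeError.
        if domain not in self._domains:
            raise KeyError(f"unknown domain: {domain}")
        return MappingProxyType(self._domains[domain])


def calculate_margin(view: Mapping[str, Any]) -> float:
    return (view["revenue"] - view["costs"]) / view["revenue"]


store = MultiDomainStore()
fin = store.get_view("financial")
margin = calculate_margin(fin)
```

This guards against mistakes, not against hostile code: the proxy is shallow (nested mutable values can still change), and tool code can still reach the underlying object.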

h) Best Practices for Context Management

  1. Keep Context Serializable: Use dataclasses or Pydantic models for easy persistence
  2. Minimize Context Size: Store references/IDs rather than full objects when possible
  3. Use Type Hints: Leverage Generic[TContext] for static type checking with tools such as mypy or pyright
  4. Handle Cleanup: Implement proper resource cleanup in hooks or context managers
  5. Version Your Context: Plan for schema evolution from the start
  6. Isolate Sensitive Data: Use handoff filters to prevent data leakage between agents
  7. Monitor Context Growth: Track context size to prevent memory issues
  8. Use Async Locks: Protect shared state in concurrent scenarios

6. Deployment and Hosting Guide

The SDK can be deployed in several ways, from a single API server to distributed systems. The sections below cover common options:

a) Basic API Server Deployment

1. FastAPI Integration (Recommended)

import uuid

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agents import Agent, Runner, SQLiteSession

# Define your agent
agent = Agent(
    name="MyAssistant",
    instructions="You are a helpful assistant.",
    tools=[],  # add your tools here
)

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None

class ChatResponse(BaseModel):
    response: str
    session_id: str

@app.post("/chat")
async def chat(request: ChatRequest) -> ChatResponse:
    session_id = request.session_id or str(uuid.uuid4())
    # File-backed SQLite keeps history across requests (single instance only)
    session = SQLiteSession(session_id, "conversations.db")
    try:
        result = await Runner.run(agent, request.message, session=session)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return ChatResponse(response=str(result.final_output), session_id=session_id)

# Run with: uvicorn main:app --host 0.0.0.0 --port 8000

2. Streaming API Endpoint

import json

from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent
from agents import Runner

# Reuses `app`, `agent`, and `ChatRequest` from the previous example

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    result = Runner.run_streamed(agent, request.message)
    
    async def event_generator():
        async for event in result.stream_events():
            # Forward only text deltas; tool calls, handoffs, etc. are skipped here
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                yield f"data: {json.dumps({'type': 'delta', 'text': event.data.delta})}\n\n"
        # final_output is available once the stream is exhausted
        yield f"data: {json.dumps({'type': 'done', 'final': str(result.final_output)})}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
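Each Server-Sent Events message is one or more `data:` lines followed by a blank line, and a multi-line payload must be split across several `data:` lines. A small helper (hypothetical name `sse_frame`) keeps that framing in one place; JSON-encoding non-string payloads also guarantees they contain no raw newlines.

```python
import json
from typing import Any, Optional


def sse_frame(data: Any, event: Optional[str] = None) -> str:
    # Strings are sent as-is; anything else is JSON-encoded (JSON output has no raw newlines).
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = [f"event: {event}"] if event is not None else []
    # Each payload line gets its own "data:" field; a blank line ends the message.
    lines.extend(f"data: {line}" for line in payload.split("\n"))
    return "\n".join(lines) + "\n\n"
```

Inside `event_generator`, each `yield` could then call `sse_frame({...})` instead of formatting the frame by hand.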

3. WebSocket Real-time Communication

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai.types.responses import ResponseTextDeltaEvent
from agents import Runner
import json

app = FastAPI()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    try:
        while True:
            message = json.loads(await websocket.receive_text())
            
            # Run agent
            result = Runner.run_streamed(agent, message["text"])
            
            # Stream text deltas as they arrive
            async for event in result.stream_events():
                if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                    await websocket.send_json({"type": "stream", "data": event.data.delta})
            
            # Send final result
            await websocket.send_json({
                "type": "final",
                "data": str(result.final_output)
            })
    except WebSocketDisconnect:
        pass

b) Production Deployment Patterns

1. Containerized Deployment (Docker)

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: '3.8'
services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/app/data  # For SQLite sessions
    restart: unless-stopped

2. Kubernetes Deployment

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-api
  template:
    metadata:
      labels:
        app: agent-api
    spec:
      containers:
      - name: agent-api
        image: your-registry/agent-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: agent-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: agent-api-service
spec:
  selector:
    app: agent-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer

c) Serverless Deployment

1. AWS Lambda with Mangum

# lambda_handler.py
from mangum import Mangum
from fastapi import FastAPI
from agents import Agent, Runner

app = FastAPI()
agent = Agent(...)

@app.post("/chat")
async def chat(message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}

# Lambda handler
handler = Mangum(app)

2. Vercel Serverless Functions (Python)

# api/chat.py
from http.server import BaseHTTPRequestHandler
import json
from agents import Agent, Runner
import asyncio

agent = Agent(...)

class handler(BaseHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        data = json.loads(post_data)
        
        # Run agent (asyncio.run creates a fresh event loop and closes it afterwards)
        result = asyncio.run(Runner.run(agent, data['message']))
        
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({
            'response': result.final_output
        }).encode())

d) Multi-Agent API Architecture

1. Agent Registry Pattern

from fastapi import FastAPI, HTTPException
from typing import Dict
from agents import Agent, Runner

class AgentRegistry:
    def __init__(self):
        self._agents: Dict[str, Agent] = {}
    
    def register(self, name: str, agent: Agent):
        self._agents[name] = agent
    
    def get(self, name: str) -> Agent | None:
        return self._agents.get(name)
    
    def list_agents(self) -> list[str]:
        return list(self._agents.keys())

# Initialize registry
registry = AgentRegistry()

# Register your agents
registry.register("customer_support", customer_support_agent)
registry.register("sales", sales_agent)
registry.register("technical", technical_agent)

app = FastAPI()

@app.get("/agents")
async def list_agents():
    return {"agents": registry.list_agents()}

@app.post("/chat/{agent_name}")
async def chat_with_agent(agent_name: str, message: str):
    agent = registry.get(agent_name)
    if not agent:
        raise HTTPException(404, f"Agent {agent_name} not found")
    
    result = await Runner.run(agent, message)
    return {"response": result.final_output}

2. Limiting Concurrent Runs per Agent

import asyncio
from agents import Agent, Runner

class AgentPool:
    """Caps how many runs of one agent execute at once; this is not a load balancer."""
    def __init__(self, agent: Agent, pool_size: int = 5):
        self.agent = agent
        self.semaphore = asyncio.Semaphore(pool_size)
    
    async def run(self, message: str):
        async with self.semaphore:
            return await Runner.run(self.agent, message)

# Create agent pool
agent_pool = AgentPool(agent, pool_size=10)

@app.post("/chat")
async def chat(message: str):
    # At most pool_size runs are in flight; further requests wait for a free slot
    result = await agent_pool.run(message)
    return {"response": result.final_output}
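A quick way to confirm the semaphore's behavior is to count in-flight jobs. In this sketch `ConcurrencyCap` is a hypothetical stand-in for `AgentPool`, and `asyncio.sleep` replaces `Runner.run`:

```python
import asyncio


class ConcurrencyCap:
    """Hypothetical stand-in for AgentPool that also records peak concurrency."""

    def __init__(self, limit: int) -> None:
        self._sem = asyncio.Semaphore(limit)
        self.in_flight = 0
        self.peak = 0

    async def run(self, job_id: int) -> int:
        async with self._sem:  # waits here once `limit` jobs are in flight
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                await asyncio.sleep(0.01)  # stand-in for Runner.run(...)
                return job_id
            finally:
                self.in_flight -= 1


async def main() -> tuple[list[int], int]:
    cap = ConcurrencyCap(limit=3)
    results = await asyncio.gather(*(cap.run(i) for i in range(10)))
    return list(results), cap.peak


results, peak = asyncio.run(main())
```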

e) Authentication and Rate Limiting

1. API Key Authentication

from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import APIKeyHeader
import redis.asyncio as redis

app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")
redis_client = redis.Redis()  # async client, so lookups don't block the event loop

async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    # Check if the key from the X-API-Key header exists in the database/cache
    if not await redis_client.exists(f"api_key:{api_key}"):
        raise HTTPException(401, "Invalid API key")
    return api_key

@app.post("/chat")
async def chat(message: str, api_key: str = Depends(verify_api_key)):
    # Track usage
    await redis_client.hincrby(f"usage:{api_key}", "requests", 1)
    
    result = await Runner.run(agent, message)
    return {"response": result.final_output}

2. Rate Limiting

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/chat")
@limiter.limit("10/minute")  # goes below the route decorator; slowapi needs the `request` argument
async def chat(request: Request, message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}
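To make "10/minute" concrete, here is an in-process sliding-window limiter. It is a swapped-in technique for illustration, not slowapi's implementation, and its strategy may differ from the one slowapi is configured with; the injectable `clock` exists only to make it testable. With several replicas, counts have to live in shared storage (slowapi's `Limiter` accepts a `storage_uri`, for example pointing at Redis), otherwise each instance enforces its own separate limit.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Hypothetical limiter: at most `limit` requests per `window` seconds per key."""

    def __init__(self, limit: int, window: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        while hits and now - hits[0] >= self.window:  # forget requests older than the window
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


now = [0.0]  # fake clock so the example is deterministic
limiter = SlidingWindowLimiter(limit=10, window=60.0, clock=lambda: now[0])
first_ten = [limiter.allow("203.0.113.7") for _ in range(10)]
eleventh = limiter.allow("203.0.113.7")
other_ip = limiter.allow("198.51.100.2")
now[0] = 60.0
after_window = limiter.allow("203.0.113.7")
```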

f) Production Best Practices

1. Health Checks and Monitoring

from fastapi import Request, Response
from fastapi.responses import JSONResponse
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
import time

# Assumes `app` and `agent` from the earlier examples

# Metrics
request_count = Counter('agent_requests_total', 'Total requests')
request_duration = Histogram('agent_request_duration_seconds', 'Request duration')

@app.get("/health")
async def health_check():
    # Makes a real model call, so every probe costs tokens; keep frequent liveness probes cheaper
    try:
        await Runner.run(agent, "test")
        return {"status": "healthy", "agent": "responsive"}
    except Exception as e:
        # Non-2xx status so load balancers and Kubernetes probes notice the failure
        return JSONResponse(status_code=503, content={"status": "unhealthy", "error": str(e)})

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

@app.middleware("http")
async def track_metrics(request: Request, call_next):
    if request.url.path != "/chat":
        return await call_next(request)
    
    request_count.inc()
    start = time.perf_counter()
    response = await call_next(request)
    request_duration.observe(time.perf_counter() - start)
    return response

2. Graceful Shutdown

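from contextlib import asynccontextmanager

from fastapi import FastAPI

# Uvicorn already handles SIGINT/SIGTERM: it stops accepting new connections,
# lets in-flight requests finish, then runs the lifespan shutdown phase, so
# custom signal.signal handlers are usually unnecessary.
@asynccontextmanager
async def lifespan(app: FastAPI):
    yield  # the application serves requests while suspended here
    # Clean up resources (session_cleanup and close_db_connections are your own functions)
    await session_cleanup()
    await close_db_connections()

# lifespan replaces the deprecated @app.on_event("shutdown") hook
app = FastAPI(lifespan=lifespan)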

3. Session Persistence for Distributed Deployment

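from agents.memory import Session
import redis.asyncio as redis
import json

class RedisSession(Session):
    """Conversation history in a Redis list, shared by every API replica."""

    def __init__(self, session_id: str, redis_client: redis.Redis, ttl_seconds: int = 86400):
        self.session_id = session_id
        self.redis = redis_client
        self.key = f"session:{session_id}"
        self.ttl = ttl_seconds
    
    async def get_items(self, limit: int | None = None) -> list[dict]:
        # With a limit, fetch only the newest `limit` items from Redis
        start = -limit if limit else 0
        items = await self.redis.lrange(self.key, start, -1)
        return [json.loads(item) for item in items]
    
    async def add_items(self, items: list[dict]) -> None:
        if not items:
            return
        await self.redis.rpush(self.key, *(json.dumps(item) for item in items))
        # Refresh expiration (24 hours by default)
        await self.redis.expire(self.key, self.ttl)

    async def pop_item(self) -> dict | None:
        item = await self.redis.rpop(self.key)
        return json.loads(item) if item else None

    async def clear_session(self) -> None:
        await self.redis.delete(self.key)

# Use in API (async client, so Redis calls don't block the event loop)
redis_client = redis.Redis()

@app.post("/chat")
async def chat(message: str, session_id: str):
    session = RedisSession(session_id, redis_client)
    result = await Runner.run(agent, message, session=session)
    return {"response": result.final_output}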

g) Scaling Strategies

  1. Horizontal Scaling: Deploy multiple API instances behind a load balancer
  2. Caching: Cache common queries using Redis/Memcached
  3. Queue-based Processing: Use Celery/RQ for async processing of long-running tasks
  4. Database Optimization: Use connection pooling and read replicas
  5. CDN Integration: Serve static assets, and only responses that are safe to cache (not personalized), from edge locations
  6. Auto-scaling: Configure based on CPU/memory usage or request rate
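For the caching item, a minimal TTL cache keyed on a normalized prompt looks like this. `TTLCache` and `answer` are hypothetical; a production key would also need to include the agent name and version and any user scope, and only non-personalized answers should be cached at all.

```python
import time
from typing import Callable, Optional


class TTLCache:
    """Hypothetical response cache: normalized prompt -> (expires_at, response)."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.clock = clock
        self._store: dict[str, tuple[float, str]] = {}

    @staticmethod
    def normalize(prompt: str) -> str:
        # Case- and whitespace-insensitive key, so trivially different phrasings share an entry.
        return " ".join(prompt.lower().split())

    def get(self, prompt: str) -> Optional[str]:
        key = self.normalize(prompt)
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, response = entry
        if self.clock() >= expires_at:
            del self._store[key]  # expired: evict and report a miss
            return None
        return response

    def put(self, prompt: str, response: str) -> None:
        self._store[self.normalize(prompt)] = (self.clock() + self.ttl, response)


def answer(prompt: str, cache: TTLCache, run_agent: Callable[[str], str]) -> str:
    cached = cache.get(prompt)
    if cached is not None:
        return cached
    response = run_agent(prompt)  # stand-in for an actual Runner.run call
    cache.put(prompt, response)
    return response


now = [0.0]
calls: list[str] = []


def fake_agent(prompt: str) -> str:
    calls.append(prompt)
    return f"answer to {prompt.strip()}"


cache = TTLCache(ttl=300, clock=lambda: now[0])
a1 = answer("What are your hours?", cache, fake_agent)
a2 = answer("  what are your   HOURS? ", cache, fake_agent)  # cache hit
now[0] = 301.0
a3 = answer("What are your hours?", cache, fake_agent)  # expired, recomputed
```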

h) Deployment Checklist

  • Set up environment variables (API keys, configuration)
  • Configure logging and monitoring
  • Implement health checks
  • Set up rate limiting and authentication
  • Configure CORS if needed
  • Set up SSL/TLS certificates
  • Configure database/session persistence
  • Set up backup and disaster recovery
  • Implement graceful shutdown
  • Configure auto-scaling policies
  • Set up CI/CD pipeline
  • Document API endpoints
  • Load test the deployment
  • Set up error tracking (Sentry, etc.)
  • Configure observability (traces, metrics, logs)

The SDK's stateless agent design and async-first architecture suit horizontal scaling, provided that session state lives in a shared backend (such as Redis) rather than in a per-instance SQLite file.

7. Production Considerations

a) Scalability

  • Stateless agent design enables horizontal scaling
  • Async-first implementation for high concurrency
  • Session backends can be distributed (Redis, etc.)

b) Reliability

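  • Typed exceptions for failure modes (for example MaxTurnsExceeded, ModelBehaviorError, InputGuardrailTripwireTriggered)
  • Guardrails for input/output validation
  • Timeouts and retries, configured largely through the underlying model client (for example the OpenAI client's timeout and max_retries settings)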

c) Monitoring

  • Rich tracing data with customizable metadata
  • Usage tracking for cost management
  • Event streaming for real-time monitoring

d) Security

  • Guardrail system for content filtering
  • Configurable sensitive data exclusion from traces
  • Tool permission management through context

8. Advanced Patterns

a) Hierarchical Agent Systems

  • Manager agents coordinating specialist agents
  • Dynamic agent selection based on task requirements
  • Result aggregation and synthesis

b) Workflow Orchestration

  • Complex multi-step workflows with conditional logic
  • State machines implemented through handoffs
  • Dynamic workflow adaptation based on results

c) Resource Management

  • Shared model clients across agents
  • Connection pooling for external services
  • Efficient message passing without duplication

Design Philosophy

The SDK's design philosophy emphasizes:

  • Composability: Small, focused agents that combine into complex systems
  • Flexibility: Multiple patterns for different use cases
  • Observability: Deep insights into agent behavior
  • Production-readiness: Built for scale, monitoring, and reliability

This architecture enables building sophisticated multi-agent systems that can handle complex, real-world tasks while maintaining clarity and maintainability.
