Based on the research conducted using Octocode MCP, here's a real-world example of Deep Agents in action:
Repository: guy-hartstein/company-research-agent (1,445+ stars)
This is a production-ready implementation of Deep Agents that demonstrates all four core components working together:
┌──────────────────────┐      ┌──────────────────────┐      ┌──────────────────────┐
│    Planning Tool     │      │      Sub Agents      │      │     File System      │
│                      │      │                      │      │                      │
│ • Task Pipeline      │◄────►│ • CompanyAnalyzer    │◄────►│ • Document Store     │
│ • Node Routing       │      │ • IndustryAnalyzer   │      │ • Progress Cache     │
│ • Progress Tracking  │      │ • FinancialAnalyst   │      │ • Result Archive     │
│ • Error Recovery     │      │ • NewsScanner        │      │ • Search Index       │
└──────────────────────┘      └──────────────────────┘      └──────────────────────┘
                                         │
                              ┌──────────────────────┐
                              │    System Prompt     │
                              │                      │
                              │ • Research Goals     │
                              │ • Quality Standards  │
                              │ • Coordination Rules │
                              │ • Output Formats     │
                              └──────────────────────┘
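Mapped to code, the four components correspond to the classes shown throughout this document; a hypothetical composition sketch (illustrative only, not the repository's actual wiring):

from dataclasses import dataclass

@dataclass
class DeepAgentSystem:
    """Hypothetical wiring of the four core components (illustrative only)."""
    planner: "ResearchPipeline"            # Planning tool: ordered task pipeline
    sub_agents: dict                       # Specialized sub-agents keyed by role
    file_system: "VirtualFileSystem"       # Persistent document store and search index
    prompts: "SystemPromptManager"         # Role- and model-specific system prompts
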
The system uses a sequential pipeline architecture with specialized nodes:
# Research Pipeline (Planning Tool)
class ResearchPipeline:
    def __init__(self):
        self.nodes = [
            CompanyAnalyzer(),      # Research core business info
            IndustryAnalyzer(),     # Analyze market position  
            FinancialAnalyst(),     # Gather financial data
            NewsScanner(),          # Collect recent news
            Collector(),            # Aggregate all research
            Curator(),              # Filter and score content
            Briefing(),             # Generate summaries
            Editor()                # Compile final report
        ]
    
    async def execute(self, company_name: str):
        context = {"company": company_name, "data": {}}
        
        for node in self.nodes:
            context = await node.process(context)
            await self.send_progress_update(node.name, context)
        
        return context["final_report"]Each agent has domain-specific expertise and tools:
class CompanyAnalyzer(BaseAgent):
    """Researches core business information"""
    
    async def process(self, context):
        company = context["company"]
        
        # Generate targeted search queries
        queries = await self.generate_queries(company)
        
        # Use Tavily API for web research
        results = await self.tavily_client.search(queries)
        
        # Extract and structure business data
        business_info = await self.extract_business_info(results)
        
        context["data"]["business"] = business_info
        return context
class FinancialAnalyst(BaseAgent):
    """Gathers financial metrics and performance data"""
    
    async def process(self, context):
        company = context["company"]
        
        # Search for financial reports and metrics
        financial_queries = [
            f"{company} financial statements",
            f"{company} revenue earnings",
            f"{company} stock performance"
        ]
        
        results = await self.tavily_client.search(financial_queries)
        financial_data = await self.extract_financial_metrics(results)
        
        context["data"]["financial"] = financial_data
        return context
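
The BaseAgent these analyzers extend is not shown in the repository excerpt; a minimal sketch of the shared interface it would need (names and defaults assumed):

from abc import ABC, abstractmethod

class BaseAgent(ABC):
    """Hypothetical base class; the repository's actual version may differ."""

    def __init__(self, tavily_client=None):
        # Shared web-research client used by all analyzers
        self.tavily_client = tavily_client

    async def generate_queries(self, company: str) -> list:
        # Default: a single broad query; subclasses refine this
        return [f"{company} company overview"]

    @abstractmethod
    async def process(self, context: dict) -> dict:
        """Read from, and write back to, the shared pipeline context."""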

Sophisticated content curation and storage system:

class ContentCurator:
    """Implements content filtering and relevance scoring"""
    
    async def process(self, context):
        all_documents = context["data"]["raw_documents"]
        
        # Relevance scoring using Tavily's AI
        scored_docs = []
        for doc in all_documents:
            score = await self.tavily_client.get_relevance_score(doc)
            if score >= self.min_threshold:  # Default: 0.4
                scored_docs.append({
                    "content": doc,
                    "relevance_score": score,
                    "url": doc.url,
                    "timestamp": doc.timestamp
                })
        
        # Sort by relevance and deduplicate
        curated_docs = self.deduplicate_and_sort(scored_docs)
        
        # Store in persistent file system
        await self.file_system.store_curated_content(curated_docs)
        
        context["data"]["curated"] = curated_docs
        return context
class VirtualFileSystem:
    """Manages persistent storage and retrieval"""
    
    def __init__(self):
        self.storage = {}  # In production: MongoDB/PostgreSQL
        self.search_index = SearchIndex()
    
    async def store_curated_content(self, documents):
        for doc in documents:
            doc_id = self.generate_doc_id(doc)
            self.storage[doc_id] = doc
            await self.search_index.index_document(doc_id, doc)
    
    async def retrieve_by_query(self, query: str):
        return await self.search_index.search(query)
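
SearchIndex is likewise left undefined; a toy in-memory inverted index is enough to make the sketch self-contained (purely illustrative, not the repository's implementation):

from collections import defaultdict

class SearchIndex:
    """Toy in-memory inverted index (illustrative stand-in)."""

    def __init__(self):
        self._index = defaultdict(set)   # token -> set of doc_ids
        self._docs = {}                  # doc_id -> document

    async def index_document(self, doc_id: str, doc: dict):
        self._docs[doc_id] = doc
        for token in str(doc.get("content", "")).lower().split():
            self._index[token].add(doc_id)

    async def search(self, query: str) -> list:
        # Return documents matching any query token
        hits = set()
        for token in query.lower().split():
            hits |= self._index.get(token, set())
        return [self._docs[doc_id] for doc_id in hits]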

Multi-model architecture with specialized prompts:

class SystemPromptManager:
    """Manages different prompts for different models and tasks"""
    
    RESEARCH_PROMPT = """
    You are a specialized research agent in a multi-agent system.
    Your role: {agent_role}
    
    Guidelines:
    1. Focus on factual, verifiable information
    2. Cite sources and provide relevance scores
    3. Structure output for downstream processing
    4. Communicate progress to the coordination system
    
    Current Task: {task_description}
    Context: {context_data}
    """
    
    SYNTHESIS_PROMPT = """
    You are using Gemini 2.0 Flash for high-context research synthesis.
    
    Task: Generate comprehensive briefing from research data
    Strengths: Large context windows, data summarization
    
    Input: {research_data}
    Requirements: {output_requirements}
    """
    
    EDITING_PROMPT = """
    You are using GPT-4.1-mini for precise formatting and editing.
    
    Task: Compile and format final report
    Strengths: Precise formatting, consistency, markdown structure
    
    Content: {briefing_content}
    Format Requirements: {format_specs}
    """The system implements WebSocket-based real-time updates:

The system implements WebSocket-based real-time updates:

from datetime import datetime

class WebSocketManager:
    """Manages real-time communication with frontend"""
    
    def __init__(self):
        self.active_connections = {}
    
    async def send_status_update(self, job_id: str, status: str, message: str, result: dict):
        """Send structured updates to connected clients"""
        update = {
            "job_id": job_id,
            "status": status,
            "message": message,
            "result": result,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        if job_id in self.active_connections:
            await self.active_connections[job_id].send_json(update)
# Usage in agents
async def process_with_progress(self, context):
    await websocket_manager.send_status_update(
        job_id=context["job_id"],
        status="processing",
        message=f"Analyzing {context['company']} financial data",
        result={
            "step": "FinancialAnalyst",
            "progress": "50%",
            "documents_processed": 15
        }
    )

Strategic use of different models for optimal performance:

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI

class ModelManager:
    """Manages different models for different tasks"""
    
    def __init__(self):
        self.gemini = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
        self.gpt4 = ChatOpenAI(model="gpt-4.1-mini")
    
    async def synthesize_research(self, research_data: dict) -> str:
        """Use Gemini for high-context synthesis"""
        prompt = self.build_synthesis_prompt(research_data)
        return await self.gemini.ainvoke(prompt)
    
    async def format_report(self, content: str) -> str:
        """Use GPT-4 for precise formatting"""
        prompt = self.build_formatting_prompt(content)
        return await self.gpt4.ainvoke(prompt)

For complex workflow management:

from langgraph.graph import StateGraph, END
def create_research_graph():
    """Create LangGraph workflow for research pipeline"""
    
    workflow = StateGraph(ResearchState)
    
    # Add nodes for each agent
    workflow.add_node("company_analyzer", CompanyAnalyzer())
    workflow.add_node("industry_analyzer", IndustryAnalyzer())
    workflow.add_node("financial_analyst", FinancialAnalyst())
    workflow.add_node("news_scanner", NewsScanner())
    workflow.add_node("curator", ContentCurator())
    workflow.add_node("briefing", BriefingGenerator())
    workflow.add_node("editor", ReportEditor())
    
    # Define workflow edges
    workflow.add_edge("company_analyzer", "industry_analyzer")
    workflow.add_edge("industry_analyzer", "financial_analyst")
    workflow.add_edge("financial_analyst", "news_scanner")
    workflow.add_edge("news_scanner", "curator")
    workflow.add_edge("curator", "briefing")
    workflow.add_edge("briefing", "editor")
    workflow.add_edge("editor", END)
    
    # Set entry point
    workflow.set_entry_point("company_analyzer")
    
    return workflow.compile()
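
ResearchState is referenced but not defined above; with LangGraph it would typically be a TypedDict schema, along these lines (an assumption, not the repository's actual state):

from typing import TypedDict

class ResearchState(TypedDict, total=False):
    company: str       # Target company name
    data: dict         # Accumulated research keyed by agent ("business", "financial", ...)
    final_report: str  # Set by the editor node at the end of the graph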

For dynamic tool discovery:

from deepmcpagent import HTTPServerSpec, build_deep_agent
async def create_mcp_enhanced_agent():
    """Create agent with MCP tool discovery"""
    
    servers = {
        "research_tools": HTTPServerSpec(
            url="http://localhost:8000/mcp",
            transport="http"
        ),
        "financial_data": HTTPServerSpec(
            url="http://financial-api.com/mcp",
            transport="sse"
        )
    }
    
    model = ChatOpenAI(model="gpt-4.1")
    
    graph, loader = await build_deep_agent(
        servers=servers,
        model=model,
        instructions="You are a research agent with access to dynamic tools",
        trace_tools=True
    )
    
    return graph, loader

The API layer exposes the pipeline through FastAPI, with research jobs running as background tasks and progress streamed over WebSockets:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(title="Deep Agents Research System")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.post("/research/start")
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks
):
    """Start a new research job"""
    job_id = generate_job_id()
    
    # Start research in background
    background_tasks.add_task(
        execute_research_pipeline,
        job_id,
        request.company_name,
        request.research_depth
    )
    
    return {"job_id": job_id, "status": "started"}
@app.websocket("/research/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
    """WebSocket endpoint for real-time updates"""
    await websocket.accept()
    websocket_manager.add_connection(job_id, websocket)
    
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        websocket_manager.remove_connection(job_id)

Execute independent agents concurrently:
import asyncio
import copy
async def parallel_research_phase(context):
    """Run independent research agents in parallel"""
    
    # Deep-copy so concurrent agents don't share one mutable "data" dict
    tasks = [
        CompanyAnalyzer().process(copy.deepcopy(context)),
        IndustryAnalyzer().process(copy.deepcopy(context)),
        FinancialAnalyst().process(copy.deepcopy(context)),
        NewsScanner().process(copy.deepcopy(context))
    ]
    
    results = await asyncio.gather(*tasks)
    
    # Merge results
    merged_context = context.copy()
    for result in results:
        merged_context["data"].update(result["data"])
    
    return merged_context

Cache expensive operations:
import json
import redis
from datetime import timedelta
class CacheManager:
    def __init__(self):
        # Sync client for brevity; redis.asyncio provides an async equivalent
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    async def get_cached_research(self, company: str, research_type: str):
        """Get cached research results"""
        cache_key = f"research:{company}:{research_type}"
        cached = self.redis_client.get(cache_key)
        
        if cached:
            return json.loads(cached)
        return None
    
    async def cache_research(self, company: str, research_type: str, data: dict):
        """Cache research results with TTL"""
        cache_key = f"research:{company}:{research_type}"
        self.redis_client.setex(
            cache_key,
            timedelta(hours=24),  # 24-hour TTL
            json.dumps(data)
        )
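
A typical call site checks the cache before doing expensive research (illustrative usage):

async def research_with_cache(cache: CacheManager, company: str) -> dict:
    # Return cached financial research if it is still fresh
    cached = await cache.get_cached_research(company, "financial")
    if cached is not None:
        return cached
    context = await FinancialAnalyst().process({"company": company, "data": {}})
    data = context["data"]["financial"]
    await cache.cache_research(company, "financial", data)
    return data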

Manage API rate limits and costs:

class ResourceManager:
    def __init__(self):
        self.api_quotas = {
            "tavily": RateLimiter(max_calls=100, time_window=3600),
            "openai": RateLimiter(max_calls=1000, time_window=3600),
            "gemini": RateLimiter(max_calls=500, time_window=3600)
        }
    
    async def execute_with_quota(self, api_name: str, func, *args, **kwargs):
        """Execute function with rate limiting"""
        limiter = self.api_quotas[api_name]
        
        async with limiter:
            return await func(*args, **kwargs)
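
RateLimiter is used above as an async context manager but is not defined in the excerpt; a minimal sliding-window sketch (an assumption, not the repository's implementation):

import asyncio
import time

class RateLimiter:
    """Sliding-window limiter usable as an async context manager."""

    def __init__(self, max_calls: int, time_window: int):
        self.max_calls = max_calls
        self.time_window = time_window
        self._calls = []
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        async with self._lock:
            now = time.monotonic()
            # Forget calls that have aged out of the window
            self._calls = [t for t in self._calls if now - t < self.time_window]
            if len(self._calls) >= self.max_calls:
                # Sleep until the oldest call leaves the window (coarse, but safe)
                await asyncio.sleep(self.time_window - (now - self._calls[0]))
            self._calls.append(time.monotonic())
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False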

Validate and sanitize user input before it enters the pipeline:

from pydantic import BaseModel, validator
import re
class ResearchRequest(BaseModel):
    company_name: str
    research_depth: str = "standard"
    
    @validator('company_name')
    def validate_company_name(cls, v):
        # Sanitize company name input
        if not re.match(r'^[a-zA-Z0-9\s\-\.&]+$', v):
            raise ValueError('Invalid company name format')
        if len(v) > 100:
            raise ValueError('Company name too long')
        return v.strip()

Handle transient failures with retries and exponential backoff:

class ResilientAgent:
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
    
    async def execute_with_retry(self, func, *args, **kwargs):
        """Execute function with exponential backoff retry"""
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise

                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                wait_time = 2 ** attempt  # Exponential backoff
                await asyncio.sleep(wait_time)

Instrument agents with structured logging and Prometheus metrics:

import structlog
from prometheus_client import Counter, Histogram, start_http_server
import time
# Metrics
AGENT_EXECUTIONS = Counter('agent_executions_total', 'Total agent executions', ['agent_type', 'status'])
EXECUTION_TIME = Histogram('agent_execution_seconds', 'Agent execution time', ['agent_type'])
logger = structlog.get_logger()
class MonitoredAgent:
    def __init__(self, agent_type: str):
        self.agent_type = agent_type
    
    async def execute(self, task):
        start_time = time.time()
        with EXECUTION_TIME.labels(agent_type=self.agent_type).time():
            try:
                result = await self.process_task(task)
                AGENT_EXECUTIONS.labels(agent_type=self.agent_type, status='success').inc()
                
                logger.info(
                    "Agent execution completed",
                    agent_type=self.agent_type,
                    task_id=task.id,
                    duration=time.time() - start_time
                )
                
                return result
            except Exception as e:
                AGENT_EXECUTIONS.labels(agent_type=self.agent_type, status='error').inc()
                
                logger.error(
                    "Agent execution failed",
                    agent_type=self.agent_type,
                    task_id=task.id,
                    error=str(e)
                )
                raise

# Dockerfile for Deep Agents
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
# curl is needed by the HEALTHCHECK below
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
# Start application
CMD ["uvicorn", "application:app", "--host", "0.0.0.0", "--port", "8000"]# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deep-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: deep-agents
  template:
    metadata:
      labels:
        app: deep-agents
    spec:
      containers:
      - name: deep-agents
        image: deep-agents:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-key
        - name: TAVILY_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: tavily-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: deep-agents-service
spec:
  selector:
    app: deep-agents
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

Horizontal scaling:

- Stateless Agents: Design agents to be stateless for easy horizontal scaling
- Load Balancing: Distribute requests across multiple agent instances
- Queue Management: Use message queues for task distribution (see the sketch after this list)

Resource optimization:

- Memory Optimization: Efficient memory usage for large context processing
- CPU Optimization: Parallel processing for compute-intensive tasks
- GPU Acceleration: For model inference and heavy computations

Database scaling:

- Read Replicas: Scale read operations across multiple database instances
- Sharding: Partition data across multiple databases
- Caching: Redis/Memcached for frequently accessed data
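
A minimal queue-based distribution sketch, using asyncio.Queue as a stand-in for a real message broker (RabbitMQ, Redis, etc.):

import asyncio

async def worker(queue: asyncio.Queue):
    """Any stateless agent instance can pull jobs from the shared queue."""
    while True:
        company = await queue.get()
        try:
            await ResearchPipeline().execute(company)
        finally:
            queue.task_done()

async def distribute(companies: list, num_workers: int = 4):
    queue: asyncio.Queue = asyncio.Queue()
    for company in companies:
        queue.put_nowait(company)
    workers = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]
    await queue.join()   # Block until every queued job is processed
    for w in workers:
        w.cancel()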
 

Unit tests mock external dependencies so agents can be exercised in isolation:

import pytest
from unittest.mock import AsyncMock, patch
class TestCompanyAnalyzer:
    @pytest.fixture
    def analyzer(self):
        return CompanyAnalyzer()
    
    @pytest.mark.asyncio
    async def test_process_success(self, analyzer):
        # Mock external dependencies
        with patch.object(analyzer.tavily_client, 'search') as mock_search:
            mock_search.return_value = [{"title": "Test Company", "content": "Test content"}]
            
            context = {"company": "Test Corp", "data": {}}
            result = await analyzer.process(context)
            
            assert "business" in result["data"]
            assert result["data"]["business"] is not None
    
    @pytest.mark.asyncio
    async def test_process_api_failure(self, analyzer):
        # Test error handling
        with patch.object(analyzer.tavily_client, 'search') as mock_search:
            mock_search.side_effect = Exception("API Error")
            
            context = {"company": "Test Corp", "data": {}}
            
            with pytest.raises(Exception):
                await analyzer.process(context)

Integration tests exercise the full pipeline end to end:

@pytest.mark.integration
class TestResearchPipeline:
    @pytest.mark.asyncio
    async def test_full_pipeline(self):
        pipeline = ResearchPipeline()
        
        # Test with real APIs (using test keys)
        result = await pipeline.execute("Apple Inc")
        
        assert result is not None
        assert "business" in result
        assert "financial" in result
        assert "industry" in result
        assert "news" in resultimport asyncio
import aiohttp
async def load_test_research_endpoint():
    """Load test the research endpoint"""
    
    async def make_request(session, company):
        async with session.post(
            "http://localhost:8000/research/start",
            json={"company_name": company}
        ) as response:
            return await response.json()
    
    async with aiohttp.ClientSession() as session:
        companies = ["Apple", "Google", "Microsoft", "Amazon", "Tesla"] * 20
        
        tasks = [make_request(session, company) for company in companies]
        results = await asyncio.gather(*tasks)
        
        success_count = sum(1 for r in results if r.get("status") == "started")
        print(f"Success rate: {success_count}/{len(results)}")- DeepMCPAgent (
cryxnet/DeepMCPAgent): MCP-based agent framework - Company Research Agent (
guy-hartstein/company-research-agent): Production multi-agent system - LangGraph Examples (
langchain-ai/langgraph): Official workflow examples 
This context document provides the technical foundation for implementing sophisticated Deep Agent systems. Use it alongside the Planning Document for comprehensive system development.