@bgauryy
Created September 24, 2025 20:24

Deep Agents Context Document

Technical Architecture and Implementation Guide


🔍 Concrete Example: Company Research Agent

Based on the research conducted using Octocode MCP, here's a real-world example of Deep Agents in action:

Guy Hartstein's Company Research Agent

Repository: guy-hartstein/company-research-agent (1,445+ stars)

This is a production-ready implementation of Deep Agents that demonstrates all four core components working together:

Architecture Overview

┌─────────────────┐    ┌───────────────────┐    ┌─────────────────┐
│  Planning Tool  │    │    Sub Agents     │    │   File System   │
│                 │    │                   │    │                 │
│ • Task Pipeline │────│ • CompanyAnalyzer │────│ • Document Store│
│ • Node Routing  │    │ • IndustryAnalyzer│    │ • Progress Cache│
│ • Progress Track│    │ • FinancialAnalyst│    │ • Result Archive│
│ • Error Recovery│    │ • NewsScanner     │    │ • Search Index  │
└─────────────────┘    └───────────────────┘    └─────────────────┘
                                 │
                      ┌─────────────────────┐
                      │    System Prompt    │
                      │                     │
                      │ • Research Goals    │
                      │ • Quality Standards │
                      │ • Coordination Rules│
                      │ • Output Formats    │
                      └─────────────────────┘

1. Planning Tool Implementation

The system uses a sequential pipeline architecture with specialized nodes:

# Research Pipeline (Planning Tool)
class ResearchPipeline:
    def __init__(self):
        self.nodes = [
            CompanyAnalyzer(),      # Research core business info
            IndustryAnalyzer(),     # Analyze market position  
            FinancialAnalyst(),     # Gather financial data
            NewsScanner(),          # Collect recent news
            Collector(),            # Aggregate all research
            Curator(),              # Filter and score content
            Briefing(),             # Generate summaries
            Editor()                # Compile final report
        ]
    
    async def execute(self, company_name: str):
        context = {"company": company_name, "data": {}}
        
        for node in self.nodes:
            context = await node.process(context)
            await self.send_progress_update(node.name, context)
        
        return context["final_report"]

2. Sub Agents (Specialized Analyzers)

Each agent has domain-specific expertise and tools:

class CompanyAnalyzer(BaseAgent):
    """Researches core business information"""
    
    async def process(self, context):
        company = context["company"]
        
        # Generate targeted search queries
        queries = await self.generate_queries(company)
        
        # Use Tavily API for web research
        results = await self.tavily_client.search(queries)
        
        # Extract and structure business data
        business_info = await self.extract_business_info(results)
        
        context["data"]["business"] = business_info
        return context

class FinancialAnalyst(BaseAgent):
    """Gathers financial metrics and performance data"""
    
    async def process(self, context):
        company = context["company"]
        
        # Search for financial reports and metrics
        financial_queries = [
            f"{company} financial statements",
            f"{company} revenue earnings",
            f"{company} stock performance"
        ]
        
        results = await self.tavily_client.search(financial_queries)
        financial_data = await self.extract_financial_metrics(results)
        
        context["data"]["financial"] = financial_data
        return context

3. File System (Content Management)

Collected content is scored for relevance, deduplicated, and stored for later retrieval:

class ContentCurator:
    """Implements content filtering and relevance scoring"""
    
    async def process(self, context):
        all_documents = context["data"]["raw_documents"]
        
        # Relevance scoring using Tavily's AI
        scored_docs = []
        for doc in all_documents:
            score = await self.tavily_client.get_relevance_score(doc)
            if score >= self.min_threshold:  # Default: 0.4
                scored_docs.append({
                    "content": doc,
                    "relevance_score": score,
                    "url": doc.url,
                    "timestamp": doc.timestamp
                })
        
        # Sort by relevance and deduplicate
        curated_docs = self.deduplicate_and_sort(scored_docs)
        
        # Store in persistent file system
        await self.file_system.store_curated_content(curated_docs)
        
        context["data"]["curated"] = curated_docs
        return context
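
`deduplicate_and_sort` is called above but not shown. A minimal sketch of what it might do, assuming each scored document is a dict with the `url` and `relevance_score` keys built in `ContentCurator.process`. Treating two entries with the same URL as duplicates is an assumption for illustration, not necessarily the rule the repository uses:

```python
def deduplicate_and_sort(scored_docs: list[dict]) -> list[dict]:
    """Keep the highest-scoring entry per URL, then sort by score descending.

    Assumption: two entries with the same URL are duplicates.
    """
    best_by_url: dict[str, dict] = {}
    for doc in scored_docs:
        url = doc["url"]
        current = best_by_url.get(url)
        if current is None or doc["relevance_score"] > current["relevance_score"]:
            best_by_url[url] = doc
    return sorted(
        best_by_url.values(),
        key=lambda d: d["relevance_score"],
        reverse=True,
    )


# Usage with hypothetical documents
docs = [
    {"url": "https://example.com/a", "relevance_score": 0.55},
    {"url": "https://example.com/b", "relevance_score": 0.91},
    {"url": "https://example.com/a", "relevance_score": 0.72},
]
curated = deduplicate_and_sort(docs)
```

Keeping the best-scoring copy, rather than the first one seen, means the result does not depend on the order in which the agents returned their documents.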

class VirtualFileSystem:
    """Manages persistent storage and retrieval"""
    
    def __init__(self):
        self.storage = {}  # In production: MongoDB/PostgreSQL
        self.search_index = SearchIndex()
    
    async def store_curated_content(self, documents):
        for doc in documents:
            doc_id = self.generate_doc_id(doc)
            self.storage[doc_id] = doc
            await self.search_index.index_document(doc_id, doc)
    
    async def retrieve_by_query(self, query: str):
        return await self.search_index.search(query)
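
`VirtualFileSystem` relies on a `SearchIndex` and a `generate_doc_id` helper that are not defined above. The following is a toy sketch of both, assuming documents are dicts with `url` and `content` keys. The inverted index and the URL-hash ID scheme are illustrative choices; a real deployment would more likely delegate search to the database:

```python
import asyncio
import hashlib
from collections import defaultdict


def generate_doc_id(doc: dict) -> str:
    """Derive a stable ID from the document URL (assumed to be unique)."""
    return hashlib.sha256(doc["url"].encode("utf-8")).hexdigest()[:16]


class SearchIndex:
    """Toy inverted index: lowercase token -> set of document IDs."""

    def __init__(self) -> None:
        self._postings: dict[str, set[str]] = defaultdict(set)

    async def index_document(self, doc_id: str, doc: dict) -> None:
        for token in str(doc["content"]).lower().split():
            self._postings[token].add(doc_id)

    async def search(self, query: str) -> list[str]:
        """Return IDs of documents that contain every query token."""
        tokens = query.lower().split()
        if not tokens:
            return []
        matches = set(self._postings.get(tokens[0], set()))
        for token in tokens[1:]:
            matches &= self._postings.get(token, set())
        return sorted(matches)


async def demo() -> list[str]:
    index = SearchIndex()
    doc = {"url": "https://example.com/acme", "content": "Acme builds rockets"}
    await index.index_document(generate_doc_id(doc), doc)
    return await index.search("acme rockets")


hits = asyncio.run(demo())
```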

4. System Prompt (Coordination Intelligence)

Multi-model architecture with specialized prompts:

class SystemPromptManager:
    """Manages different prompts for different models and tasks"""
    
    RESEARCH_PROMPT = """
    You are a specialized research agent in a multi-agent system.
    Your role: {agent_role}
    
    Guidelines:
    1. Focus on factual, verifiable information
    2. Cite sources and provide relevance scores
    3. Structure output for downstream processing
    4. Communicate progress to the coordination system
    
    Current Task: {task_description}
    Context: {context_data}
    """
    
    SYNTHESIS_PROMPT = """
    You are using Gemini 2.0 Flash for high-context research synthesis.
    
    Task: Generate comprehensive briefing from research data
    Strengths: Large context windows, data summarization
    
    Input: {research_data}
    Requirements: {output_requirements}
    """
    
    EDITING_PROMPT = """
    You are using GPT-4.1-mini for precise formatting and editing.
    
    Task: Compile and format final report
    Strengths: Precise formatting, consistency, markdown structure
    
    Content: {briefing_content}
    Format Requirements: {format_specs}
    """

🏗️ Technical Implementation Details

Real-Time Communication System

The system implements WebSocket-based real-time updates:

from datetime import datetime, timezone

class WebSocketManager:
    """Manages real-time communication with frontend"""
    
    def __init__(self):
        self.active_connections = {}
    
    async def send_status_update(self, job_id: str, status: str, message: str, result: dict):
        """Send structured updates to connected clients"""
        update = {
            "job_id": job_id,
            "status": status,
            "message": message,
            "result": result,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        
        if job_id in self.active_connections:
            await self.active_connections[job_id].send_json(update)

# Usage in agents
async def process_with_progress(self, context):
    await websocket_manager.send_status_update(
        job_id=context["job_id"],
        status="processing",
        message=f"Analyzing {context['company']} financial data",
        result={
            "step": "FinancialAnalyst",
            "progress": "50%",
            "documents_processed": 15
        }
    )

Multi-Model Architecture

Each model handles the task it suits best: Gemini 2.0 Flash for high-context synthesis and GPT-4.1-mini for formatting:

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI

class ModelManager:
    """Manages different models for different tasks"""
    
    def __init__(self):
        self.gemini = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
        self.gpt4 = ChatOpenAI(model="gpt-4.1-mini")
    
    async def synthesize_research(self, research_data: dict) -> str:
        """Use Gemini for high-context synthesis"""
        prompt = self.build_synthesis_prompt(research_data)
        response = await self.gemini.ainvoke(prompt)
        return response.content  # ainvoke returns a message object, not a str
    
    async def format_report(self, content: str) -> str:
        """Use GPT-4.1-mini for precise formatting"""
        prompt = self.build_formatting_prompt(content)
        response = await self.gpt4.ainvoke(prompt)
        return response.content

🔧 Core Technologies and Integrations

1. LangGraph Integration

For complex workflow management:

from langgraph.graph import StateGraph, END

def create_research_graph():
    """Create LangGraph workflow for research pipeline"""
    
    workflow = StateGraph(ResearchState)
    
    # Add nodes for each agent. A node must be callable, so register the
    # bound process() method rather than the agent instance itself.
    workflow.add_node("company_analyzer", CompanyAnalyzer().process)
    workflow.add_node("industry_analyzer", IndustryAnalyzer().process)
    workflow.add_node("financial_analyst", FinancialAnalyst().process)
    workflow.add_node("news_scanner", NewsScanner().process)
    workflow.add_node("curator", ContentCurator().process)
    workflow.add_node("briefing", BriefingGenerator().process)
    workflow.add_node("editor", ReportEditor().process)
    
    # Define workflow edges
    workflow.add_edge("company_analyzer", "industry_analyzer")
    workflow.add_edge("industry_analyzer", "financial_analyst")
    workflow.add_edge("financial_analyst", "news_scanner")
    workflow.add_edge("news_scanner", "curator")
    workflow.add_edge("curator", "briefing")
    workflow.add_edge("briefing", "editor")
    workflow.add_edge("editor", END)
    
    # Set entry point
    workflow.set_entry_point("company_analyzer")
    
    return workflow.compile()
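
`ResearchState` is passed to `StateGraph` above but never defined. LangGraph accepts a `TypedDict` as the state schema, so one plausible shape, inferred from the `context` dict used by the agents earlier, is sketched here. The field names and the `initial_state` helper are assumptions, not the repository's actual definition:

```python
from typing import Any, TypedDict


class ResearchState(TypedDict, total=False):
    """Hypothetical shared state passed between graph nodes."""

    company: str           # company being researched
    job_id: str            # used for progress updates
    data: dict[str, Any]   # per-agent results, e.g. data["business"]
    final_report: str      # filled in by the last node


def initial_state(company: str, job_id: str) -> ResearchState:
    """Build the state handed to the first node; the report is not set yet."""
    return {"company": company, "job_id": job_id, "data": {}}


state = initial_state("Acme Corp", "job-1")
```

`total=False` lets early nodes run before `final_report` exists, which matches a pipeline where only the last node produces the report.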

2. MCP (Model Context Protocol) Integration

For dynamic tool discovery:

from deepmcpagent import HTTPServerSpec, build_deep_agent

async def create_mcp_enhanced_agent():
    """Create agent with MCP tool discovery"""
    
    servers = {
        "research_tools": HTTPServerSpec(
            url="http://localhost:8000/mcp",
            transport="http"
        ),
        "financial_data": HTTPServerSpec(
            url="http://financial-api.com/mcp",
            transport="sse"
        )
    }
    
    model = ChatOpenAI(model="gpt-4.1")
    
    graph, loader = await build_deep_agent(
        servers=servers,
        model=model,
        instructions="You are a research agent with access to dynamic tools",
        trace_tools=True
    )
    
    return graph, loader

3. FastAPI Backend Architecture

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="Deep Agents Research System")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/research/start")
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks
):
    """Start a new research job"""
    job_id = generate_job_id()
    
    # Start research in background
    background_tasks.add_task(
        execute_research_pipeline,
        job_id,
        request.company_name,
        request.research_depth
    )
    
    return {"job_id": job_id, "status": "started"}

@app.websocket("/research/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
    """WebSocket endpoint for real-time updates"""
    await websocket.accept()
    websocket_manager.add_connection(job_id, websocket)
    
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        websocket_manager.remove_connection(job_id)
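
The endpoint above calls `add_connection` and `remove_connection`, which the `WebSocketManager` shown earlier does not define. A minimal sketch of such a registry follows, written against a small protocol instead of FastAPI's `WebSocket` so it runs on its own. `ConnectionRegistry`, `JsonSender`, and `FakeSocket` are hypothetical names introduced for illustration:

```python
import asyncio
from typing import Any, Protocol


class JsonSender(Protocol):
    """Anything with an async send_json method, such as a FastAPI WebSocket."""

    async def send_json(self, data: Any) -> None: ...


class ConnectionRegistry:
    """Tracks one client connection per job ID."""

    def __init__(self) -> None:
        self.active_connections: dict[str, JsonSender] = {}

    def add_connection(self, job_id: str, websocket: JsonSender) -> None:
        self.active_connections[job_id] = websocket

    def remove_connection(self, job_id: str) -> None:
        # pop with a default so a repeated disconnect does not raise
        self.active_connections.pop(job_id, None)

    async def send(self, job_id: str, payload: dict) -> bool:
        """Send the payload if a client is connected; report whether it was sent."""
        websocket = self.active_connections.get(job_id)
        if websocket is None:
            return False
        await websocket.send_json(payload)
        return True


class FakeSocket:
    """Stand-in for a WebSocket, used only to exercise the registry."""

    def __init__(self) -> None:
        self.sent: list[dict] = []

    async def send_json(self, data: Any) -> None:
        self.sent.append(data)


async def demo() -> tuple[bool, bool, list[dict]]:
    registry = ConnectionRegistry()
    socket = FakeSocket()
    registry.add_connection("job-1", socket)
    delivered = await registry.send("job-1", {"status": "processing"})
    registry.remove_connection("job-1")
    dropped = await registry.send("job-1", {"status": "done"})
    return delivered, dropped, socket.sent


delivered, dropped, sent = asyncio.run(demo())
```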

📊 Performance Optimization Strategies

1. Parallel Processing

Execute independent agents concurrently:

import asyncio
import copy

async def parallel_research_phase(context):
    """Run independent research agents in parallel"""
    
    # Deep-copy per agent: dict.copy() is shallow, so every agent would
    # otherwise write into the same nested context["data"] dict.
    tasks = [
        CompanyAnalyzer().process(copy.deepcopy(context)),
        IndustryAnalyzer().process(copy.deepcopy(context)),
        FinancialAnalyst().process(copy.deepcopy(context)),
        NewsScanner().process(copy.deepcopy(context))
    ]
    
    results = await asyncio.gather(*tasks)
    
    # Merge results
    merged_context = copy.deepcopy(context)
    for result in results:
        merged_context["data"].update(result["data"])
    
    return merged_context

2. Intelligent Caching

Cache expensive operations:

import json
from datetime import timedelta

import redis.asyncio as redis  # async client, so calls do not block the event loop

class CacheManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    async def get_cached_research(self, company: str, research_type: str):
        """Get cached research results"""
        cache_key = f"research:{company}:{research_type}"
        cached = await self.redis_client.get(cache_key)
        
        if cached:
            return json.loads(cached)
        return None
    
    async def cache_research(self, company: str, research_type: str, data: dict):
        """Cache research results with TTL"""
        cache_key = f"research:{company}:{research_type}"
        await self.redis_client.setex(
            cache_key,
            timedelta(hours=24),  # 24-hour TTL
            json.dumps(data)
        )

3. Resource Management

Manage API rate limits and costs:

class ResourceManager:
    def __init__(self):
        self.api_quotas = {
            "tavily": RateLimiter(max_calls=100, time_window=3600),
            "openai": RateLimiter(max_calls=1000, time_window=3600),
            "gemini": RateLimiter(max_calls=500, time_window=3600)
        }
    
    async def execute_with_quota(self, api_name: str, func, *args, **kwargs):
        """Execute function with rate limiting"""
        limiter = self.api_quotas[api_name]
        
        async with limiter:
            return await func(*args, **kwargs)
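
`RateLimiter` is used above as an async context manager but is not defined. One way it could work is a sliding window over recent call timestamps, sketched below with only the standard library. The `max_calls` and `time_window` parameters mirror the usage above; the algorithm itself is an assumption:

```python
import asyncio
import time
from collections import deque


class RateLimiter:
    """Sliding-window limiter: at most max_calls entries per time_window seconds."""

    def __init__(self, max_calls: int, time_window: float) -> None:
        self.max_calls = max_calls
        self.time_window = time_window
        self._calls: deque[float] = deque()  # timestamps of admitted calls
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> "RateLimiter":
        async with self._lock:
            while True:
                now = time.monotonic()
                # Drop timestamps that have left the window
                while self._calls and now - self._calls[0] >= self.time_window:
                    self._calls.popleft()
                if len(self._calls) < self.max_calls:
                    self._calls.append(now)
                    return self
                # Window is full: wait until the oldest call expires, then re-check
                await asyncio.sleep(self.time_window - (now - self._calls[0]))

    async def __aexit__(self, exc_type, exc, tb) -> None:
        return None


async def demo() -> float:
    # Created inside the coroutine so the lock belongs to the running loop
    limiter = RateLimiter(max_calls=2, time_window=0.2)
    start = time.monotonic()
    for _ in range(3):
        async with limiter:
            pass  # the rate-limited API call would go here
    return time.monotonic() - start


elapsed = asyncio.run(demo())
```

With two calls allowed per 0.2 seconds, the third call in the demo has to wait for the first timestamp to expire. Waiters hold the lock while sleeping, so they are admitted one at a time.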

🔒 Security and Reliability

1. Input Validation and Sanitization

from pydantic import BaseModel, validator
import re

class ResearchRequest(BaseModel):
    company_name: str
    research_depth: str = "standard"
    
    # Pydantic v1-style validator; Pydantic v2 uses field_validator instead
    @validator('company_name')
    def validate_company_name(cls, v):
        # Trim first so the checks run on the value that is actually stored
        v = v.strip()
        if not re.match(r'^[a-zA-Z0-9\s\-\.&]+$', v):
            raise ValueError('Invalid company name format')
        if len(v) > 100:
            raise ValueError('Company name too long')
        return v

2. Error Handling and Recovery

import asyncio
import logging

logger = logging.getLogger(__name__)

class ResilientAgent:
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
    
    async def execute_with_retry(self, func, *args, **kwargs):
        """Execute function with exponential backoff retry"""
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise  # Out of retries: re-raise with the original traceback
                
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                
                # Log before sleeping so the warning appears when the failure happens
                logger.warning(f"Attempt {attempt + 1} failed: {e}; retrying in {wait_time}s")
                await asyncio.sleep(wait_time)

3. Monitoring and Observability

import time

import structlog
from prometheus_client import Counter, Histogram, start_http_server

# Metrics
AGENT_EXECUTIONS = Counter('agent_executions_total', 'Total agent executions', ['agent_type', 'status'])
EXECUTION_TIME = Histogram('agent_execution_seconds', 'Agent execution time', ['agent_type'])

logger = structlog.get_logger()

class MonitoredAgent:
    def __init__(self, agent_type: str):
        self.agent_type = agent_type
    
    async def execute(self, task):
        start_time = time.time()  # Used for the duration field in the log entry
        with EXECUTION_TIME.labels(agent_type=self.agent_type).time():
            try:
                result = await self.process_task(task)
                AGENT_EXECUTIONS.labels(agent_type=self.agent_type, status='success').inc()
                
                logger.info(
                    "Agent execution completed",
                    agent_type=self.agent_type,
                    task_id=task.id,
                    duration=time.time() - start_time
                )
                
                return result
            except Exception as e:
                AGENT_EXECUTIONS.labels(agent_type=self.agent_type, status='error').inc()
                
                logger.error(
                    "Agent execution failed",
                    agent_type=self.agent_type,
                    task_id=task.id,
                    error=str(e)
                )
                raise

🚀 Deployment Architecture

1. Docker Containerization

# Dockerfile for Deep Agents
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Start application
CMD ["uvicorn", "application:app", "--host", "0.0.0.0", "--port", "8000"]

2. Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deep-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: deep-agents
  template:
    metadata:
      labels:
        app: deep-agents
    spec:
      containers:
      - name: deep-agents
        image: deep-agents:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-key
        - name: TAVILY_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: tavily-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: deep-agents-service
spec:
  selector:
    app: deep-agents
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

📈 Scaling Considerations

1. Horizontal Scaling

  • Stateless Agents: Design agents to be stateless for easy horizontal scaling
  • Load Balancing: Distribute requests across multiple agent instances
  • Queue Management: Use message queues for task distribution
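
The queue-based distribution in the last bullet can be sketched with a fixed pool of workers pulling jobs from a shared queue. Here an in-process `asyncio.Queue` stands in for an external message broker, and `research_worker` and `run_jobs` are hypothetical names; the real pipeline call is replaced by a placeholder:

```python
import asyncio


async def research_worker(name: str, queue: asyncio.Queue, results: dict) -> None:
    """Pull company names off the queue until cancelled."""
    while True:
        company = await queue.get()
        try:
            await asyncio.sleep(0)  # placeholder for the real pipeline call
            results[company] = f"report for {company} (by {name})"
        finally:
            queue.task_done()


async def run_jobs(companies: list[str], worker_count: int = 3) -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    results: dict[str, str] = {}
    for company in companies:
        queue.put_nowait(company)

    workers = [
        asyncio.create_task(research_worker(f"worker-{i}", queue, results))
        for i in range(worker_count)
    ]
    await queue.join()      # wait until every queued job has been processed
    for worker in workers:
        worker.cancel()     # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


reports = asyncio.run(run_jobs(["Apple", "Google", "Microsoft", "Amazon"]))
```

Because the workers hold no state between jobs, adding capacity is a matter of raising `worker_count` or, with a real broker, starting more instances.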

2. Vertical Scaling

  • Memory Optimization: Efficient memory usage for large context processing
  • CPU Optimization: Parallel processing for compute-intensive tasks
  • GPU Acceleration: For model inference and heavy computations

3. Database Scaling

  • Read Replicas: Scale read operations across multiple database instances
  • Sharding: Partition data across multiple databases
  • Caching: Redis/Memcached for frequently accessed data
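
As an illustration of the sharding bullet, a request can be routed to a database shard by hashing a stable key. This sketch assumes the company name is the shard key and uses plain modulo placement; `shard_for` is a hypothetical helper:

```python
import hashlib


def shard_for(company: str, shard_count: int) -> int:
    """Map a company name to a shard index in [0, shard_count)."""
    # Normalize so "Apple" and " apple " land on the same shard
    key = company.strip().lower().encode("utf-8")
    # hashlib is used instead of hash(), which is randomized per process for str
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


shard = shard_for("Apple", 4)
```

Modulo placement is simple, but changing `shard_count` moves most keys to a different shard; consistent hashing is the usual alternative when shards are added often.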

🔍 Testing Strategy

1. Unit Testing

import pytest
from unittest.mock import AsyncMock, patch

class TestCompanyAnalyzer:
    @pytest.fixture
    def analyzer(self):
        return CompanyAnalyzer()
    
    @pytest.mark.asyncio
    async def test_process_success(self, analyzer):
        # Mock external dependencies
        with patch.object(analyzer.tavily_client, 'search', new_callable=AsyncMock) as mock_search:
            mock_search.return_value = [{"title": "Test Company", "content": "Test content"}]
            
            context = {"company": "Test Corp", "data": {}}
            result = await analyzer.process(context)
            
            assert "business" in result["data"]
            assert result["data"]["business"] is not None
    
    @pytest.mark.asyncio
    async def test_process_api_failure(self, analyzer):
        # Test error handling
        with patch.object(analyzer.tavily_client, 'search', new_callable=AsyncMock) as mock_search:
            mock_search.side_effect = Exception("API Error")
            
            context = {"company": "Test Corp", "data": {}}
            
            with pytest.raises(Exception):
                await analyzer.process(context)

2. Integration Testing

@pytest.mark.integration
class TestResearchPipeline:
    @pytest.mark.asyncio
    async def test_full_pipeline(self):
        pipeline = ResearchPipeline()
        
        # Test with real APIs (using test keys)
        result = await pipeline.execute("Apple Inc")
        
        assert result is not None
        assert "business" in result
        assert "financial" in result
        assert "industry" in result
        assert "news" in result

3. Load Testing

import asyncio
import aiohttp

async def load_test_research_endpoint():
    """Load test the research endpoint"""
    
    async def make_request(session, company):
        async with session.post(
            "http://localhost:8000/research/start",
            json={"company_name": company}
        ) as response:
            return await response.json()
    
    async with aiohttp.ClientSession() as session:
        companies = ["Apple", "Google", "Microsoft", "Amazon", "Tesla"] * 20
        
        tasks = [make_request(session, company) for company in companies]
        results = await asyncio.gather(*tasks)
        
        success_count = sum(1 for r in results if r.get("status") == "started")
        print(f"Success rate: {success_count}/{len(results)}")

📚 Additional Resources

Key Repositories to Study

  1. DeepMCPAgent (cryxnet/DeepMCPAgent): MCP-based agent framework
  2. Company Research Agent (guy-hartstein/company-research-agent): Production multi-agent system
  3. LangGraph Examples (langchain-ai/langgraph): Official workflow examples

Essential Documentation

Community and Support


This context document provides the technical foundation for implementing sophisticated Deep Agent systems. Use it alongside the Planning Document for comprehensive system development.
