Persisting and Inspecting Conversation History in LangGraph

In a multi-agent LangGraph setup, capturing and inspecting the full conversation history is essential for debugging, auditing, and improving LLM workflows. Below are several recommended strategies:


βœ… Recommended Approaches

1. Use LangSmith (Best Overall)

LangSmith is purpose-built for:

  • Recording full chain + agent interactions
  • Viewing execution traces, inputs/outputs, and errors
  • Tagging and filtering conversations

It integrates deeply with LangGraph and LangChain: enabling tracing via environment variables logs graph runs automatically, and individual functions can be decorated (e.g., with @traceable) for finer-grained spans, all with minimal boilerplate.

πŸ“Œ Best if you're using LangChain or LangGraph already and want cloud-based observability.

LangSmith Implementation Example:

import os
from typing import TypedDict

from langgraph.graph import StateGraph
from langchain_core.tracers.langchain import wait_for_all_tracers

# Set up LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-conversation"

# Define your LangGraph state and nodes
class ConversationState(TypedDict):
    messages: list[dict]
    current_agent: str

def agent_one(state):
    # Agent logic here
    return {"messages": state["messages"] + [{"role": "agent_one", "content": "..."}]}

def agent_two(state):
    # Agent logic here
    return {"messages": state["messages"] + [{"role": "agent_two", "content": "..."}]}

# Create graph with automatic tracing
workflow = StateGraph(ConversationState)
workflow.add_node("agent_one", agent_one)
workflow.add_node("agent_two", agent_two)
# Add edges...

# Build and run graph
app = workflow.compile()
result = app.invoke({"messages": [], "current_agent": "agent_one"})

# Ensure all traces are sent to LangSmith
wait_for_all_tracers()

LangSmith tracing flow (Mermaid):

graph TD
    A[User Input] --> B[LangGraph Execution]
    B --> C[Agent One]
    B --> D[Agent Two]
    C --> E[Tool Calls]
    D --> E
    
    %% LangSmith Tracing
    B -.->|Trace| F[LangSmith API]
    C -.->|Trace| F
    D -.->|Trace| F
    E -.->|Trace| F
    
    F --> G[LangSmith UI]
    G --> H[Trace Explorer]
    G --> I[Debug Session]
    G --> J[Dataset Creation]

2. Manual History Tracking via State Updates

Since LangGraph relies on a stateful graph, you can:

  • Add a history field (e.g., a list of message dicts) to your state schema
  • At each node, append interactions to this list
  • Persist the entire state object in a database (e.g., Postgres, MongoDB, Firestore)

πŸ” Best for self-hosted setups or when LangSmith isn't an option.

Manual History Implementation Example:

import json
import time
import uuid
from typing import TypedDict, List, Dict, Any

from langgraph.graph import StateGraph
import asyncpg  # For PostgreSQL

class HistoryEntry(TypedDict):
    agent: str
    timestamp: float
    input: Dict[str, Any]
    output: Dict[str, Any]
    metadata: Dict[str, Any]

class ConversationState(TypedDict):
    messages: List[Dict[str, str]]
    history: List[HistoryEntry]
    conversation_id: str

async def persist_history(state: ConversationState) -> None:
    """Persist conversation history to PostgreSQL."""
    conn = await asyncpg.connect("postgresql://user:password@localhost/history_db")
    
    # Convert history to JSON-compatible format
    history_json = json.dumps(state["history"])
    
    # Store or update conversation history
    await conn.execute(
        """
        INSERT INTO conversation_histories (conversation_id, history)
        VALUES ($1, $2)
        ON CONFLICT (conversation_id) 
        DO UPDATE SET history = $2, updated_at = NOW()
        """,
        state["conversation_id"], history_json
    )
    await conn.close()

def agent_with_history_tracking(state: ConversationState) -> ConversationState:
    # Original agent logic (call_llm_or_something is a placeholder for your LLM call)
    response = call_llm_or_something(state["messages"])
    
    # Update message history
    new_messages = state["messages"] + [{"role": "agent", "content": response}]
    
    # Update detailed history with timing and metadata
    new_history_entry = {
        "agent": "primary_agent",
        "timestamp": time.time(),
        "input": {"messages": state["messages"]},
        "output": {"message": response},
        "metadata": {"tokens_used": 823}  # Example metadata
    }
    
    # Return updated state
    return {
        "messages": new_messages,
        "history": state["history"] + [new_history_entry],
        "conversation_id": state["conversation_id"]
    }

# After graph execution, persist the history
async def run_with_persistence():
    workflow = StateGraph(ConversationState)
    workflow.add_node("agent", agent_with_history_tracking)
    # Add edges...
    
    app = workflow.compile()
    result = app.invoke({
        "messages": [],
        "history": [],
        "conversation_id": "conv_" + str(uuid.uuid4())
    })
    
    # Persist final state
    await persist_history(result)

Manual history persistence flow (Mermaid):

graph TD
    A[User Input] --> B[LangGraph Execution]
    B --> C[Agent Node]
    C --> D[Process Input]
    D --> E[Update State]
    E --> F[Append to History]
    F --> G[Return Updated State]
    
    %% Persistence Flow
    G -->|Complete Execution| H[Trigger Persistence]
    H --> I[Format History]
    I --> J[Database Connection]
    J --> K[Store in PostgreSQL]
    
    subgraph "Database Schema"
        L[conversation_histories]
        L -->|Fields| M[conversation_id]
        L -->|Fields| N[history JSON]
        L -->|Fields| O[created_at]
        L -->|Fields| P[updated_at]
    end

3. Custom Logging Middleware

Wrap agents or tools with middleware that logs:

  • Inputs and outputs
  • Timestamps, agent identity, and tool used
  • Intermediate thoughts or observations

Logs can be pushed to:

  • Text/JSON files (for local debugging)
  • Centralized logging platforms (e.g., ELK stack, Datadog)

Custom Middleware Example:

import json
import time
from functools import wraps
from typing import Callable, Dict, Any, List

# Logger factory for creating middleware
def create_history_logger(log_file_path: str = "conversation_history.jsonl"):
    """Create a middleware that logs all interactions to a file."""
    
    history_records: List[Dict[str, Any]] = []
    
    def log_interaction(agent_name: str, input_data: Dict, output_data: Dict, metadata: Dict = None):
        """Log a single interaction to the history."""
        record = {
            "timestamp": time.time(),
            "agent": agent_name,
            "input": input_data,
            "output": output_data,
            "metadata": metadata or {}
        }
        history_records.append(record)
        
        # Write to file
        with open(log_file_path, "a") as f:
            f.write(json.dumps(record) + "\n")
        
        return record
    
    def middleware(agent_func: Callable):
        """Middleware decorator for agent functions."""
        @wraps(agent_func)
        def wrapped_agent(state):
            # Extract agent name from function name or explicit annotation
            agent_name = getattr(agent_func, "__agent_name__", agent_func.__name__)
            
            # Record input
            input_data = {"state": state}
            
            # Execute agent function and measure time
            start_time = time.time()
            result = agent_func(state)
            execution_time = time.time() - start_time
            
            # Record output with metadata
            output_data = {"result": result}
            metadata = {
                "execution_time_ms": execution_time * 1000,
                "memory_usage_mb": get_memory_usage()  # Define this function separately
            }
            
            # Log the complete interaction
            log_interaction(agent_name, input_data, output_data, metadata)
            
            return result
        return wrapped_agent
    
    return middleware, history_records

# Usage example
logger_middleware, history = create_history_logger("multi_agent_debug.jsonl")

@logger_middleware
def researcher_agent(state):
    # Agent implementation...
    return {"messages": state["messages"] + [{"role": "researcher", "content": "..."}]}

# Build the graph from the decorated agents (reusing ConversationState from the previous example)
workflow = StateGraph(ConversationState)
workflow.add_node("researcher", researcher_agent)
# Add more nodes and edges...

Middleware logging flow (Mermaid):

graph TD
    A[User Input] --> B[Middleware Layer]
    B --> C[Log Input]
    C --> D[Execute Agent Function]
    D --> E[Measure Performance]
    E --> F[Log Output]
    F --> G[Return Result]
    
    %% Logging Destinations
    F -->|Write| H[JSON Lines File]
    F -->|Optional| I[ELK Stack]
    F -->|Optional| J[Datadog]
    F -->|Optional| K[CloudWatch]
    
    %% Visualization Tools
    H --> L[Custom Dashboard]
    I --> M[Kibana]
    J --> N[Datadog Dashboard]
    K --> O[CloudWatch Dashboard]

4. Streamed Output Capture (for Chat UIs)

If you're exposing LangGraph via a WebSocket or API route:

  • Stream each response chunk to the frontend
  • Buffer and persist it on the backend for full chat history

Combine with user/session metadata for long-term tracking.

Streaming Implementation Example:

import sqlite3
from typing import TypedDict, List, Dict, Any

from fastapi import FastAPI, WebSocket
from langgraph.graph import StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver

app = FastAPI()

# Set up LangGraph with streaming capability
class ChatState(TypedDict):
    messages: List[Dict[str, str]]
    stream_events: List[Dict[str, Any]]

# Create checkpoint storage (in-memory SQLite here; use a file path or
# another backend for durable persistence)
saver = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))

def streaming_agent(state: ChatState):
    """Agent that produces streamable outputs."""
    messages = state["messages"]
    
    # Process with streaming capability; a plain (non-async) generator so it
    # can be consumed synchronously below
    def stream_tokens():
        response_text = ""
        # Simulate token-by-token generation
        for token in ["Hello, ", "I'm ", "thinking ", "about ", "your ", "question..."]:
            response_text += token
            yield {
                "type": "token",
                "content": token,
                "partial_response": response_text
            }
        
        # Final response
        final_response = "After analysis, I believe the answer is X because of Y and Z."
        yield {
            "type": "complete",
            "content": final_response
        }
    
    # Collect stream events for history
    stream_events = []
    for event in stream_tokens():
        stream_events.append(event)
    
    # Add completed response to messages
    final_message = {"role": "assistant", "content": stream_events[-1]["content"]}
    
    return {
        "messages": messages + [final_message],
        "stream_events": state.get("stream_events", []) + stream_events
    }

# Build graph with streaming agent (named "graph" so it doesn't shadow the FastAPI app)
workflow = StateGraph(ChatState)
workflow.add_node("streaming_agent", streaming_agent)
workflow.set_entry_point("streaming_agent")
graph = workflow.compile(checkpointer=saver)

@app.websocket("/chat/{conversation_id}")
async def websocket_endpoint(websocket: WebSocket, conversation_id: str):
    await websocket.accept()

    # The checkpointer keys saved state by thread_id, so reusing the same
    # conversation_id restores an existing conversation automatically
    config = {"configurable": {"thread_id": conversation_id}}

    while True:
        # Receive message from client
        user_message = await websocket.receive_text()

        # Process with LangGraph and stream results; the checkpointer
        # persists state after every step under this thread_id.
        # (A production schema would use an append reducer on "messages"
        # so each turn accumulates instead of overwriting.)
        inputs = {"messages": [{"role": "user", "content": user_message}]}
        for chunk in graph.stream(inputs, config, stream_mode="values"):
            # Forward each stream event to the client
            for event in chunk.get("stream_events", []):
                await websocket.send_json(event)

Streaming and persistence flow (Mermaid):

graph TD
    A[User Input] --> B[WebSocket Server]
    B --> C[Update State with User Message]
    C --> D[Process with LangGraph]
    D --> E[Generate Streaming Response]
    
    E -->|Stream Chunks| F[WebSocket Client]
    E -->|Buffer| G[Collect Full Response]
    
    G --> H[Update Conversation State]
    H --> I[Persist to Database]
    
    %% Storage Components
    I -->|Checkpoint| J[SQLite Saver]
    I -->|Optional| K[Redis Cache]
    I -->|Long-term| L[PostgreSQL]
    
    %% Client Side
    F --> M[Update UI Incrementally]
    M --> N[Show Thinking Process]
    M --> O[Display Final Answer]

Storage Requirements Comparison

| Approach | Storage Type | Relative Size | Retention Considerations |
| --- | --- | --- | --- |
| LangSmith | Cloud-based | Medium-High | Data retention depends on LangSmith plan |
| Manual State Tracking | Self-hosted DB | Medium | Customizable retention policies |
| Custom Logging | Files / log platform | High | Log rotation and archiving needed |
| Streamed Output | Checkpoint DB | Low-Medium | Session-based with optional persistence |

Performance Considerations

  1. Memory Usage

    • Raw conversation history can grow large in memory
    • Consider using streaming iterators rather than storing everything in memory
    • For long conversations, implement pruning or summarization strategies
  2. Database Performance

    • Use indexed fields (conversation_id, timestamp) for faster querying (see the schema sketch after this list)
    • Consider time-series optimized databases for logging-heavy implementations
    • Implement batched writes for high-throughput systems
  3. Real-time Requirements

    • Asynchronous logging for minimal impact on response times
    • Consider separate worker processes for history persistence
    • Use memory caching (Redis) for frequently accessed conversations
  4. Scaling Considerations

    • Shard conversation history by user_id or time period
    • Implement archive policies for older conversations
    • Use cloud object storage (S3, GCS) for cold storage of complete histories

Data Retention and Privacy

  1. Retention Policies

    • Implement time-based retention (e.g., delete after 30/60/90 days; see the sketch after this list)
    • Allow users to request history deletion
    • Consider different policies for different data types:
      • User messages: Longer retention
      • System debugging info: Shorter retention
  2. Privacy Considerations

    • Store personally identifiable information (PII) separately
    • Implement encryption for sensitive conversation data
    • Consider anonymization for conversations used in analysis
  3. Compliance

    • GDPR: Enable "right to be forgotten" capabilities
    • CCPA: User data access and export functionality
    • Audit logs of who accessed conversation history

🏁 Final Recommendation

| Goal | Suggested Approach | Best For |
| --- | --- | --- |
| Full trace, debugging, audit | LangSmith | Development teams in the LangChain ecosystem |
| Self-hosted tracking | Manual history + database | Production environments requiring data control |
| Lightweight or flexible | Custom logging middleware | Cross-cutting analysis and custom metrics |
| Real-time chat apps | Stream & persist chunks | User-facing applications with live feedback |