In a multi-agent LangGraph setup, capturing and inspecting the full conversation history is essential for debugging, auditing, and improving LLM workflows. Below are several recommended strategies:
LangSmith is purpose-built for:
- Recording full chain + agent interactions
- Viewing execution traces, inputs/outputs, and errors
- Tagging and filtering conversations
It integrates deeply with LangGraph and LangChain: once tracing is enabled, runs are recorded automatically, and individual functions can be decorated to capture extra detail with minimal boilerplate.
Best if you're already using LangChain or LangGraph and want cloud-based observability.
```python
import os
from typing import TypedDict

from langchain_core.tracers.langchain import wait_for_all_tracers
from langgraph.graph import END, StateGraph

# Set up LangSmith tracing via environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-conversation"

# Define your LangGraph state and nodes
class ConversationState(TypedDict):
    messages: list[dict]
    current_agent: str

def agent_one(state):
    # Agent logic here
    return {"messages": state["messages"] + [{"role": "agent_one", "content": "..."}]}

def agent_two(state):
    # Agent logic here
    return {"messages": state["messages"] + [{"role": "agent_two", "content": "..."}]}

# Create the graph; with the env vars above, execution is traced automatically
workflow = StateGraph(ConversationState)
workflow.add_node("agent_one", agent_one)
workflow.add_node("agent_two", agent_two)
workflow.set_entry_point("agent_one")
workflow.add_edge("agent_one", "agent_two")
workflow.add_edge("agent_two", END)

# Build and run the graph
app = workflow.compile()
result = app.invoke({"messages": [], "current_agent": "agent_one"})

# Ensure all pending traces are flushed to LangSmith before exiting
wait_for_all_tracers()
```
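If you want richer traces for a specific function than the automatic logging provides, the langsmith SDK's `@traceable` decorator records a function's inputs and outputs as a LangSmith run. A minimal sketch (the node body, `run_type`, and name shown here are illustrative):

```python
from langsmith import traceable

# Decorating a plain Python function records its inputs and outputs as a
# LangSmith run; this works for helpers that aren't LangGraph nodes too.
@traceable(run_type="chain", name="agent_one")
def agent_one_traced(state: dict) -> dict:
    # Agent logic here
    return {"messages": state["messages"] + [{"role": "agent_one", "content": "..."}]}
```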
```mermaid
graph TD
    A[User Input] --> B[LangGraph Execution]
    B --> C[Agent One]
    B --> D[Agent Two]
    C --> E[Tool Calls]
    D --> E

    %% LangSmith Tracing
    B -.->|Trace| F[LangSmith API]
    C -.->|Trace| F
    D -.->|Trace| F
    E -.->|Trace| F
    F --> G[LangSmith UI]
    G --> H[Trace Explorer]
    G --> I[Debug Session]
    G --> J[Dataset Creation]
```
Since LangGraph relies on a stateful graph, you can:
- Add a `history` field (e.g., a list of message dicts) to your state schema
- At each node, append interactions to this list
- Persist the entire state object in a database (e.g., Postgres, MongoDB, Firestore)
Best for self-hosted setups or when LangSmith isn't an option.
```python
import json
import time
import uuid
from typing import TypedDict, List, Dict, Any

import asyncpg  # For PostgreSQL
from langgraph.graph import StateGraph

class HistoryEntry(TypedDict):
    agent: str
    timestamp: float
    input: Dict[str, Any]
    output: Dict[str, Any]
    metadata: Dict[str, Any]

class ConversationState(TypedDict):
    messages: List[Dict[str, str]]
    history: List[HistoryEntry]
    conversation_id: str

async def persist_history(state: ConversationState) -> None:
    """Persist conversation history to PostgreSQL."""
    conn = await asyncpg.connect("postgresql://user:password@localhost/history_db")
    try:
        # Convert history to a JSON-compatible format
        history_json = json.dumps(state["history"])
        # Store or update the conversation history (requires a unique
        # constraint on conversation_id; see the schema sketch below)
        await conn.execute(
            """
            INSERT INTO conversation_histories (conversation_id, history)
            VALUES ($1, $2)
            ON CONFLICT (conversation_id)
            DO UPDATE SET history = $2, updated_at = NOW()
            """,
            state["conversation_id"], history_json,
        )
    finally:
        await conn.close()

def agent_with_history_tracking(state: ConversationState) -> ConversationState:
    # Original agent logic (call_llm_or_something is a placeholder for your LLM call)
    response = call_llm_or_something(state["messages"])

    # Update message history
    new_messages = state["messages"] + [{"role": "agent", "content": response}]

    # Update detailed history with timing and metadata
    new_history_entry: HistoryEntry = {
        "agent": "primary_agent",
        "timestamp": time.time(),
        "input": {"messages": state["messages"]},
        "output": {"message": response},
        "metadata": {"tokens_used": 823},  # Example metadata
    }

    # Return updated state
    return {
        "messages": new_messages,
        "history": state["history"] + [new_history_entry],
        "conversation_id": state["conversation_id"],
    }

# After graph execution, persist the history
async def run_with_persistence():
    workflow = StateGraph(ConversationState)
    workflow.add_node("agent", agent_with_history_tracking)
    workflow.set_entry_point("agent")
    # Add edges...
    app = workflow.compile()
    result = app.invoke({
        "messages": [],
        "history": [],
        "conversation_id": "conv_" + str(uuid.uuid4()),
    })
    # Persist the final state
    await persist_history(result)
```
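The `ON CONFLICT (conversation_id)` clause above requires a unique constraint on that column. A minimal one-time setup sketch for the assumed table (column and index names are illustrative; compare the schema diagram below):

```python
import asyncpg

async def create_schema() -> None:
    """One-time setup for the table persist_history writes to."""
    conn = await asyncpg.connect("postgresql://user:password@localhost/history_db")
    try:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS conversation_histories (
                conversation_id TEXT PRIMARY KEY,
                history         JSONB NOT NULL,
                created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
            """
        )
        # Index for time-based queries and retention jobs
        await conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_histories_updated_at "
            "ON conversation_histories (updated_at)"
        )
    finally:
        await conn.close()
```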
```mermaid
graph TD
    A[User Input] --> B[LangGraph Execution]
    B --> C[Agent Node]
    C --> D[Process Input]
    D --> E[Update State]
    E --> F[Append to History]
    F --> G[Return Updated State]

    %% Persistence Flow
    G -->|Complete Execution| H[Trigger Persistence]
    H --> I[Format History]
    I --> J[Database Connection]
    J --> K[Store in PostgreSQL]

    subgraph "Database Schema"
        L[conversation_histories]
        L -->|Fields| M[conversation_id]
        L -->|Fields| N[history JSON]
        L -->|Fields| O[created_at]
        L -->|Fields| P[updated_at]
    end
```
Wrap agents or tools with middleware that logs:
- Inputs and outputs
- Timestamps, agent identity, and tool used
- Intermediate thoughts or observations
Logs can be pushed to:
- Text/JSON files (for local debugging)
- Centralized logging platforms (e.g., ELK stack, Datadog)
```python
import json
import time
from functools import wraps
from typing import Any, Callable, Dict, List

from langgraph.graph import StateGraph

# Logger factory for creating middleware
def create_history_logger(log_file_path: str = "conversation_history.jsonl"):
    """Create a middleware that logs all interactions to a JSON Lines file."""
    history_records: List[Dict[str, Any]] = []

    def log_interaction(agent_name: str, input_data: Dict, output_data: Dict, metadata: Dict = None):
        """Log a single interaction to the in-memory history and the log file."""
        record = {
            "timestamp": time.time(),
            "agent": agent_name,
            "input": input_data,
            "output": output_data,
            "metadata": metadata or {},
        }
        history_records.append(record)
        # Append the record to the JSONL file
        with open(log_file_path, "a") as f:
            f.write(json.dumps(record) + "\n")
        return record

    def middleware(agent_func: Callable):
        """Middleware decorator for agent functions."""
        @wraps(agent_func)
        def wrapped_agent(state):
            # Use an explicit __agent_name__ attribute if set, else the function name
            agent_name = getattr(agent_func, "__agent_name__", agent_func.__name__)

            # Record the input
            input_data = {"state": state}

            # Execute the agent function and measure wall-clock time
            start_time = time.time()
            result = agent_func(state)
            execution_time = time.time() - start_time

            # Record the output with metadata
            output_data = {"result": result}
            metadata = {
                "execution_time_ms": execution_time * 1000,
                # Extend as needed, e.g. memory usage via psutil or token counts
            }

            # Log the complete interaction
            log_interaction(agent_name, input_data, output_data, metadata)
            return result

        return wrapped_agent

    return middleware, history_records

# Usage example
logger_middleware, history = create_history_logger("multi_agent_debug.jsonl")

@logger_middleware
def researcher_agent(state):
    # Agent implementation...
    return {"messages": state["messages"] + [{"role": "researcher", "content": "..."}]}

# Build the graph from the decorated agent functions
# (ConversationState as defined in the previous example)
workflow = StateGraph(ConversationState)
workflow.add_node("researcher", researcher_agent)
# Add more nodes and edges...
```
```mermaid
graph TD
    A[User Input] --> B[Middleware Layer]
    B --> C[Log Input]
    C --> D[Execute Agent Function]
    D --> E[Measure Performance]
    E --> F[Log Output]
    F --> G[Return Result]

    %% Logging Destinations
    F -->|Write| H[JSON Lines File]
    F -->|Optional| I[ELK Stack]
    F -->|Optional| J[Datadog]
    F -->|Optional| K[CloudWatch]

    %% Visualization Tools
    H --> L[Custom Dashboard]
    I --> M[Kibana]
    J --> N[Datadog Dashboard]
    K --> O[CloudWatch Dashboard]
```
If you're exposing LangGraph via a WebSocket or API route:
- Stream each response chunk to the frontend
- Buffer and persist it on the backend for full chat history
Combine with user/session metadata for long-term tracking.
```python
import sqlite3
from typing import Any, Dict, List, TypedDict

from fastapi import FastAPI, WebSocket
from langgraph.graph import StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver

api = FastAPI()

# Set up LangGraph with streaming capability
class ChatState(TypedDict):
    messages: List[Dict[str, str]]
    stream_events: List[Dict[str, Any]]

# Create persistent checkpoint storage (use a file path instead of
# ":memory:" if checkpoints should survive restarts)
saver = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))

def streaming_agent(state: ChatState) -> ChatState:
    """Agent that produces streamable outputs."""
    messages = state["messages"]

    def stream_tokens():
        response_text = ""
        # Simulate token-by-token generation
        for token in ["Hello, ", "I'm ", "thinking ", "about ", "your ", "question..."]:
            response_text += token
            yield {
                "type": "token",
                "content": token,
                "partial_response": response_text,
            }
        # Final response
        yield {
            "type": "complete",
            "content": "After analysis, I believe the answer is X because of Y and Z.",
        }

    # Collect stream events for history
    stream_events = list(stream_tokens())

    # Add the completed response to the message list
    final_message = {"role": "assistant", "content": stream_events[-1]["content"]}
    return {
        "messages": messages + [final_message],
        "stream_events": state.get("stream_events", []) + stream_events,
    }

# Build the graph with the streaming agent; note the compiled graph is NOT
# assigned to `app`, which would shadow the FastAPI instance
workflow = StateGraph(ChatState)
workflow.add_node("streaming_agent", streaming_agent)
workflow.set_entry_point("streaming_agent")
graph = workflow.compile(checkpointer=saver)

@api.websocket("/chat/{conversation_id}")
async def websocket_endpoint(websocket: WebSocket, conversation_id: str):
    await websocket.accept()
    # The checkpointer keys saved state by thread_id, so reusing the same
    # conversation_id automatically restores an existing conversation
    config = {"configurable": {"thread_id": conversation_id}}
    while True:
        # Receive a message from the client
        user_message = await websocket.receive_text()

        # Merge the new user message into any previously checkpointed state
        snapshot = graph.get_state(config)
        prior_messages = snapshot.values.get("messages", []) if snapshot.values else []
        input_state = {
            "messages": prior_messages + [{"role": "user", "content": user_message}],
            "stream_events": [],
        }

        # Process with LangGraph; the checkpointer persists state after each
        # step, and stream events are forwarded to the client as chunks arrive
        for chunk in graph.stream(input_state, config, stream_mode="values"):
            for event in chunk.get("stream_events", []):
                await websocket.send_json(event)
```
```mermaid
graph TD
    A[User Input] --> B[WebSocket Server]
    B --> C[Update State with User Message]
    C --> D[Process with LangGraph]
    D --> E[Generate Streaming Response]
    E -->|Stream Chunks| F[WebSocket Client]
    E -->|Buffer| G[Collect Full Response]
    G --> H[Update Conversation State]
    H --> I[Persist to Database]

    %% Storage Components
    I -->|Checkpoint| J[SQLite Saver]
    I -->|Optional| K[Redis Cache]
    I -->|Long-term| L[PostgreSQL]

    %% Client Side
    F --> M[Update UI Incrementally]
    M --> N[Show Thinking Process]
    M --> O[Display Final Answer]
```
| Approach | Storage Type | Relative Size | Retention Considerations |
|---|---|---|---|
| LangSmith | Cloud-based | Medium-High | Data retention based on LangSmith plan |
| Manual State Tracking | Self-hosted DB | Medium | Customizable retention policies |
| Custom Logging | Files/Log Platform | High | Log rotation and archiving needed |
| Streamed Output | Checkpoint DB | Low-Medium | Session-based with optional persistence |
- **Memory Usage**
  - Raw conversation history can grow large in memory
  - Consider using streaming iterators rather than storing everything in memory
  - For long conversations, implement pruning or summarization strategies (see the pruning sketch after this list)
- **Database Performance**
  - Use indexed fields (conversation_id, timestamp) for faster querying
  - Consider time-series-optimized databases for logging-heavy implementations
  - Implement batched writes for high-throughput systems
- **Real-time Requirements**
  - Log asynchronously to minimize impact on response times (see the background-writer sketch after this list)
  - Consider separate worker processes for history persistence
  - Use memory caching (e.g., Redis) for frequently accessed conversations
- **Scaling Considerations**
  - Shard conversation history by user_id or time period
  - Implement archive policies for older conversations
  - Use cloud object storage (S3, GCS) for cold storage of complete histories
- **Retention Policies**
  - Implement time-based retention (e.g., delete after 30/60/90 days)
  - Allow users to request history deletion
  - Consider different policies for different data types:
    - User messages: longer retention
    - System debugging info: shorter retention
- **Privacy Considerations**
  - Store personally identifiable information (PII) separately
  - Encrypt sensitive conversation data
  - Consider anonymization for conversations used in analysis
- **Compliance**
  - GDPR: enable "right to be forgotten" capabilities
  - CCPA: provide user data access and export functionality
  - Keep audit logs of who accessed conversation history
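For the pruning point under Memory Usage, a simple pattern is to keep the most recent turns verbatim and fold older ones into a summary message. A minimal sketch, assuming a `summarize` helper you would back with an LLM call:

```python
from typing import Dict, List

def summarize(messages: List[Dict[str, str]]) -> str:
    """Placeholder: in practice, call an LLM to compress these turns."""
    return f"[Summary of {len(messages)} earlier messages]"

def prune_history(messages: List[Dict[str, str]], max_messages: int = 20) -> List[Dict[str, str]]:
    """Keep the most recent turns; replace older ones with one summary message."""
    if len(messages) <= max_messages:
        return messages
    older, recent = messages[:-max_messages], messages[-max_messages:]
    return [{"role": "system", "content": summarize(older)}] + recent
```

Run this over the state's messages inside each node (or in a dedicated pruning node) before calling the LLM.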
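And for the asynchronous-logging point under Real-time Requirements, one option is to hand records to a background thread through a queue so agents never block on disk I/O. A sketch using the same JSONL format as the middleware example (the class name is illustrative):

```python
import json
import queue
import threading
from typing import Any, Dict

class AsyncHistoryWriter:
    """Appends history records to a JSONL file from a background thread."""

    def __init__(self, log_file_path: str = "conversation_history.jsonl"):
        self._log_file_path = log_file_path
        self._queue: "queue.Queue[Dict[str, Any]]" = queue.Queue()
        # Daemon thread: won't keep the process alive on shutdown
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def log(self, record: Dict[str, Any]) -> None:
        """Non-blocking: enqueue the record and return immediately."""
        self._queue.put(record)

    def _drain(self) -> None:
        while True:
            record = self._queue.get()
            with open(self._log_file_path, "a") as f:
                f.write(json.dumps(record) + "\n")
            self._queue.task_done()
```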
| Goal | Suggested Approach | Best For |
|---|---|---|
| Full trace, debugging, audit | LangSmith | Development teams in the LangChain ecosystem |
| Self-hosted tracking | Manual history + database | Production environments requiring data control |
| Lightweight or flexible | Custom logging middleware | Cross-cutting analysis and custom metrics |
| Real-time chat apps | Stream & persist chunks | User-facing applications with live feedback |