Asynchronous validation in LangGraph refers to validation processes that run independently of the main request-response cycle. Instead of blocking the response while validation completes, the system:
- Returns results to the user immediately
- Runs validation in the background
- Handles validation outcomes separately (logging, alerting, or feeding back into the system)
```mermaid
sequenceDiagram
    participant User
    participant API
    participant LangGraph
    participant AsyncValidator
    participant DB

    User->>API: Request
    API->>LangGraph: Process request
    LangGraph->>API: Generate response
    API->>User: Return response
    API->>AsyncValidator: Trigger validation (non-blocking)
    Note over AsyncValidator: Validation runs independently
    AsyncValidator->>DB: Log validation results
    AsyncValidator-->>LangGraph: (Optional) Feed results to future requests
```
One approach is to use a task queue system (such as Celery, RQ, or AWS SQS) to handle asynchronous validation:
```python
import uuid
from typing import TypedDict, List, Dict

from celery import Celery
from langgraph.graph import StateGraph

# Set up Celery
app = Celery('async_validator', broker='redis://localhost:6379/0')

# Define the validation task
@app.task
def validate_response(response_id: str, generated_content: str, prompt: str):
    """
    Asynchronously validate a response after it's been sent to the user.
    This runs independently and doesn't block the main flow.
    """
    # Run validation logic
    validation_result = run_validation_checks(generated_content, prompt)

    # Log results to the database
    store_validation_result(response_id, validation_result)

    # Optionally trigger alerts for failed validations
    if not validation_result["passed"]:
        send_alert(response_id, validation_result["issues"])

    return validation_result

# LangGraph state definition
class ConversationState(TypedDict):
    messages: List[Dict[str, str]]
    current_response: str
    response_id: str

def process_request(state: ConversationState) -> ConversationState:
    """Process the user request and generate a response."""
    # Implementation of your LLM response generation
    # ...

    # Generate a unique ID for this response
    response_id = str(uuid.uuid4())

    return {
        **state,
        "current_response": "Generated response content...",
        "response_id": response_id
    }

def send_response_and_validate(state: ConversationState) -> ConversationState:
    """Send the response to the user and trigger async validation."""
    response = state["current_response"]
    response_id = state["response_id"]

    # Extract the prompt from the messages for validation context
    last_user_message = next(
        (msg["content"] for msg in reversed(state["messages"]) if msg["role"] == "user"),
        ""
    )

    # Trigger the async validation task.
    # This doesn't block or wait for completion.
    validate_response.delay(response_id, response, last_user_message)

    # Update the conversation with the assistant's response
    messages = state["messages"] + [{"role": "assistant", "content": response}]

    return {
        **state,
        "messages": messages
    }

# Create the graph
workflow = StateGraph(ConversationState)
workflow.add_node("process_request", process_request)
workflow.add_node("send_response", send_response_and_validate)
workflow.add_edge("process_request", "send_response")
workflow.set_entry_point("process_request")
workflow.set_finish_point("send_response")

# Compile into a runnable graph
graph = workflow.compile()
```
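The helpers `run_validation_checks`, `store_validation_result`, and `send_alert` are placeholders for your own logic. A minimal sketch, assuming a trivial length check, a local SQLite table as the result store, and console output in place of real alerting, might look like:

```python
# Illustrative placeholders only -- swap in your real checks, storage, and alerting
import json
import sqlite3

def run_validation_checks(content: str, prompt: str) -> dict:
    """Toy check: flag suspiciously short responses (prompt kept for context-aware checks)."""
    issues = []
    if len(content.strip()) < 10:
        issues.append({"type": "too_short", "description": "Response under 10 characters"})
    return {"passed": not issues, "issues": issues}

def store_validation_result(response_id: str, result: dict) -> None:
    """Persist the result; a real system would use your primary datastore."""
    with sqlite3.connect("validation.db") as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS validation_results (response_id TEXT, result TEXT)"
        )
        conn.execute(
            "INSERT INTO validation_results VALUES (?, ?)",
            (response_id, json.dumps(result)),
        )

def send_alert(response_id: str, issues: list) -> None:
    """Stand-in for a paging/notification integration."""
    print(f"[ALERT] response {response_id} failed validation: {issues}")
```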
Another approach uses Python's asyncio to run validation concurrently within a FastAPI application:
```python
import asyncio

from fastapi import FastAPI, WebSocket

app = FastAPI()

async def run_validation(response_id: str, content: str, prompt: str):
    """Async function to run validation checks."""
    # This runs independently and doesn't block the response
    try:
        # Complex validation logic here
        validation_result = {"passed": True, "issues": []}

        # Simulate validation work
        await asyncio.sleep(2)

        # Log results
        await store_validation_result_async(response_id, validation_result)
    except Exception as e:
        # Log the error but don't affect the main flow
        print(f"Validation error: {e}")

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Set up the LangGraph workflow (placeholder: returns your compiled graph)
    workflow = get_langgraph_workflow()

    while True:
        # Receive a message
        data = await websocket.receive_text()

        # Process with LangGraph
        state = {"messages": [{"role": "user", "content": data}]}
        result = workflow.invoke(state)

        # Extract the response to send back
        response = result["messages"][-1]["content"]
        response_id = result["response_id"]

        # Send the response immediately
        await websocket.send_text(response)

        # Trigger async validation WITHOUT awaiting it.
        # This runs in the background and doesn't block.
        asyncio.create_task(run_validation(response_id, response, data))
```
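One caveat with the bare `asyncio.create_task` call above: the event loop keeps only a weak reference to tasks, so an unreferenced fire-and-forget task can be garbage-collected before it finishes. A common pattern is to hold a reference until completion and use it in place of the direct `create_task` call:

```python
import asyncio

# Keep strong references so background validation tasks aren't garbage-collected early
pending_tasks: set[asyncio.Task] = set()

def fire_and_forget(coro) -> None:
    task = asyncio.create_task(coro)
    pending_tasks.add(task)
    # Drop the reference once the task completes
    task.add_done_callback(pending_tasks.discard)
```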
In distributed systems where validation happens in a separate service, a webhook can trigger it:
```python
import threading
import time
from typing import TypedDict, List, Dict, Any

import requests
from langgraph.graph import StateGraph

class ConversationState(TypedDict):
    messages: List[Dict[str, str]]
    response: str
    metadata: Dict[str, Any]

def generate_response(state: ConversationState) -> ConversationState:
    """Generate a response to the user query."""
    # Implementation details...
    return {
        **state,
        "response": "Generated response...",
        "metadata": {
            "response_id": "unique-id-123",
            "timestamp": time.time(),
            "model": "gpt-4"
        }
    }

def send_response_with_async_validation(state: ConversationState) -> ConversationState:
    """Send the response and trigger async validation via webhook."""
    response = state["response"]
    metadata = state["metadata"]

    # Trigger async validation without waiting
    def trigger_validation_webhook():
        try:
            requests.post(
                "https://validation-service.example.com/validate",
                json={
                    "response_id": metadata["response_id"],
                    "content": response,
                    "context": {
                        "messages": state["messages"],
                        "metadata": metadata
                    }
                },
                timeout=1.0  # Short timeout since we don't wait for the result
            )
        except requests.RequestException:
            # Just log errors; don't affect the main flow
            print("Failed to trigger validation webhook")

    # Fire it off in a separate daemon thread
    threading.Thread(target=trigger_validation_webhook, daemon=True).start()

    # Continue with the response
    messages = state["messages"] + [{"role": "assistant", "content": response}]

    return {
        **state,
        "messages": messages
    }

# Create the graph
workflow = StateGraph(ConversationState)
workflow.add_node("generate", generate_response)
workflow.add_node("respond", send_response_with_async_validation)
workflow.add_edge("generate", "respond")
workflow.set_entry_point("generate")
workflow.set_finish_point("respond")
```
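The receiving side of that webhook isn't shown above. A minimal sketch of the validation service, where the endpoint path and payload shape mirror the request and everything else is an assumption, could be:

```python
# Hypothetical validation service that receives the webhook above
from typing import Any, Dict

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

service = FastAPI()

class ValidationRequest(BaseModel):
    response_id: str
    content: str
    context: Dict[str, Any]

def run_checks(req: ValidationRequest) -> None:
    # Placeholder for the actual (potentially slow) validation pipeline
    print(f"Validating {req.response_id}: {len(req.content)} chars")

@service.post("/validate", status_code=202)
async def validate(req: ValidationRequest, background_tasks: BackgroundTasks):
    # Acknowledge immediately; do the heavy work after the response is sent
    background_tasks.add_task(run_checks, req)
    return {"status": "accepted"}
```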
Having async validation is useful, but how can the results influence the system?
```mermaid
graph TD
    A[User Request] --> B[Generate Response]
    B --> C[Send Response to User]
    C --> D[Async Validation]
    D -->|Store Results| E[Validation DB]

    %% Feedback loop
    F[New User Request] --> G[Retrieve Validation History]
    G -->|If previous issues| H[Adjust Prompt]
    H --> I[Generate New Response]
    I --> J[Send to User]
```
Implementation:
```python
def retrieve_validation_context(state: ConversationState) -> ConversationState:
    """Retrieve validation history to inform the current response."""
    conversation_id = state.get("conversation_id")

    # Get recent validation results for this conversation
    validation_history = get_validation_history(conversation_id, limit=5)

    # Extract patterns of issues
    issue_patterns = extract_patterns(validation_history)

    # Add guidance based on validation history
    system_guidance = ""
    if issue_patterns.get("factual_errors"):
        system_guidance += "Be especially careful about factual accuracy. "
    if issue_patterns.get("harmful_content"):
        system_guidance += "Ensure the response contains no harmful advice. "

    return {
        **state,
        "validation_context": {
            "history": validation_history,
            "guidance": system_guidance
        }
    }

# Run this node before response generation
workflow.add_node("validation_context", retrieve_validation_context)
workflow.set_entry_point("validation_context")
workflow.add_edge("validation_context", "generate")
```
For critical applications, implement a real-time correction system:
```python
async def process_with_validation_feedback():
    # Initial processing flow
    response = generate_initial_response()

    # Send to the user immediately
    await send_to_user(response)

    # Run async validation after delivery
    validation_result = await run_validation_async(response)

    # If issues are found, send a follow-up correction
    if not validation_result["passed"]:
        correction = generate_correction(validation_result["issues"])
        await send_to_user(f"I need to correct my previous response: {correction}")
```
Validate factual claims after response delivery:
```python
@app.task
def verify_factual_claims(response_id, content):
    # Extract factual claims
    claims = extract_claims(content)

    # Verify each claim against trusted sources
    verification_results = []
    for claim in claims:
        result = verify_claim_against_sources(claim)
        verification_results.append(result)

    # Log and potentially trigger a correction
    store_verification_results(response_id, verification_results)

    # If significant issues are found, trigger a correction
    if has_significant_errors(verification_results):
        queue_correction_message(response_id, verification_results)
```
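The claim-extraction and verification helpers are placeholders. A toy sketch, using naive sentence splitting as "claim extraction" and a hypothetical `TRUSTED_FACTS` set in place of real source checks, could look like:

```python
import re

# Hypothetical stand-in for a real fact-checking source
TRUSTED_FACTS = {"water boils at 100 degrees celsius at sea level"}

def extract_claims(content: str) -> list[str]:
    """Naive claim extraction: treat each sentence as a candidate claim."""
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", content) if s.strip()]

def verify_claim_against_sources(claim: str) -> dict:
    """Toy verification: exact-match lookup against the trusted set."""
    verified = claim.lower().rstrip(".") in TRUSTED_FACTS
    return {"claim": claim, "verified": verified}
```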
Run comprehensive policy checks without delaying response:
```python
import asyncio

@app.task
def deep_content_policy_check(response_id, content):
    async def run_checks():
        # Run multiple compliance checks concurrently
        return await asyncio.gather(
            check_harmful_content(content),
            check_bias(content),
            check_toxicity(content),
            check_misinformation(content)
        )

    # Celery tasks are synchronous, so drive the async checks with asyncio.run
    results = asyncio.run(run_checks())

    # Combine results
    compliance_result = combine_compliance_results(results)

    # Log for analytics and improvement
    store_compliance_result(response_id, compliance_result)

    # Take appropriate action if needed
    if not compliance_result["compliant"]:
        handle_compliance_violation(response_id, compliance_result)
```
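Each `check_*` helper is assumed to be an async callable returning a result dict. A toy `check_toxicity`, with keyword matching standing in for a real moderation model or API, might be:

```python
# Toy async compliance check -- a real system would call a moderation model/API
TOXIC_TERMS = {"idiot", "stupid"}

async def check_toxicity(content: str) -> dict:
    found = [term for term in TOXIC_TERMS if term in content.lower()]
    return {"check": "toxicity", "passed": not found, "issues": found}
```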
Evaluate response quality for feedback loops:
```python
@app.task
def assess_response_quality(response_id, content, prompt):
    # Evaluate multiple quality dimensions
    relevance_score = evaluate_relevance(content, prompt)
    clarity_score = evaluate_clarity(content)
    completeness_score = evaluate_completeness(content, prompt)

    # Calculate an overall quality score
    quality = {
        "relevance": relevance_score,
        "clarity": clarity_score,
        "completeness": completeness_score,
        "overall": (relevance_score + clarity_score + completeness_score) / 3
    }

    # Store for analytics and model improvement
    store_quality_assessment(response_id, quality)

    # Update quality statistics
    update_quality_metrics(quality)
```
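The `evaluate_*` scorers are likewise placeholders. A crude lexical-overlap sketch of `evaluate_relevance` follows; a real system would use embeddings or an LLM judge:

```python
def evaluate_relevance(content: str, prompt: str) -> float:
    """Score 0-1 by word overlap between prompt and response -- a rough proxy only."""
    prompt_words = set(prompt.lower().split())
    content_words = set(content.lower().split())
    if not prompt_words:
        return 0.0
    return len(prompt_words & content_words) / len(prompt_words)
```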
Robust error handling is essential for async processes:
```python
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def safe_async_validate(func):
    """Decorator to safely handle errors in async validation."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            # Log the error but don't propagate it
            logger.error(f"Validation error: {e}")

            # Store the validation failure
            await store_validation_error(kwargs.get("response_id"), str(e))

            # Return a safe default
            return {"passed": False, "error": "Validation system error"}
    return wrapper
```
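Because the decorator recovers `response_id` from keyword arguments, decorated validators should be invoked with keywords. A usage sketch (the validator body is a placeholder):

```python
@safe_async_validate
async def run_quality_validation(*, response_id: str, content: str) -> dict:
    # Placeholder validation body; any exception here is caught by the decorator
    if not content:
        raise ValueError("empty response")
    return {"passed": True, "issues": []}

# result = await run_quality_validation(response_id="abc-123", content="Hello")
```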
Track validation performance and outcomes:
```python
import time

@app.task
def validate_with_metrics(response_id, content):
    # Start timing
    start_time = time.time()

    # Run validation
    result = run_validation(content)

    # Record metrics
    duration = time.time() - start_time
    record_metrics({
        "validation_duration": duration,
        "validation_success": result["passed"],
        "issues_found": len(result.get("issues", [])),
        "response_id": response_id
    })

    return result
```
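`record_metrics` is a placeholder. A minimal sketch that just appends one record per line to a JSONL file, to be swapped for StatsD, Prometheus, or your observability stack, might be:

```python
import json
import time

def record_metrics(metrics: dict) -> None:
    """Append one JSON record per line -- a stand-in for a real metrics backend."""
    record = {**metrics, "recorded_at": time.time()}
    with open("validation_metrics.jsonl", "a") as f:
        f.write(json.dumps(record) + "\n")
```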
For high-throughput systems:
```python
# Configure validation workers independently
validation_app = Celery('validators', broker='redis://localhost:6379/0')

# Route each validator family to its own queue
validation_app.conf.task_routes = {
    'validators.factual.*': {'queue': 'factual_validation'},
    'validators.compliance.*': {'queue': 'compliance_validation'},
    'validators.quality.*': {'queue': 'quality_validation'}
}

# Celery's worker_concurrency setting is a single integer per worker process,
# so scale each validation type independently by running a dedicated worker
# per queue with its own concurrency level, e.g.:
#   celery -A validators worker -Q factual_validation -c 5      # resource-intensive
#   celery -A validators worker -Q compliance_validation -c 10  # medium load
#   celery -A validators worker -Q quality_validation -c 3      # low priority
```
Complete example using FastAPI's background tasks feature:
```python
from typing import List, Dict, Any

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

app = FastAPI()

class Message(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    messages: List[Message]

class ValidationResult(BaseModel):
    passed: bool
    issues: List[Dict[str, Any]] = []

class ChatResponse(BaseModel):
    message: Message
    response_id: str

# Background validation function
def validate_response(response_id: str, content: str, context: List[Dict[str, str]]):
    validation_result = ValidationResult(passed=True)
    try:
        # Perform the various validation checks
        factual_check = check_factual_accuracy(content)
        policy_check = check_policy_compliance(content)
        quality_check = check_response_quality(content, context)

        # Combine results
        validation_result.passed = all([
            factual_check["passed"],
            policy_check["passed"],
            quality_check["passed"]
        ])

        # Collect issues
        validation_result.issues = (
            factual_check.get("issues", []) +
            policy_check.get("issues", []) +
            quality_check.get("issues", [])
        )
    except Exception as e:
        # Log the error but don't affect the response
        print(f"Validation error: {e}")
        validation_result.passed = False
        validation_result.issues = [{"type": "system_error", "description": str(e)}]

    # Store the result
    store_validation_result(response_id, validation_result.dict())

    # Take action on failed validation if needed
    if not validation_result.passed:
        handle_validation_failure(response_id, validation_result.dict())

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest, background_tasks: BackgroundTasks):
    # Process with LangGraph
    workflow = get_langgraph_workflow()
    result = workflow.invoke({"messages": [m.dict() for m in request.messages]})

    # Extract the response and its ID
    response_content = result["response"]
    response_id = result["metadata"]["response_id"]

    # Schedule validation to run in the background.
    # This doesn't block the response.
    background_tasks.add_task(
        validate_response,
        response_id=response_id,
        content=response_content,
        context=[m.dict() for m in request.messages]
    )

    # Return the response immediately
    return ChatResponse(
        message=Message(role="assistant", content=response_content),
        response_id=response_id
    )
```
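Assuming the app runs locally on port 8000, a quick client call shows the response returning before validation completes:

```python
import requests

resp = requests.post(
    "http://localhost:8000/chat",
    json={"messages": [{"role": "user", "content": "Hello!"}]},
)
data = resp.json()
# Returns immediately; validation for this response_id continues in the background
print(data["message"]["content"], data["response_id"])
```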
Benefits:
- Improved User Experience: Faster response times by not blocking on validation
- More Thorough Validation: Can run complex, time-consuming validation without user delay
- Resource Efficiency: Can schedule validation during off-peak times or distribute load
- Continuous Improvement: Builds a dataset of validation results for model improvement
- System Resilience: Validation failures don't impact core response functionality
Challenges:
- Delayed Feedback: Issues aren't caught before sending to the user
- Correction Complexity: If validation fails, corrections must be handled separately
- System Complexity: More moving parts to maintain and monitor
- Eventual Consistency: System may temporarily have inconsistent state
- Data Management: Requires management of validation results and feedback loops
✅ Good Use Cases:
- Non-critical content where speed is more important than perfection
- Systems where validation is resource-intensive
- Applications with built-in correction mechanisms
- Collecting quality metrics for model improvement
- High-throughput systems where validation would create bottlenecks
❌ Poor Use Cases:
- Safety-critical applications
- Systems where incorrect information could cause harm
- Applications where corrections would be more confusing than helpful
- Simple validations that can run quickly
- Contexts where user trust depends on immediate accuracy
Asynchronous validation in LangGraph provides a powerful pattern for balancing responsiveness with thoroughness. By separating validation from the main request-response cycle, systems can deliver quick responses while still ensuring quality and compliance through background processing. The implementation approach should be selected based on system architecture, scale requirements, and the criticality of immediate validation.
For most production systems, a combination of the following is recommended:
- Fast, basic validation during response generation
- Comprehensive async validation post-delivery
- Structured feedback loops to incorporate validation results
Together, these create a robust framework that balances performance with quality while continuously improving over time.