Asynchronous Validation in LangGraph Workflows

Conceptual Overview

Asynchronous validation in LangGraph refers to validation processes that run independently of the main request-response cycle. Instead of blocking the response while validation completes, the system:

  1. Returns results to the user immediately
  2. Runs validation in the background
  3. Handles validation outcomes separately (logging, alerting, or feeding back into the system)

sequenceDiagram
    participant User
    participant API
    participant LangGraph
    participant AsyncValidator
    participant DB

    User->>API: Request
    API->>LangGraph: Process request
    LangGraph->>API: Generate response
    API->>User: Return response
    API->>AsyncValidator: Trigger validation (non-blocking)
    Note over AsyncValidator: Validation runs independently
    AsyncValidator->>DB: Log validation results
    AsyncValidator-->>LangGraph: (Optional) Feed results to future requests
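
The essence of the pattern is "fire and forget": schedule the validation coroutine and return without awaiting it. A minimal sketch (all names here are illustrative placeholders):

import asyncio

async def validate_in_background(response_id: str, content: str) -> None:
    # Stand-in for real validation work; runs after the user already has the response
    await asyncio.sleep(1)
    print(f"Validated {response_id}")

async def handle_request(prompt: str) -> str:
    response = f"Answer to: {prompt}"  # stand-in for LLM generation
    # Fire and forget: schedule validation without awaiting it
    # (in production, keep a reference to the task so it isn't garbage collected)
    asyncio.create_task(validate_in_background("resp-1", response))
    return response  # the user gets the response immediately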

Implementation Strategies

1. Task Queue Approach

Using a task queue system (like Celery, RQ, or AWS SQS) to handle asynchronous validation:

from typing import TypedDict, List, Dict, Any
from langgraph.graph import StateGraph
from celery import Celery

# Setup Celery
app = Celery('async_validator', broker='redis://localhost:6379/0')

# Define validation task
@app.task
def validate_response(response_id: str, generated_content: str, prompt: str):
    """
    Asynchronously validate a response after it's been sent to the user.
    This runs independently and doesn't block the main flow.
    """
    # Run validation logic
    validation_result = run_validation_checks(generated_content, prompt)
    
    # Log results to database
    store_validation_result(response_id, validation_result)
    
    # Optionally trigger alerts for failed validations
    if not validation_result["passed"]:
        send_alert(response_id, validation_result["issues"])
    
    return validation_result

# LangGraph state definition
class ConversationState(TypedDict):
    messages: List[Dict[str, str]]
    current_response: str
    response_id: str

def process_request(state: ConversationState) -> ConversationState:
    """Process the user request and generate a response."""
    # Implementation of your LLM response generation
    # ...
    
    # Generate unique ID for this response
    import uuid
    response_id = str(uuid.uuid4())
    
    return {
        **state,
        "current_response": "Generated response content...",
        "response_id": response_id
    }

def send_response_and_validate(state: ConversationState) -> ConversationState:
    """Send response to user and trigger async validation."""
    response = state["current_response"]
    response_id = state["response_id"]
    
    # Extract prompt from messages for validation context
    last_user_message = next(
        (msg["content"] for msg in reversed(state["messages"]) if msg["role"] == "user"),
        ""
    )
    
    # Trigger async validation task
    # This doesn't block or wait for completion
    validate_response.delay(response_id, response, last_user_message)
    
    # Update conversation with assistant's response
    messages = state["messages"] + [{"role": "assistant", "content": response}]
    
    return {
        **state,
        "messages": messages
    }

# Create the graph
workflow = StateGraph(ConversationState)
workflow.add_node("process_request", process_request)
workflow.add_node("send_response", send_response_and_validate)
workflow.add_edge("process_request", "send_response")
workflow.set_entry_point("process_request")
workflow.set_finish_point("send_response")
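
To run the graph, compile it and invoke it with an initial state (a minimal sketch; the message content is a placeholder):

# Compile and run the graph
chat_graph = workflow.compile()
result = chat_graph.invoke({
    "messages": [{"role": "user", "content": "Hello"}],
    "current_response": "",
    "response_id": ""
})
# The user-facing response is available immediately; validation runs via Celery
print(result["messages"][-1]["content"])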

2. Concurrent Tasks with asyncio

Using Python's asyncio for concurrent operations in FastAPI:

from fastapi import FastAPI, WebSocket
import asyncio

app = FastAPI()

async def run_validation(response_id: str, content: str, prompt: str):
    """Async function to run validation checks."""
    # This runs independently and doesn't block the response
    try:
        # Complex validation logic here
        validation_result = {"passed": True, "issues": []}
        
        # Simulate validation work
        await asyncio.sleep(2)  
        
        # Log results
        await store_validation_result_async(response_id, validation_result)
    except Exception as e:
        # Log error but don't affect main flow
        print(f"Validation error: {e}")

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    # Setup LangGraph workflow
    workflow = get_langgraph_workflow()
    
    while True:
        # Receive message
        data = await websocket.receive_text()
        
        # Process with LangGraph (use the async API so the event loop isn't blocked)
        state = {"messages": [{"role": "user", "content": data}]}
        result = await workflow.ainvoke(state)
        
        # Extract response to send back
        response = result["messages"][-1]["content"]
        response_id = result["response_id"]
        
        # Send response immediately
        await websocket.send_text(response)
        
        # Trigger async validation WITHOUT awaiting it
        # This runs in the background and doesn't block
        asyncio.create_task(run_validation(response_id, response, data))

3. Webhook-Based Solution

For distributed systems where validation happens in a separate service:

import time
import threading
import requests
from typing import TypedDict, List, Dict, Any
from langgraph.graph import StateGraph

class ConversationState(TypedDict):
    messages: List[Dict[str, str]]
    response: str
    metadata: Dict[str, Any]

def generate_response(state: ConversationState) -> ConversationState:
    """Generate a response to the user query."""
    # Implementation details...
    return {
        **state,
        "response": "Generated response...",
        "metadata": {
            "response_id": "unique-id-123",
            "timestamp": time.time(),
            "model": "gpt-4"
        }
    }

def send_response_with_async_validation(state: ConversationState) -> ConversationState:
    """Send response and trigger async validation via webhook."""
    response = state["response"]
    metadata = state["metadata"]
    
    # Trigger async validation without waiting
    def trigger_validation_webhook():
        try:
            requests.post(
                "https://validation-service.example.com/validate",
                json={
                    "response_id": metadata["response_id"],
                    "content": response,
                    "context": {
                        "messages": state["messages"],
                        "metadata": metadata
                    }
                },
                timeout=1.0  # Short timeout since we don't wait for result
            )
        except requests.RequestException:
            # Just log errors, don't affect main flow
            print("Failed to trigger validation webhook")
    
    # Start in a separate daemon thread so it can't block process shutdown
    threading.Thread(target=trigger_validation_webhook, daemon=True).start()
    
    # Continue with response
    messages = state["messages"] + [{"role": "assistant", "content": response}]
    return {
        **state,
        "messages": messages
    }

# Create graph
workflow = StateGraph(ConversationState)
workflow.add_node("generate", generate_response)
workflow.add_node("respond", send_response_with_async_validation)
workflow.add_edge("generate", "respond")
workflow.set_entry_point("generate")
workflow.set_finish_point("respond")

Feedback Mechanisms

Running validation asynchronously is only half the story: how can its results feed back into the system?

1. Validator → Model Feedback Loop

graph TD
    A[User Request] --> B[Generate Response]
    B --> C[Send Response to User]
    C --> D[Async Validation]
    D -->|Store Results| E[Validation DB]
    
    %% Feedback loop
    F[New User Request] --> G[Retrieve Validation History]
    G -->|If previous issues| H[Adjust Prompt]
    H --> I[Generate New Response]
    I --> J[Send to User]

Implementation:

def retrieve_validation_context(state: ConversationState) -> ConversationState:
    """Retrieve validation history to inform current response."""
    conversation_id = state.get("conversation_id")
    
    # Get recent validation results for this conversation
    validation_history = get_validation_history(conversation_id, limit=5)
    
    # Extract patterns of issues
    issue_patterns = extract_patterns(validation_history)
    
    # Add guidance based on validation history
    system_guidance = ""
    if issue_patterns.get("factual_errors"):
        system_guidance += "Be especially careful about factual accuracy. "
    if issue_patterns.get("harmful_content"):
        system_guidance += "Ensure response contains no harmful advice. "
    
    return {
        **state,
        "validation_context": {
            "history": validation_history,
            "guidance": system_guidance
        }
    }

# Include this node before response generation
workflow.add_node("validation_context", retrieve_validation_context)
workflow.set_entry_point("validation_context")
workflow.add_edge("validation_context", "generate")

2. Real-time Correction System

For critical applications, implement a real-time correction system:

async def process_with_validation_feedback():
    # Initial processing flow
    response = generate_initial_response()
    
    # Send to user immediately
    await send_to_user(response)
    
    # Run async validation
    validation_result = await run_validation_async(response)
    
    # If issues found, send follow-up correction
    if not validation_result["passed"]:
        correction = generate_correction(validation_result["issues"])
        await send_to_user(f"I need to correct my previous response: {correction}")

Practical Use Cases

1. Factual Verification

Validate factual claims after response delivery:

@app.task
def verify_factual_claims(response_id, content):
    # Extract factual claims
    claims = extract_claims(content)
    
    # Verify each claim against trusted sources
    verification_results = []
    for claim in claims:
        result = verify_claim_against_sources(claim)
        verification_results.append(result)
    
    # Log and potentially trigger correction
    store_verification_results(response_id, verification_results)
    
    # If significant issues found, trigger correction
    if has_significant_errors(verification_results):
        queue_correction_message(response_id, verification_results)

2. Content Policy Compliance

Run comprehensive policy checks without delaying response:

import asyncio

@app.task
def deep_content_policy_check(response_id, content):
    async def run_checks():
        # Run multiple compliance checks in parallel
        return await asyncio.gather(
            check_harmful_content(content),
            check_bias(content),
            check_toxicity(content),
            check_misinformation(content)
        )

    # Celery tasks are synchronous, so drive the async checks with asyncio.run
    results = asyncio.run(run_checks())

    # Combine results
    compliance_result = combine_compliance_results(results)
    
    # Log for analytics and improvement
    store_compliance_result(response_id, compliance_result)
    
    # Take appropriate action if needed
    if not compliance_result["compliant"]:
        handle_compliance_violation(response_id, compliance_result)

3. Quality Assessment

Evaluate response quality for feedback loops:

@app.task
def assess_response_quality(response_id, content, prompt):
    # Evaluate multiple quality dimensions
    relevance_score = evaluate_relevance(content, prompt)
    clarity_score = evaluate_clarity(content)
    completeness_score = evaluate_completeness(content, prompt)
    
    # Calculate overall quality score
    quality = {
        "relevance": relevance_score,
        "clarity": clarity_score,
        "completeness": completeness_score,
        "overall": (relevance_score + clarity_score + completeness_score) / 3
    }
    
    # Store for analytics and model improvement
    store_quality_assessment(response_id, quality)
    
    # Update quality statistics
    update_quality_metrics(quality)
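
The evaluate_* functions above are assumed helpers. As a simple illustration, relevance could be approximated by token overlap between prompt and response (real systems would more likely use embeddings or an LLM judge):

def evaluate_relevance(content: str, prompt: str) -> float:
    """Crude relevance score in [0, 1] based on token overlap (sketch)."""
    content_tokens = set(content.lower().split())
    prompt_tokens = set(prompt.lower().split())
    if not prompt_tokens:
        return 0.0
    return len(content_tokens & prompt_tokens) / len(prompt_tokens)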

Technical Considerations

1. Error Handling

Robust error handling is essential for async processes:

from functools import wraps
import logging

logger = logging.getLogger(__name__)

def safe_async_validate(func):
    """Decorator to safely handle errors in async validation."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            # Log error but don't propagate
            logger.error(f"Validation error: {str(e)}")
            # Store validation failure
            await store_validation_error(kwargs.get("response_id"), str(e))
            # Return safe default
            return {"passed": False, "error": "Validation system error"}
    return wrapper
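
Applying the decorator keeps individual validators clean. Note that the wrapper reads response_id from keyword arguments, so callers should pass it by name (the compliance check here is a hypothetical placeholder):

@safe_async_validate
async def run_policy_validation(response_id: str, content: str):
    # Exceptions raised here are caught and logged by the decorator,
    # which returns a safe default instead of propagating them
    issues = await check_policy_compliance_async(content)  # hypothetical helper
    return {"passed": not issues, "issues": issues}

# Pass response_id as a keyword so the decorator can log it on failure:
# result = await run_policy_validation(response_id="abc-123", content=text)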

2. Monitoring and Observability

Track validation performance and outcomes:

import time

@app.task
def validate_with_metrics(response_id, content):
    # Start timing
    start_time = time.time()
    
    # Run validation
    result = run_validation(content)
    
    # Record metrics
    duration = time.time() - start_time
    record_metrics({
        "validation_duration": duration,
        "validation_success": result["passed"],
        "issues_found": len(result.get("issues", [])),
        "response_id": response_id
    })
    
    return result
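
record_metrics is an assumed helper; one possible implementation publishes to Prometheus via the prometheus_client library (a sketch, with metric names chosen for illustration):

from prometheus_client import Counter, Histogram

VALIDATION_DURATION = Histogram(
    "validation_duration_seconds", "Time spent running validation checks"
)
VALIDATION_FAILURES = Counter(
    "validation_failures_total", "Validations that found issues"
)

def record_metrics(metrics: dict) -> None:
    """Publish validation metrics for dashboards and alerting (sketch)."""
    VALIDATION_DURATION.observe(metrics["validation_duration"])
    if not metrics["validation_success"]:
        VALIDATION_FAILURES.inc()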

3. Scaling Considerations

For high-throughput systems:

# Configure validation workers independently
validation_app = Celery('validators', broker='redis://localhost:6379/0')
validation_app.conf.task_routes = {
    'validators.factual.*': {'queue': 'factual_validation'},
    'validators.compliance.*': {'queue': 'compliance_validation'},
    'validators.quality.*': {'queue': 'quality_validation'}
}

# Scale different validation types independently by running dedicated
# workers per queue, each with its own concurrency level, e.g.:
#   celery -A validators worker -Q factual_validation -c 5      # resource-intensive
#   celery -A validators worker -Q compliance_validation -c 10  # medium load
#   celery -A validators worker -Q quality_validation -c 3      # low priority

Implementation Example: FastAPI with Background Tasks

Complete example using FastAPI's background tasks feature:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import List, Dict, Any
from langgraph.graph import StateGraph

app = FastAPI()

class Message(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    messages: List[Message]

class ValidationResult(BaseModel):
    passed: bool
    issues: List[Dict[str, Any]] = []

class ChatResponse(BaseModel):
    message: Message
    response_id: str

# Background validation function
def validate_response(response_id: str, content: str, context: List[Message]):
    # Run validation logic
    validation_result = ValidationResult(passed=True)
    
    try:
        # Perform various validation checks
        factual_check = check_factual_accuracy(content)
        policy_check = check_policy_compliance(content)
        quality_check = check_response_quality(content, context)
        
        # Combine results
        validation_result.passed = all([
            factual_check["passed"], 
            policy_check["passed"],
            quality_check["passed"]
        ])
        
        # Collect issues
        validation_result.issues = (
            factual_check.get("issues", []) + 
            policy_check.get("issues", []) + 
            quality_check.get("issues", [])
        )
    except Exception as e:
        # Log error but don't affect response
        print(f"Validation error: {e}")
        validation_result.passed = False
        validation_result.issues = [{"type": "system_error", "description": str(e)}]
    
    # Store result
    store_validation_result(response_id, validation_result.dict())
    
    # Take action on failed validation if needed
    if not validation_result.passed:
        handle_validation_failure(response_id, validation_result.dict())

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest, background_tasks: BackgroundTasks):
    # Process with LangGraph (the async API avoids blocking the event loop)
    workflow = get_langgraph_workflow()
    result = await workflow.ainvoke({"messages": [m.dict() for m in request.messages]})
    
    # Extract response and ID
    response_content = result["response"]
    response_id = result["metadata"]["response_id"]
    
    # Schedule validation to run in the background
    # This doesn't block the response
    background_tasks.add_task(
        validate_response,
        response_id=response_id,
        content=response_content,
        context=[m.dict() for m in request.messages]
    )
    
    # Return response immediately
    return ChatResponse(
        message=Message(role="assistant", content=response_content),
        response_id=response_id
    )
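
A client call against this endpoint might look like the following (a sketch using requests; the URL assumes a local development server):

import requests

payload = {"messages": [{"role": "user", "content": "What is LangGraph?"}]}
resp = requests.post("http://localhost:8000/chat", json=payload, timeout=30)
data = resp.json()
# The response returns immediately; validation continues server-side
print(data["response_id"], data["message"]["content"])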

Pros and Cons

Advantages

  1. Improved User Experience: Faster response times by not blocking on validation
  2. More Thorough Validation: Can run complex, time-consuming validation without user delay
  3. Resource Efficiency: Can schedule validation during off-peak times or distribute load
  4. Continuous Improvement: Builds a dataset of validation results for model improvement
  5. System Resilience: Validation failures don't impact core response functionality

Disadvantages

  1. Delayed Feedback: Issues aren't caught before sending to the user
  2. Correction Complexity: If validation fails, corrections must be handled separately
  3. System Complexity: More moving parts to maintain and monitor
  4. Eventual Consistency: System may temporarily have inconsistent state
  5. Data Management: Requires management of validation results and feedback loops

When to Use Async Validation

Good Use Cases:

  • Non-critical content where speed is more important than perfection
  • Systems where validation is resource-intensive
  • Applications with built-in correction mechanisms
  • Collecting quality metrics for model improvement
  • High-throughput systems where validation would create bottlenecks

Poor Use Cases:

  • Safety-critical applications
  • Systems where incorrect information could cause harm
  • Applications where corrections would be more confusing than helpful
  • Simple validations that can run quickly
  • Contexts where user trust depends on immediate accuracy

Conclusion

Asynchronous validation in LangGraph provides a powerful pattern for balancing responsiveness with thoroughness. By separating validation from the main request-response cycle, systems can deliver quick responses while still ensuring quality and compliance through background processing. The implementation approach should be selected based on system architecture, scale requirements, and the criticality of immediate validation.

For most production systems, the best approach combines:

  1. Fast, basic validation during response generation
  2. Comprehensive async validation post-delivery
  3. Structured feedback loops to incorporate validation results

Together, these create a robust framework that balances performance with quality while continuously improving over time.
