@williamcaban
Last active July 26, 2025 17:07

RAGAS Evaluation with Kubeflow Pipelines - Setup Guide

About

The ragas_pipeline.py file is a Kubeflow Pipelines (KFP) definition showing how to run RAGAS evaluations with KFP.

This pipeline is designed as an example with characteristics expected in production environments, such as proper resource management, monitoring capabilities, and comprehensive documentation. Adjust the components based on your specific RAG evaluation needs and infrastructure setup.

Key Components:

  1. Modular Pipeline Structure: Three main components for data preparation, evaluation, and reporting
  2. RAGAS Integration: Uses core RAGAS metrics like faithfulness, answer relevancy, context precision, and context recall
  3. Scalable Architecture: Designed to handle large datasets with configurable resource limits
  4. Comprehensive Reporting: Generates metrics, detailed results, and visualizations

Main Features:

  • Data Preparation: Handles various input formats (CSV, JSON) and validates required columns
  • Evaluation Engine: Runs RAGAS metrics using OpenAI or other LLM providers
  • Results Tracking: Logs metrics to Kubeflow and generates detailed reports
  • Error Handling: Includes proper error handling and resource management

Getting Started:

  1. Install dependencies: pip install kfp ragas datasets pandas openai langchain
  2. Prepare your dataset with required columns (question, answer, contexts, ground_truth)
  3. Compile the pipeline: Run the Python script to generate the YAML file
  4. Upload to Kubeflow: Use the UI or Python client to run the pipeline

Detailed Instructions

Prerequisites

  1. Kubeflow Pipelines cluster running and accessible
  2. Python 3.11 or higher environment with required packages
  3. OpenAI API key (or other LLM provider credentials)
  4. Evaluation dataset in the required format

Installation

# Ensure you're using Python 3.11 or higher
python --version  # Should show 3.11.x or higher

pip install kfp ragas datasets pandas openai langchain

Dataset Format

Each record in your evaluation dataset should contain the following fields:

{
  "question": "What is the capital of France?",
  "answer": "The capital of France is Paris.",
  "contexts": ["Paris is the capital and most populous city of France..."],
  "ground_truth": "Paris"
}

Required Columns:

  • question: The input question/query
  • answer: The generated answer from your RAG system
  • contexts: List of retrieved context chunks
  • ground_truth: The expected/correct answer (optional for some metrics)
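Before compiling the pipeline, it can help to sanity-check each record against this schema so malformed rows fail fast. The helper below is a minimal sketch; the function name and the specific checks are illustrative, not part of the pipeline itself:

```python
REQUIRED_COLUMNS = ("question", "answer", "contexts", "ground_truth")

def validate_record(record: dict) -> list:
    """Return a list of problems found; an empty list means the record looks OK."""
    problems = [f"missing column: {c}" for c in REQUIRED_COLUMNS if c not in record]
    # A common mistake is passing 'contexts' as one string instead of a list of chunks
    if isinstance(record.get("contexts"), str):
        problems.append("'contexts' must be a list of strings, not a single string")
    return problems

record = {
    "question": "What is the capital of France?",
    "answer": "The capital of France is Paris.",
    "contexts": ["Paris is the capital and most populous city of France..."],
    "ground_truth": "Paris",
}
print(validate_record(record))  # → []
```

Run this over a sample of your dataset before uploading; it is much cheaper than discovering a schema problem mid-pipeline.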

Pipeline Configuration

1. Environment Variables

Create a Kubernetes secret for sensitive data:

apiVersion: v1
kind: Secret
metadata:
  name: ragas-secrets
  namespace: kubeflow
type: Opaque
stringData:
  openai-api-key: "your-openai-api-key-here"

2. Storage Configuration

Ensure your Kubeflow cluster has access to your data storage:

# For Google Cloud Storage
apiVersion: v1
kind: Secret
metadata:
  name: gcs-credentials
type: Opaque
data:
  key.json: <base64-encoded-service-account-key>

Running the Pipeline

Method 1: Using Kubeflow Pipelines UI

  1. Compile the pipeline:

    python ragas_pipeline.py
  2. Upload ragas_evaluation_pipeline.yaml to Kubeflow Pipelines UI

  3. Create a new run with parameters:

    • dataset_path: Path to your evaluation dataset
    • openai_api_key: Your OpenAI API key

Method 2: Using Python Client

import kfp

# Connect to Kubeflow Pipelines
client = kfp.Client(host="https://your-kubeflow-host")

# Create experiment
experiment = client.create_experiment("ragas-evaluation")

# Run pipeline
run = client.run_pipeline(
    experiment_id=experiment.experiment_id,  # use `experiment.id` with the older KFP v1 SDK
    job_name="ragas-evaluation-run",
    pipeline_package_path="ragas_evaluation_pipeline.yaml",
    params={
        "dataset_path": "gs://your-bucket/evaluation_data.csv",
        "openai_api_key": "your-openai-api-key"
    }
)

Supported RAGAS Metrics

The pipeline supports the following RAGAS metrics:

  • Faithfulness: Measures factual consistency of answers with contexts
  • Answer Relevancy: Evaluates how relevant answers are to questions
  • Context Precision: Measures precision of retrieved contexts
  • Context Recall: Measures recall of retrieved contexts
  • Context Relevancy: Evaluates relevance of retrieved contexts
  • Answer Correctness: Compares answers with ground truth
  • Answer Similarity: Measures semantic similarity with ground truth
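RAGAS computes these metrics with an LLM judge, but the precision/recall intuition behind the context metrics can be sketched without one. The toy functions below are purely illustrative and are not the RAGAS implementation:

```python
def toy_context_precision(retrieved, relevant):
    """Fraction of retrieved chunks that are actually relevant (illustrative only)."""
    if not retrieved:
        return 0.0
    return sum(1 for c in retrieved if c in relevant) / len(retrieved)

def toy_context_recall(retrieved, relevant):
    """Fraction of relevant chunks that were retrieved (illustrative only)."""
    if not relevant:
        return 0.0
    return sum(1 for c in relevant if c in retrieved) / len(relevant)

retrieved = ["chunk_a", "chunk_b", "chunk_c", "chunk_d"]
relevant = ["chunk_a", "chunk_b"]
print(toy_context_precision(retrieved, relevant))  # → 0.5
print(toy_context_recall(retrieved, relevant))     # → 1.0
```

In real RAGAS runs, "relevant" is judged by the LLM against the question and ground truth rather than by exact set membership.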

Customization Options

1. Adding Custom Metrics

from ragas.metrics import BaseMetric  # base class name varies across RAGAS versions; check your release

class CustomMetric(BaseMetric):
    name = "custom_metric"

    def _compute_score(self, prediction, reference):
        # Your custom scoring logic; return a float in [0, 1]
        score = 1.0 if prediction == reference else 0.0
        return score

# Add an instance of CustomMetric to the metrics list in the evaluation component

2. Using Different LLM Providers

# For Azure OpenAI (legacy openai<1.0 module-level configuration;
# newer SDK versions use the AzureOpenAI client class instead)
import openai
openai.api_type = "azure"
openai.api_base = "https://your-resource-name.openai.azure.com/"
openai.api_version = "2023-05-15"

# For local models (Ollama moved to langchain_community in newer LangChain releases)
from langchain.llms import Ollama
llm = Ollama(model="llama2")

3. Batch Processing

For large datasets, modify the evaluation component to process in batches:

def process_in_batches(dataset, metrics, batch_size=100):
    """Yield RAGAS results one batch at a time to bound memory and API usage."""
    for i in range(0, len(dataset), batch_size):
        batch = dataset.select(range(i, min(i + batch_size, len(dataset))))
        yield evaluate(batch, metrics=metrics)

Monitoring and Debugging

1. View Pipeline Logs

kubectl logs -n kubeflow -f <pod-name>

2. Monitor Resource Usage

The pipeline includes resource limits. Adjust based on your dataset size:

evaluation_task.set_cpu_limit("4")
evaluation_task.set_memory_limit("16Gi")
evaluation_task.set_gpu_limit("1")  # If using GPU-accelerated models

3. Error Handling

Common issues and solutions:

  • API Rate Limits: Add retry logic and delays
  • Memory Issues: Process data in smaller batches
  • Authentication: Ensure proper secret mounting
  • Data Format: Validate dataset schema before evaluation
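For the rate-limit case, a small retry wrapper with exponential backoff and jitter is usually enough. This is a generic sketch using only the standard library; the function name is illustrative, not a RAGAS or KFP API:

```python
import random
import time

def with_retries(fn, max_attempts=5, base_delay=1.0, retriable=(Exception,)):
    """Call fn(), retrying with exponential backoff plus jitter on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retriable:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.1)
            time.sleep(delay)

# Demo: a function that fails twice before succeeding
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

print(with_retries(flaky, base_delay=0.01))  # → ok
```

In the pipeline, you would wrap the per-batch `evaluate(...)` call (and narrow `retriable` to your provider's rate-limit exception) rather than retrying the whole run.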

Output Artifacts

The pipeline generates:

  1. Metrics: Logged to Kubeflow Metrics
  2. Detailed Results: CSV file with per-sample scores
  3. Summary Report: JSON report with aggregate metrics
  4. Visualization: Charts showing metric distributions

Advanced Features

1. Comparative Evaluation

Run multiple models and compare results:

@pipeline
def comparative_ragas_pipeline(
    model_a_dataset: str,
    model_b_dataset: str
):
    eval_a = run_ragas_evaluation(model_a_dataset)
    eval_b = run_ragas_evaluation(model_b_dataset)
    compare_results(eval_a.outputs, eval_b.outputs)

2. Automated Thresholds

Set up automated quality gates:

def check_quality_gates(metrics):
    thresholds = {
        "faithfulness": 0.8,
        "answer_relevancy": 0.7,
        "context_precision": 0.75
    }
    
    for metric, threshold in thresholds.items():
        if metrics[metric] < threshold:
            raise ValueError(f"{metric} below threshold: {metrics[metric]} < {threshold}")

3. Integration with MLflow

Track experiments with MLflow:

import mlflow

with mlflow.start_run():
    mlflow.log_metrics(evaluation_results)
    mlflow.log_artifact("evaluation_report.json")

Best Practices

  1. Version Control: Version your evaluation datasets
  2. Reproducibility: Use fixed random seeds and model versions
  3. Cost Management: Monitor API usage and costs
  4. Data Privacy: Ensure sensitive data is properly handled
  5. Scalability: Design for your expected dataset sizes
  6. Monitoring: Set up alerts for pipeline failures
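For the reproducibility point, fixing random seeds at the start of each evaluation component is a cheap habit. A minimal sketch using only the standard library (extend it to numpy or torch if your base image includes them):

```python
import random

def set_seeds(seed: int = 42):
    """Seed the stdlib RNG; add numpy/torch seeding here if those are installed."""
    random.seed(seed)
    # e.g. np.random.seed(seed); torch.manual_seed(seed)

set_seeds(42)
a = [random.random() for _ in range(3)]
set_seeds(42)
b = [random.random() for _ in range(3)]
print(a == b)  # → True
```

Note that seeding only controls local randomness; LLM-judged metrics can still vary between runs unless you also pin the model version and use deterministic decoding where the provider supports it.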

Troubleshooting

Common Issues:

  1. ImportError: Ensure all required packages are listed in packages_to_install or preinstalled in the base image
  2. Authentication: Check API keys and cluster permissions
  3. Resource Limits: Adjust CPU/memory based on dataset size
  4. Data Access: Verify storage permissions and paths

Debug Commands:

# Check pod status
kubectl get pods -n kubeflow

# View pipeline logs
kubectl logs -n kubeflow <pipeline-pod-name>

# Check resource usage
kubectl top pods -n kubeflow
ragas_pipeline.py

import kfp
from kfp import dsl
from kfp.dsl import component, pipeline, Input, Output, Dataset, Metrics
from typing import NamedTuple


# Component for data preparation
@component(
    base_image="python:3.11-slim",
    packages_to_install=["ragas", "datasets", "pandas", "openai", "langchain"]
)
def prepare_evaluation_data(
    dataset_path: str,
    output_dataset: Output[Dataset]
) -> NamedTuple("Outputs", [("num_samples", int)]):
    """Prepare evaluation dataset for RAGAS evaluation."""
    import pandas as pd
    import json
    from collections import namedtuple

    # Load your evaluation dataset
    # This could be from various sources: CSV, JSON, HuggingFace datasets, etc.
    if dataset_path.endswith('.csv'):
        df = pd.read_csv(dataset_path)
    elif dataset_path.endswith('.json'):
        df = pd.read_json(dataset_path)
    else:
        raise ValueError("Unsupported file format")

    # Ensure required columns exist for RAGAS evaluation
    required_columns = ['question', 'answer', 'contexts', 'ground_truth']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        print(f"Warning: Missing columns {missing_columns}")
        # Handle missing columns based on your use case

    # Save prepared dataset
    df.to_json(output_dataset.path, orient='records', lines=True)

    outputs = namedtuple("Outputs", ["num_samples"])
    return outputs(len(df))
# Component for running RAGAS evaluation
@component(
    base_image="python:3.11-slim",
    packages_to_install=["ragas", "datasets", "pandas", "openai", "langchain", "sentence-transformers"]
)
def run_ragas_evaluation(
    input_dataset: Input[Dataset],
    openai_api_key: str,
    evaluation_metrics: Output[Metrics]
) -> NamedTuple("Outputs", [("faithfulness_score", float), ("answer_relevancy_score", float), ("context_precision_score", float), ("context_recall_score", float)]):
    """Run RAGAS evaluation with specified metrics."""
    import pandas as pd
    import json
    from datasets import Dataset
    from ragas import evaluate
    from ragas.metrics import (
        faithfulness,
        answer_relevancy,
        context_precision,
        context_recall,
        context_relevancy,
        answer_correctness,
        answer_similarity
    )
    import os
    from collections import namedtuple

    # Set OpenAI API key
    os.environ["OPENAI_API_KEY"] = openai_api_key

    # Load dataset
    df = pd.read_json(input_dataset.path, lines=True)

    # Convert to HuggingFace Dataset format
    dataset = Dataset.from_pandas(df)

    # Define metrics to evaluate
    metrics = [
        faithfulness,
        answer_relevancy,
        context_precision,
        context_recall,
        # context_relevancy,  # Add more metrics as needed
        # answer_correctness,
        # answer_similarity
    ]

    # Run evaluation
    print("Starting RAGAS evaluation...")
    result = evaluate(
        dataset=dataset,
        metrics=metrics,
    )

    # Extract scores
    faithfulness_score = result['faithfulness']
    answer_relevancy_score = result['answer_relevancy']
    context_precision_score = result['context_precision']
    context_recall_score = result['context_recall']

    # Log metrics to Kubeflow
    evaluation_metrics.log_metric("faithfulness", faithfulness_score)
    evaluation_metrics.log_metric("answer_relevancy", answer_relevancy_score)
    evaluation_metrics.log_metric("context_precision", context_precision_score)
    evaluation_metrics.log_metric("context_recall", context_recall_score)

    # Save detailed results
    result_df = result.to_pandas()
    result_df.to_csv("/tmp/detailed_results.csv", index=False)

    print("Evaluation completed!")
    print(f"Faithfulness: {faithfulness_score:.4f}")
    print(f"Answer Relevancy: {answer_relevancy_score:.4f}")
    print(f"Context Precision: {context_precision_score:.4f}")
    print(f"Context Recall: {context_recall_score:.4f}")

    outputs = namedtuple("Outputs", [
        "faithfulness_score",
        "answer_relevancy_score",
        "context_precision_score",
        "context_recall_score"
    ])
    return outputs(
        faithfulness_score,
        answer_relevancy_score,
        context_precision_score,
        context_recall_score
    )
# Component for generating evaluation report
@component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "matplotlib", "seaborn"]
)
def generate_evaluation_report(
    faithfulness_score: float,
    answer_relevancy_score: float,
    context_precision_score: float,
    context_recall_score: float,
    num_samples: int,
    report_output: Output[Dataset]
):
    """Generate a comprehensive evaluation report."""
    import pandas as pd
    import json
    import matplotlib.pyplot as plt
    import seaborn as sns
    from datetime import datetime

    # Create summary report
    report_data = {
        "evaluation_timestamp": datetime.now().isoformat(),
        "total_samples": num_samples,
        "metrics": {
            "faithfulness": faithfulness_score,
            "answer_relevancy": answer_relevancy_score,
            "context_precision": context_precision_score,
            "context_recall": context_recall_score
        },
        "summary": {
            "average_score": (faithfulness_score + answer_relevancy_score +
                              context_precision_score + context_recall_score) / 4,
            "evaluation_status": "completed"
        }
    }

    # Save report
    with open(report_output.path, 'w') as f:
        json.dump(report_data, f, indent=2)

    # Create visualization (optional)
    metrics_df = pd.DataFrame([
        {"Metric": "Faithfulness", "Score": faithfulness_score},
        {"Metric": "Answer Relevancy", "Score": answer_relevancy_score},
        {"Metric": "Context Precision", "Score": context_precision_score},
        {"Metric": "Context Recall", "Score": context_recall_score}
    ])

    plt.figure(figsize=(10, 6))
    sns.barplot(data=metrics_df, x="Metric", y="Score")
    plt.title("RAGAS Evaluation Results")
    plt.ylim(0, 1)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig("/tmp/evaluation_chart.png")

    print("Evaluation report generated successfully!")
# Define the pipeline
@pipeline(
    name="ragas-evaluation-pipeline",
    description="A pipeline to evaluate RAG applications using RAGAS metrics"
)
def ragas_evaluation_pipeline(
    dataset_path: str = "gs://your-bucket/evaluation_data.csv",
    openai_api_key: str = "your-openai-api-key"
):
    """Main RAGAS evaluation pipeline."""
    # Step 1: Prepare evaluation data
    data_prep_task = prepare_evaluation_data(
        dataset_path=dataset_path
    )

    # Step 2: Run RAGAS evaluation
    evaluation_task = run_ragas_evaluation(
        input_dataset=data_prep_task.outputs["output_dataset"],
        openai_api_key=openai_api_key
    )

    # Step 3: Generate report
    report_task = generate_evaluation_report(
        faithfulness_score=evaluation_task.outputs["faithfulness_score"],
        answer_relevancy_score=evaluation_task.outputs["answer_relevancy_score"],
        context_precision_score=evaluation_task.outputs["context_precision_score"],
        context_recall_score=evaluation_task.outputs["context_recall_score"],
        num_samples=data_prep_task.outputs["num_samples"]
    )

    # Set resource requirements (optional)
    evaluation_task.set_cpu_limit("2")
    evaluation_task.set_memory_limit("8Gi")


# Compile and run the pipeline
if __name__ == "__main__":
    # Compile the pipeline
    kfp.compiler.Compiler().compile(
        pipeline_func=ragas_evaluation_pipeline,
        package_path="ragas_evaluation_pipeline.yaml"
    )

    # Example of how to run the pipeline
    # client = kfp.Client(host="your-kubeflow-host")
    #
    # run = client.run_pipeline(
    #     experiment_id="your-experiment-id",
    #     job_name="ragas-evaluation-run",
    #     pipeline_package_path="ragas_evaluation_pipeline.yaml",
    #     params={
    #         "dataset_path": "gs://your-bucket/evaluation_data.csv",
    #         "openai_api_key": "your-openai-api-key"
    #     }
    # )

    print("Pipeline compiled successfully! Upload 'ragas_evaluation_pipeline.yaml' to Kubeflow Pipelines UI.")