`ragas_pipeline.py` is a Kubeflow Pipelines (KFP) definition showing how to run RAGAS evaluations on Kubeflow.
This pipeline is designed as an example with characteristics expected of production environments, such as proper resource management, monitoring capabilities, and comprehensive documentation. Adjust the components to match your specific RAG evaluation needs and infrastructure setup.
Key Components:
- Modular Pipeline Structure: Three main components for data preparation, evaluation, and reporting
- RAGAS Integration: Uses core RAGAS metrics like faithfulness, answer relevancy, context precision, and context recall
- Scalable Architecture: Designed to handle large datasets with configurable resource limits
- Comprehensive Reporting: Generates metrics, detailed results, and visualizations
Main Features:
- Data Preparation: Handles various input formats (CSV, JSON) and validates required columns
- Evaluation Engine: Runs RAGAS metrics using OpenAI or other LLM providers
- Results Tracking: Logs metrics to Kubeflow and generates detailed reports
- Error Handling: Includes proper error handling and resource management
Getting Started:
- Install dependencies: `pip install kfp ragas datasets pandas openai langchain`
- Prepare your dataset with required columns (question, answer, contexts, ground_truth)
- Compile the pipeline: Run the Python script to generate the YAML file
- Upload to Kubeflow: Use the UI or Python client to run the pipeline
Prerequisites:
- A Kubeflow Pipelines cluster running and accessible
- Python 3.11 or higher environment with required packages
- OpenAI API key (or other LLM provider credentials)
- Evaluation dataset in the required format
```bash
# Ensure you're using Python 3.11 or higher
python --version  # Should show 3.11.x or higher

pip install kfp ragas datasets pandas openai langchain
```
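Once installed, the environment can be checked programmatically as well. The sketch below uses only the standard library; `missing_packages` is an illustrative helper, not part of the pipeline code.

```python
import importlib.util
import sys

def missing_packages(names):
    """Return the subset of `names` that cannot be imported in this environment."""
    return [n for n in names if importlib.util.find_spec(n) is None]

# Check the interpreter version and the packages the pipeline needs
if sys.version_info < (3, 11):
    print("Upgrade to Python 3.11 or higher")

missing = missing_packages(["kfp", "ragas", "datasets", "pandas", "openai", "langchain"])
if missing:
    print("Install missing packages:", " ".join(missing))
```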
Your evaluation dataset should contain the following columns:

```json
{
  "question": "What is the capital of France?",
  "answer": "The capital of France is Paris.",
  "contexts": ["Paris is the capital and most populous city of France..."],
  "ground_truth": "Paris"
}
```
- question: The input question/query
- answer: The generated answer from your RAG system
- contexts: List of retrieved context chunks
- ground_truth: The expected/correct answer (optional for some metrics)
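A lightweight schema check before launching a run can catch malformed rows early. This is a minimal sketch using plain Python; `REQUIRED_COLUMNS` and `validate_rows` are illustrative names, not part of the pipeline's actual validation code.

```python
REQUIRED_COLUMNS = {"question", "answer", "contexts", "ground_truth"}

def validate_rows(rows, required=REQUIRED_COLUMNS):
    """Return a list of (row_index, missing_columns) for rows that fail validation."""
    problems = []
    for i, row in enumerate(rows):
        missing = required - row.keys()
        if missing:
            problems.append((i, sorted(missing)))
    return problems

rows = [
    {
        "question": "What is the capital of France?",
        "answer": "The capital of France is Paris.",
        "contexts": ["Paris is the capital and most populous city of France..."],
        "ground_truth": "Paris",
    },
    {"question": "Incomplete row", "answer": "..."},  # missing contexts and ground_truth
]
print(validate_rows(rows))  # flags the second row
```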
Create a Kubernetes secret for sensitive data:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: ragas-secrets
  namespace: kubeflow
type: Opaque
stringData:
  openai-api-key: "your-openai-api-key-here"
```
Ensure your Kubeflow cluster has access to your data storage:

```yaml
# For Google Cloud Storage
apiVersion: v1
kind: Secret
metadata:
  name: gcs-credentials
type: Opaque
data:
  key.json: <base64-encoded-service-account-key>
```
- Compile the pipeline: `python ragas_pipeline.py`
- Upload `ragas_evaluation_pipeline.yaml` to the Kubeflow Pipelines UI
- Create a new run with parameters:
  - `dataset_path`: Path to your evaluation dataset
  - `openai_api_key`: Your OpenAI API key
```python
import kfp

# Connect to Kubeflow Pipelines
client = kfp.Client(host="https://your-kubeflow-host")

# Create experiment
experiment = client.create_experiment("ragas-evaluation")

# Run pipeline
run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name="ragas-evaluation-run",
    pipeline_package_path="ragas_evaluation_pipeline.yaml",
    params={
        "dataset_path": "gs://your-bucket/evaluation_data.csv",
        "openai_api_key": "your-openai-api-key"
    }
)
```
The pipeline supports the following RAGAS metrics:
- Faithfulness: Measures factual consistency of answers with contexts
- Answer Relevancy: Evaluates how relevant answers are to questions
- Context Precision: Measures precision of retrieved contexts
- Context Recall: Measures recall of retrieved contexts
- Context Relevancy: Evaluates relevance of retrieved contexts
- Answer Correctness: Compares answers with ground truth
- Answer Similarity: Measures semantic similarity with ground truth
```python
from ragas.metrics import BaseMetric

class CustomMetric(BaseMetric):
    def _compute_score(self, prediction, reference):
        # Your custom scoring logic
        return score

# Add to metrics list in the evaluation component
```
```python
# For Azure OpenAI
import openai

openai.api_type = "azure"
openai.api_base = "https://your-resource-name.openai.azure.com/"
openai.api_version = "2023-05-15"

# For local models
from langchain.llms import Ollama

llm = Ollama(model="llama2")
```
For large datasets, modify the evaluation component to process in batches:

```python
def process_in_batches(dataset, batch_size=100):
    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i+batch_size]
        yield evaluate(batch, metrics=metrics)
```
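The generator above yields one result per batch; those per-batch scores still need to be merged into overall numbers. A minimal sketch, assuming each batch produces a `{metric: mean}` dict alongside its batch size (`combine_batch_scores` is an illustrative helper, not part of the pipeline):

```python
def combine_batch_scores(batch_results):
    """Combine per-batch {metric: mean} dicts, weighted by batch size, into overall means."""
    totals, counts = {}, {}
    for scores, size in batch_results:
        for metric, value in scores.items():
            totals[metric] = totals.get(metric, 0.0) + value * size
            counts[metric] = counts.get(metric, 0) + size
    return {m: totals[m] / counts[m] for m in totals}

batches = [
    ({"faithfulness": 0.9, "answer_relevancy": 0.8}, 100),
    ({"faithfulness": 0.7, "answer_relevancy": 0.6}, 50),
]
overall = combine_batch_scores(batches)
print(overall)
```

Weighting by batch size matters when the final batch is smaller than the rest; a plain average of batch means would skew the result.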
Stream logs from a running pipeline pod:

```bash
kubectl logs -n kubeflow -f <pod-name>
```
The pipeline includes resource limits. Adjust based on your dataset size:

```python
evaluation_task.set_cpu_limit("4")
evaluation_task.set_memory_limit("16Gi")
evaluation_task.set_gpu_limit("1")  # If using GPU-accelerated models
```
Common issues and solutions:
- API Rate Limits: Add retry logic and delays
- Memory Issues: Process data in smaller batches
- Authentication: Ensure proper secret mounting
- Data Format: Validate dataset schema before evaluation
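For the rate-limit case above, retry logic with exponential backoff is the usual fix. A generic sketch in plain Python; the decorator name, the exception type, and `flaky_api_call` are illustrative, so adapt `retry_on` to your provider's rate-limit exception.

```python
import time
from functools import wraps

def with_retries(max_attempts=3, base_delay=1.0, retry_on=(Exception,)):
    """Retry a function on failure, doubling the delay after each attempt."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the error
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator

calls = {"n": 0}

@with_retries(max_attempts=3, base_delay=0.01)
def flaky_api_call():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

print(flaky_api_call())  # succeeds on the third attempt
```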
The pipeline generates:
- Metrics: Logged to Kubeflow Metrics
- Detailed Results: CSV file with per-sample scores
- Summary Report: JSON report with aggregate metrics
- Visualization: Charts showing metric distributions
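The summary report can be assembled from the per-sample scores with the standard library alone. A sketch under the assumption that per-sample results arrive as a list of `{metric: score}` dicts; `build_summary` is an illustrative name, not the pipeline's actual reporting function.

```python
import json
import statistics

def build_summary(per_sample_scores):
    """Aggregate per-sample metric scores into a JSON-serializable summary report."""
    metrics = per_sample_scores[0].keys()
    return {
        "num_samples": len(per_sample_scores),
        "metrics": {
            m: {
                "mean": statistics.mean(s[m] for s in per_sample_scores),
                "min": min(s[m] for s in per_sample_scores),
                "max": max(s[m] for s in per_sample_scores),
            }
            for m in metrics
        },
    }

samples = [
    {"faithfulness": 0.9, "answer_relevancy": 0.8},
    {"faithfulness": 0.7, "answer_relevancy": 0.9},
]
report = build_summary(samples)
print(json.dumps(report, indent=2))
```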
Run multiple models and compare results:

```python
from kfp.dsl import pipeline

@pipeline
def comparative_ragas_pipeline(
    model_a_dataset: str,
    model_b_dataset: str
):
    eval_a = run_ragas_evaluation(model_a_dataset)
    eval_b = run_ragas_evaluation(model_b_dataset)
    compare_results(eval_a.outputs, eval_b.outputs)
```
Set up automated quality gates:

```python
def check_quality_gates(metrics):
    thresholds = {
        "faithfulness": 0.8,
        "answer_relevancy": 0.7,
        "context_precision": 0.75
    }
    for metric, threshold in thresholds.items():
        if metrics[metric] < threshold:
            raise ValueError(f"{metric} below threshold: {metrics[metric]} < {threshold}")
```
Track experiments with MLflow:

```python
import mlflow

with mlflow.start_run():
    mlflow.log_metrics(evaluation_results)
    mlflow.log_artifact("evaluation_report.json")
```
- Version Control: Version your evaluation datasets
- Reproducibility: Use fixed random seeds and model versions
- Cost Management: Monitor API usage and costs
- Data Privacy: Ensure sensitive data is properly handled
- Scalability: Design for your expected dataset sizes
- Monitoring: Set up alerts for pipeline failures
- ImportError: Ensure all required packages are in base image
- Authentication: Check API keys and cluster permissions
- Resource Limits: Adjust CPU/memory based on dataset size
- Data Access: Verify storage permissions and paths
```bash
# Check pod status
kubectl get pods -n kubeflow

# View pipeline logs
kubectl logs -n kubeflow <pipeline-pod-name>

# Check resource usage
kubectl top pods -n kubeflow
```