Skip to content

Instantly share code, notes, and snippets.

@aleenprd
Created October 31, 2025 11:37
Show Gist options
  • Save aleenprd/866c587f36a4f4099c16effad4e433c7 to your computer and use it in GitHub Desktop.
Save aleenprd/866c587f36a4f4099c16effad4e433c7 to your computer and use it in GitHub Desktop.
rag over sv
import os
import sys
from pprint import pprint
import asyncio
from loguru import logger
from dotenv import load_dotenv
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.llms.openai import OpenAI
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.readers.file import PandasCSVReader
load_dotenv() # Load environment variables from .env file
logger.remove() # Remove default logger
logger.add(sink=sys.stdout, level="INFO") # Log to stdout with INFO level
# Load CSV files with metadata
documents = []
# Define metadata for each CSV file
csv_metadata = {
"employees.csv": {
"description": "Employee data including staff information, departments, positions, salaries, and management hierarchy",
"content_type": "employee_records",
"columns": "employee_id, first_name, last_name, email, department, position, hire_date, salary, manager_id, office_location",
},
"customers.csv": {
"description": "Customer data including personal information, contact details, and purchase history",
"content_type": "customer_records",
"columns": "customer_id, first_name, last_name, email, phone, city, state, country, registration_date, age, total_purchases",
},
"products.csv": {
"description": "Product catalog with inventory, pricing, and supplier information",
"content_type": "product_catalog",
"columns": "product_id, product_name, category, brand, price, stock_quantity, supplier, description, rating, reviews_count",
},
"orders.csv": {
"description": "Order transactions linking customers to products with shipping and payment details",
"content_type": "order_transactions",
"columns": "order_id, customer_id, product_id, quantity, order_date, ship_date, order_status, payment_method, shipping_address, order_total",
},
"sales_performance.csv": {
"description": "Sales performance data showing employee sales metrics and commission information",
"content_type": "sales_metrics",
"columns": "sale_id, employee_id, sale_date, product_category, revenue, commission, customer_segment, region, quarter",
},
}
parser = PandasCSVReader()
file_extractor = {".csv": parser}
# Load documents with enhanced metadata
for filename in os.listdir("./docs/csv"):
if filename.endswith(".csv"):
file_documents = SimpleDirectoryReader(
input_files=[f"./docs/csv/{filename}"], file_extractor=file_extractor
).load_data()
# Add metadata to each document
for doc in file_documents:
if filename in csv_metadata:
doc.metadata.update(csv_metadata[filename])
doc.metadata["source_file"] = filename
documents.extend(file_documents)
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
async def search_documents(query: str) -> str:
"""
Search across CSV datasets including:
- employees.csv: Employee records with staff info, departments, positions, salaries
- customers.csv: Customer data with personal info, contact details, purchase history
- products.csv: Product catalog with inventory, pricing, supplier information
- orders.csv: Order transactions linking customers to products
- sales_performance.csv: Sales metrics and commission data
When searching for people named 'Ashley', specify if you want employees or customers.
"""
response = await query_engine.aquery(query)
return str(response)
system_prompt = """You are a helpful assistant who can answer questions based on CSV datasets.
When answering questions:
1. Be specific about which dataset you're referencing
2. If someone asks about any name, clarify whether they mean employees or customers
3. Always mention the source file when providing information
4. If data exists in multiple files, explain the context of each
Example: "I found Ashley Miller in the employees.csv file - she's a Sales Director with employee ID EMP012."
"""
agent = FunctionAgent(
tools=[search_documents],
llm=OpenAI(model="gpt-5"),
system_prompt=system_prompt,
streaming=True,
)
async def main(prompt: str = None):
response = await agent.run(prompt)
print(str(response))
if __name__ == "__main__":
asyncio.run(main(input("Enter your prompt: ")))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment