@EnkrateiaLucca
Created September 4, 2025 22:36
Simple RAG script for the uv package manager documentation (one-shotted with Claude Code).
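Running it requires the third-party packages imported at the top of the script (requests, beautifulsoup4, numpy, scikit-learn, openai) and an OPENAI_API_KEY environment variable; a short usage sketch follows the script.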
#!/usr/bin/env python3
"""
UV Documentation RAG System
A comprehensive scraper and RAG system for the uv package documentation.
Uses OpenAI's API for embeddings and GPT-4o for answering questions.
"""
import os
import sys
import json
import time
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from typing import List, Dict, Tuple, Optional
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import openai
from dataclasses import dataclass
import pickle
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Document:
    """Represents a documentation page."""
    url: str
    title: str
    content: str
    chunks: List[str]
    embeddings: Optional[np.ndarray] = None


class UVDocsRAG:
    """Main RAG system for UV documentation."""

    def __init__(self, openai_api_key: str, base_url: str = "https://docs.astral.sh/uv/"):
        self.base_url = base_url
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.documents: List[Document] = []
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        self.cache_file = "uv_docs_cache.pkl"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; UV-Docs-RAG/1.0)'
        })

    def discover_pages(self) -> List[str]:
        """Discover all documentation pages by crawling the site."""
        discovered_urls = set()
        to_visit = [self.base_url]
        visited = set()
        logger.info("Starting page discovery...")
        while to_visit:
            current_url = to_visit.pop(0)
            if current_url in visited:
                continue
            visited.add(current_url)
            try:
                response = self.session.get(current_url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')
                # Find all links that look like documentation pages
                for link in soup.find_all('a', href=True):
                    href = link['href']
                    full_url = urljoin(current_url, href)
                    # Only include URLs from the same domain and documentation section
                    if (full_url.startswith(self.base_url) and
                            '#' not in full_url and
                            full_url not in visited and
                            not full_url.endswith('.pdf') and
                            not full_url.endswith('.zip')):
                        discovered_urls.add(full_url)
                        if full_url not in to_visit:
                            to_visit.append(full_url)
                logger.info(f"Discovered {len(discovered_urls)} unique pages so far...")
            except Exception as e:
                logger.warning(f"Failed to fetch {current_url}: {e}")
                continue
        logger.info(f"Discovery complete. Found {len(discovered_urls)} pages.")
        return list(discovered_urls)

    def scrape_page(self, url: str) -> Optional[Document]:
        """Scrape content from a single documentation page."""
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            # Remove navigation, footer, and other non-content elements
            for element in soup.find_all(['nav', 'footer', 'header', 'aside']):
                element.decompose()
            # Try to find the main content area
            main_content = (
                soup.find('main') or
                soup.find('article') or
                soup.find('div', class_='content') or
                soup.find('div', class_='documentation') or
                soup
            )
            # Extract title
            title = (
                soup.find('h1').get_text(strip=True) if soup.find('h1') else
                soup.find('title').get_text(strip=True) if soup.find('title') else
                url.split('/')[-1]
            )
            # Extract text content
            content = main_content.get_text(separator='\n', strip=True)
            # Clean up the content
            lines = [line.strip() for line in content.split('\n') if line.strip()]
            content = '\n'.join(lines)
            if len(content) < 100:  # Skip pages with minimal content
                return None
            # Create chunks
            chunks = self.chunk_text(content)
            return Document(url=url, title=title, content=content, chunks=chunks)
        except Exception as e:
            logger.warning(f"Failed to scrape {url}: {e}")
            return None

    def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split text into overlapping chunks."""
        if len(text) <= chunk_size:
            return [text]
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            # Try to break at sentence boundaries
            if end < len(text):
                # Look for sentence endings near the chunk boundary
                for i in range(min(100, len(text) - end)):
                    if text[end + i] in '.!?\n':
                        end = end + i + 1
                        break
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            if end >= len(text):
                # The last chunk already reaches the end of the text; stop here so the
                # overlap step does not emit a short duplicate tail chunk.
                break
            start = end - overlap
        return chunks
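
    # Note on chunk_text (illustrative numbers, not taken from the uv docs): with the
    # defaults chunk_size=1000 and overlap=200, a 2,500-character page produces three
    # chunks starting near offsets 0, 800, and 1600, with each chunk end snapped forward
    # to the nearest sentence boundary within 100 characters of the cut point when possible.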

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for a list of texts using OpenAI."""
        embeddings = []
        batch_size = 100  # Conservative batch size to stay well within API request limits
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            try:
                response = self.openai_client.embeddings.create(
                    model="text-embedding-3-small",
                    input=batch
                )
                batch_embeddings = [data.embedding for data in response.data]
                embeddings.extend(batch_embeddings)
                # Rate limiting
                time.sleep(0.1)
            except Exception as e:
                logger.error(f"Failed to generate embeddings for batch {i}: {e}")
                # Fallback: generate zero embeddings
                embeddings.extend([[0.0] * 1536] * len(batch))
        return np.array(embeddings)

    def scrape_all_docs(self):
        """Scrape all documentation pages."""
        if os.path.exists(self.cache_file):
            logger.info("Loading documents from cache...")
            with open(self.cache_file, 'rb') as f:
                self.documents = pickle.load(f)
            logger.info(f"Loaded {len(self.documents)} documents from cache.")
            return
        urls = self.discover_pages()
        logger.info(f"Scraping {len(urls)} pages...")
        # Scrape pages in parallel
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_url = {executor.submit(self.scrape_page, url): url for url in urls}
            for future in as_completed(future_to_url):
                doc = future.result()
                if doc:
                    self.documents.append(doc)
                    logger.info(f"Scraped: {doc.title}")
        logger.info(f"Scraped {len(self.documents)} valid documents.")
        # Generate embeddings for all chunks
        all_chunks = []
        chunk_to_doc = {}
        for doc_idx, doc in enumerate(self.documents):
            for chunk_idx, chunk in enumerate(doc.chunks):
                all_chunks.append(chunk)
                chunk_to_doc[len(all_chunks) - 1] = (doc_idx, chunk_idx)
        logger.info(f"Generating embeddings for {len(all_chunks)} chunks...")
        embeddings = self.generate_embeddings(all_chunks)
        # Assign embeddings back to documents
        for chunk_idx, (doc_idx, _) in chunk_to_doc.items():
            if self.documents[doc_idx].embeddings is None:
                self.documents[doc_idx].embeddings = []
            self.documents[doc_idx].embeddings.append(embeddings[chunk_idx])
        # Convert embeddings to numpy arrays
        for doc in self.documents:
            if doc.embeddings:
                doc.embeddings = np.array(doc.embeddings)
        # Cache the results
        with open(self.cache_file, 'wb') as f:
            pickle.dump(self.documents, f)
        logger.info("Scraping complete and cached.")

    def search_similar_chunks(self, query: str, top_k: int = 10) -> List[Tuple[str, str, float]]:
        """Search for the most similar chunks to a query."""
        # Generate query embedding
        query_embedding = self.generate_embeddings([query])[0]
        # Find similar chunks
        similarities = []
        for doc in self.documents:
            if doc.embeddings is not None:
                # Calculate cosine similarity
                sims = cosine_similarity([query_embedding], doc.embeddings)[0]
                for i, sim in enumerate(sims):
                    similarities.append((doc.chunks[i], doc.url, sim))
        # Sort by similarity and return top-k
        similarities.sort(key=lambda x: x[2], reverse=True)
        return similarities[:top_k]

    def rerank_with_tfidf(self, query: str, chunks: List[str], top_k: int = 5) -> List[int]:
        """Re-rank chunks using TF-IDF similarity."""
        if not chunks:
            return []
        # Fit TF-IDF on chunks + query
        documents = chunks + [query]
        tfidf_matrix = self.vectorizer.fit_transform(documents)
        # Calculate similarity between query and each chunk
        query_vec = tfidf_matrix[-1]  # Last document is the query
        chunk_vecs = tfidf_matrix[:-1]  # All but the last
        similarities = cosine_similarity(query_vec, chunk_vecs)[0]
        # Get indices of top-k most similar chunks
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return top_indices.tolist()

    def answer_question(self, question: str) -> str:
        """Answer a question using the RAG system."""
        logger.info(f"Answering question: {question}")
        # Step 1: Retrieve similar chunks
        similar_chunks = self.search_similar_chunks(question, top_k=20)
        if not similar_chunks:
            return "I couldn't find relevant information in the UV documentation."
        # Step 2: Re-rank using TF-IDF
        chunks = [chunk for chunk, _, _ in similar_chunks]
        reranked_indices = self.rerank_with_tfidf(question, chunks, top_k=5)
        # Step 3: Prepare context from top chunks
        context_chunks = [chunks[i] for i in reranked_indices]
        context = "\n\n---\n\n".join(context_chunks)
        # Step 4: Generate answer using OpenAI
        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {
                        "role": "system",
                        "content": """You are an expert assistant for the UV package documentation.
Answer questions based solely on the provided documentation context.
Be precise, helpful, and cite specific features or commands when relevant.
If the context doesn't contain enough information to answer the question, say so clearly."""
                    },
                    {
                        "role": "user",
                        "content": f"""Based on the following UV documentation excerpts, please answer this question: {question}
Documentation context:
{context}
Please provide a comprehensive answer based on the documentation provided."""
                    }
                ],
                temperature=0.1,
                max_tokens=1000
            )
            answer = response.choices[0].message.content
            # Add source information
            sources = list(set([url for _, url, _ in similar_chunks[:5]]))
            source_text = "\n\nSources:\n" + "\n".join(f"- {url}" for url in sources)
            return answer + source_text
        except Exception as e:
            logger.error(f"Failed to generate answer: {e}")
            return "I encountered an error while generating the answer. Please try again."


def main():
    """Main function to run the RAG system."""
    # Check for OpenAI API key
    openai_api_key = os.getenv('OPENAI_API_KEY')
    if not openai_api_key:
        print("Please set your OPENAI_API_KEY environment variable.")
        print("You can get an API key from: https://platform.openai.com/api-keys")
        sys.exit(1)
    # Initialize the RAG system
    rag = UVDocsRAG(openai_api_key)
    print("UV Documentation RAG System")
    print("=" * 40)
    print("Initializing... This may take a few minutes on first run.")
    # Scrape and index documents
    rag.scrape_all_docs()
    print(f"\nLoaded {len(rag.documents)} documentation pages.")
    print("You can now ask questions about UV! Type 'quit' to exit.\n")
    # Interactive question-answering loop
    while True:
        try:
            question = input("Ask a question about UV: ").strip()
            if question.lower() in ['quit', 'exit', 'q']:
                print("Goodbye!")
                break
            if not question:
                continue
            print("\nThinking...")
            answer = rag.answer_question(question)
            print(f"\nAnswer:\n{answer}\n")
            print("-" * 60)
        except KeyboardInterrupt:
            print("\nGoodbye!")
            break
        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    main()
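
Besides the interactive loop in main(), the class can be used programmatically. Below is a minimal sketch, assuming the script above is saved as uv_docs_rag.py next to the caller and OPENAI_API_KEY is exported; the file name and the example question are only placeholders.

# usage_example.py -- minimal programmatic use of UVDocsRAG (sketch, assumptions noted above)
import os

from uv_docs_rag import UVDocsRAG  # assumes the gist is saved as uv_docs_rag.py

rag = UVDocsRAG(openai_api_key=os.environ["OPENAI_API_KEY"])
rag.scrape_all_docs()  # first run crawls and embeds the docs; later runs load uv_docs_cache.pkl
print(rag.answer_question("How do I add a dependency to a project with uv?"))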