Simple RAG script for the uv package manager documentation (one-shotted with Claude Code).
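The script imports several third-party packages (requests, beautifulsoup4, numpy, scikit-learn, openai). As a minimal sketch that is not part of the gist: a PEP 723 inline-metadata block near the top of the file would let "uv run uv_docs_rag.py" resolve those dependencies automatically. Package names are inferred from the imports below; the filename is hypothetical.

# /// script
# dependencies = [
#     "requests",
#     "beautifulsoup4",
#     "numpy",
#     "scikit-learn",
#     "openai",
# ]
# ///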
#!/usr/bin/env python3
"""
UV Documentation RAG System

A comprehensive scraper and RAG system for the uv package manager documentation.
Uses OpenAI's API for embeddings and GPT-4o for answering questions.
"""
import os
import sys
import json
import time
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from typing import List, Dict, Tuple, Optional
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import openai
from dataclasses import dataclass
import pickle
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Document:
    """Represents a documentation page."""
    url: str
    title: str
    content: str
    chunks: List[str]
    embeddings: Optional[np.ndarray] = None

class UVDocsRAG:
    """Main RAG system for UV documentation."""

    def __init__(self, openai_api_key: str, base_url: str = "https://docs.astral.sh/uv/"):
        self.base_url = base_url
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.documents: List[Document] = []
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        self.cache_file = "uv_docs_cache.pkl"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; UV-Docs-RAG/1.0)'
        })
    def discover_pages(self) -> List[str]:
        """Discover all documentation pages by crawling the site."""
        discovered_urls = set()
        to_visit = [self.base_url]
        visited = set()

        logger.info("Starting page discovery...")
        while to_visit:
            current_url = to_visit.pop(0)
            if current_url in visited:
                continue
            visited.add(current_url)

            try:
                response = self.session.get(current_url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')

                # Find all links that look like documentation pages
                for link in soup.find_all('a', href=True):
                    href = link['href']
                    full_url = urljoin(current_url, href)

                    # Only include URLs from the same domain and documentation section
                    if (full_url.startswith(self.base_url) and
                            '#' not in full_url and
                            full_url not in visited and
                            not full_url.endswith('.pdf') and
                            not full_url.endswith('.zip')):
                        discovered_urls.add(full_url)
                        if full_url not in to_visit:
                            to_visit.append(full_url)

                logger.info(f"Discovered {len(discovered_urls)} unique pages so far...")
            except Exception as e:
                logger.warning(f"Failed to fetch {current_url}: {e}")
                continue

        logger.info(f"Discovery complete. Found {len(discovered_urls)} pages.")
        return list(discovered_urls)
    def scrape_page(self, url: str) -> Optional[Document]:
        """Scrape content from a single documentation page."""
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove navigation, footer, and other non-content elements
            for element in soup.find_all(['nav', 'footer', 'header', 'aside']):
                element.decompose()

            # Try to find the main content area
            main_content = (
                soup.find('main') or
                soup.find('article') or
                soup.find('div', class_='content') or
                soup.find('div', class_='documentation') or
                soup
            )

            # Extract title
            title = (
                soup.find('h1').get_text(strip=True) if soup.find('h1') else
                soup.find('title').get_text(strip=True) if soup.find('title') else
                url.split('/')[-1]
            )

            # Extract text content
            content = main_content.get_text(separator='\n', strip=True)

            # Clean up the content
            lines = [line.strip() for line in content.split('\n') if line.strip()]
            content = '\n'.join(lines)

            if len(content) < 100:  # Skip pages with minimal content
                return None

            # Create chunks
            chunks = self.chunk_text(content)

            return Document(url=url, title=title, content=content, chunks=chunks)
        except Exception as e:
            logger.warning(f"Failed to scrape {url}: {e}")
            return None
    def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split text into overlapping chunks."""
        if len(text) <= chunk_size:
            return [text]

        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size

            # Try to break at sentence boundaries
            if end < len(text):
                # Look for sentence endings near the chunk boundary
                for i in range(min(100, len(text) - end)):
                    if text[end + i] in '.!?\n':
                        end = end + i + 1
                        break

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            start = end - overlap
            if start >= len(text):
                break

        return chunks
    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for a list of texts using OpenAI."""
        embeddings = []
        batch_size = 100  # Conservative batch size, well under the embeddings API's input limit

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            try:
                response = self.openai_client.embeddings.create(
                    model="text-embedding-3-small",
                    input=batch
                )
                batch_embeddings = [data.embedding for data in response.data]
                embeddings.extend(batch_embeddings)

                # Rate limiting
                time.sleep(0.1)
            except Exception as e:
                logger.error(f"Failed to generate embeddings for batch {i}: {e}")
                # Fallback: zero vectors (text-embedding-3-small returns 1536 dimensions)
                embeddings.extend([[0.0] * 1536] * len(batch))

        return np.array(embeddings)
    def scrape_all_docs(self):
        """Scrape all documentation pages."""
        if os.path.exists(self.cache_file):
            logger.info("Loading documents from cache...")
            with open(self.cache_file, 'rb') as f:
                self.documents = pickle.load(f)
            logger.info(f"Loaded {len(self.documents)} documents from cache.")
            return

        urls = self.discover_pages()
        logger.info(f"Scraping {len(urls)} pages...")

        # Scrape pages in parallel
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_url = {executor.submit(self.scrape_page, url): url for url in urls}
            for future in as_completed(future_to_url):
                doc = future.result()
                if doc:
                    self.documents.append(doc)
                    logger.info(f"Scraped: {doc.title}")

        logger.info(f"Scraped {len(self.documents)} valid documents.")

        # Generate embeddings for all chunks
        all_chunks = []
        chunk_to_doc = {}
        for doc_idx, doc in enumerate(self.documents):
            for chunk_idx, chunk in enumerate(doc.chunks):
                all_chunks.append(chunk)
                chunk_to_doc[len(all_chunks) - 1] = (doc_idx, chunk_idx)

        logger.info(f"Generating embeddings for {len(all_chunks)} chunks...")
        embeddings = self.generate_embeddings(all_chunks)

        # Assign embeddings back to documents
        for chunk_idx, (doc_idx, _) in chunk_to_doc.items():
            if self.documents[doc_idx].embeddings is None:
                self.documents[doc_idx].embeddings = []
            self.documents[doc_idx].embeddings.append(embeddings[chunk_idx])

        # Convert embeddings to numpy arrays
        for doc in self.documents:
            if doc.embeddings:
                doc.embeddings = np.array(doc.embeddings)

        # Cache the results
        with open(self.cache_file, 'wb') as f:
            pickle.dump(self.documents, f)

        logger.info("Scraping complete and cached.")
    def search_similar_chunks(self, query: str, top_k: int = 10) -> List[Tuple[str, str, float]]:
        """Search for the most similar chunks to a query."""
        # Generate query embedding
        query_embedding = self.generate_embeddings([query])[0]

        # Find similar chunks
        similarities = []
        for doc in self.documents:
            if doc.embeddings is not None:
                # Calculate cosine similarity
                sims = cosine_similarity([query_embedding], doc.embeddings)[0]
                for i, sim in enumerate(sims):
                    similarities.append((doc.chunks[i], doc.url, sim))

        # Sort by similarity and return top-k
        similarities.sort(key=lambda x: x[2], reverse=True)
        return similarities[:top_k]
    def rerank_with_tfidf(self, query: str, chunks: List[str], top_k: int = 5) -> List[int]:
        """Re-rank chunks using TF-IDF similarity."""
        if not chunks:
            return []

        # Fit TF-IDF on chunks + query
        documents = chunks + [query]
        tfidf_matrix = self.vectorizer.fit_transform(documents)

        # Calculate similarity between query and each chunk
        query_vec = tfidf_matrix[-1]    # Last document is the query
        chunk_vecs = tfidf_matrix[:-1]  # All but the last
        similarities = cosine_similarity(query_vec, chunk_vecs)[0]

        # Get indices of the top-k most similar chunks
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return top_indices.tolist()
    def answer_question(self, question: str) -> str:
        """Answer a question using the RAG system."""
        logger.info(f"Answering question: {question}")

        # Step 1: Retrieve similar chunks
        similar_chunks = self.search_similar_chunks(question, top_k=20)
        if not similar_chunks:
            return "I couldn't find relevant information in the UV documentation."

        # Step 2: Re-rank using TF-IDF
        chunks = [chunk for chunk, _, _ in similar_chunks]
        reranked_indices = self.rerank_with_tfidf(question, chunks, top_k=5)

        # Step 3: Prepare context from top chunks
        context_chunks = [chunks[i] for i in reranked_indices]
        context = "\n\n---\n\n".join(context_chunks)

        # Step 4: Generate answer using OpenAI
        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {
                        "role": "system",
                        "content": """You are an expert assistant for the UV package documentation.
Answer questions based solely on the provided documentation context.
Be precise, helpful, and cite specific features or commands when relevant.
If the context doesn't contain enough information to answer the question, say so clearly."""
                    },
                    {
                        "role": "user",
                        "content": f"""Based on the following UV documentation excerpts, please answer this question: {question}

Documentation context:
{context}

Please provide a comprehensive answer based on the documentation provided."""
                    }
                ],
                temperature=0.1,
                max_tokens=1000
            )

            answer = response.choices[0].message.content

            # Add source information
            sources = list(set([url for _, url, _ in similar_chunks[:5]]))
            source_text = "\n\nSources:\n" + "\n".join(f"- {url}" for url in sources)

            return answer + source_text
        except Exception as e:
            logger.error(f"Failed to generate answer: {e}")
            return "I encountered an error while generating the answer. Please try again."

def main():
    """Main function to run the RAG system."""
    # Check for OpenAI API key
    openai_api_key = os.getenv('OPENAI_API_KEY')
    if not openai_api_key:
        print("Please set your OPENAI_API_KEY environment variable.")
        print("You can get an API key from: https://platform.openai.com/api-keys")
        sys.exit(1)

    # Initialize the RAG system
    rag = UVDocsRAG(openai_api_key)

    print("UV Documentation RAG System")
    print("=" * 40)
    print("Initializing... This may take a few minutes on first run.")

    # Scrape and index documents
    rag.scrape_all_docs()

    print(f"\nLoaded {len(rag.documents)} documentation pages.")
    print("You can now ask questions about UV! Type 'quit' to exit.\n")

    # Interactive question-answering loop
    while True:
        try:
            question = input("Ask a question about UV: ").strip()

            if question.lower() in ['quit', 'exit', 'q']:
                print("Goodbye!")
                break

            if not question:
                continue

            print("\nThinking...")
            answer = rag.answer_question(question)
            print(f"\nAnswer:\n{answer}\n")
            print("-" * 60)
        except KeyboardInterrupt:
            print("\nGoodbye!")
            break
        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    main()
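A minimal usage sketch, not part of the gist: it skips the interactive loop in main() and calls the class directly. Assumptions are that the script above is saved as uv_docs_rag.py (hypothetical filename) and that OPENAI_API_KEY is set in the environment.

import os
from uv_docs_rag import UVDocsRAG  # hypothetical module name for the script above

rag = UVDocsRAG(openai_api_key=os.environ["OPENAI_API_KEY"])
rag.scrape_all_docs()  # crawls https://docs.astral.sh/uv/ on first run, then reuses uv_docs_cache.pkl
answer = rag.answer_question("How do I add a dependency to a project with uv?")
print(answer)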