The llms.txt convention defines a simple format for exposing LLM-ready documentation. GitBook now automatically publishes this for supported projects, enabling easier integration with RAG pipelines. A growing index of public llms.txt files allows for scalable discovery and ingestion of structured sources. A small script is available to fetch all listed documents and save them to a local folder for RAG ingestion.
#
# LLMs.txt Documentation Scraper for RAG
#
# Author: David Abutbul
# GitHub: https://github.com/abutbul
# Date: June 30, 2025
#
# Description:
# This script is a utility designed to streamline the data ingestion phase of
# Retrieval-Augmented Generation (RAG) development. It helps users who need
# high-quality source documents by downloading the full-text documentation
# files (`llms-full.txt` / `.md`) listed in the https://llmstxt.site/ index
# and saving them to a local folder.
#
# The RAG Development Process & the `llms.txt` Standard:
# A crucial and often difficult step in building a RAG system is sourcing
# relevant, well-structured documentation. The `llms.txt` standard, recently
# highlighted by GitBook's "LLM-ready docs" feature (https://gitbook.com/docs/llm-ready-docs),
# addresses this challenge. It proposes a simple, machine-readable file
# (`llms.txt`) at the root of a documentation site that lists all URLs
# intended for LLM consumption.
#
# This provides a standardized way to discover and ingest high-quality content.
# A community-maintained index of sites using the `llms.txt` standard can be
# found at: https://llmstxt.site/
#
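# For orientation, a minimal `llms.txt` might look like the sketch below. The
# layout follows the published convention (an H1 title, a short blockquote
# summary, then sections of markdown links); the project name and URLs here
# are purely illustrative:
#
#   # Example Project
#   > Short summary of what the project does.
#
#   ## Docs
#   - [Getting started](https://example.com/docs/getting-started.md)
#   - [API reference](https://example.com/docs/api.md)
#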
# Current Features:
# - Scrapes the table of `llms.txt` entries published at https://llmstxt.site/.
# - Downloads the linked `llms-full.txt` / `.md` file for each entry.
# - Stores the downloaded documents in per-product folders under a specified
#   output directory for RAG ingestion.
#
# Extendibility:
# - Integrate directly with a vector database for automatic embedding.
# - Add more sophisticated content cleaning and chunking strategies (a minimal
#   sketch follows below).
# - Build out the full RAG pipeline (retrieval, generation) using the scraped data.
#
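# A minimal sketch of the chunking idea mentioned above, kept as a comment so
# the script's behavior is unchanged. The helper name and the chunk_size /
# overlap values are illustrative choices, not part of this script:
#
#   def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> list[str]:
#       """Split text into fixed-size, overlapping character windows."""
#       step = max(1, chunk_size - overlap)
#       return [text[i:i + chunk_size] for i in range(0, len(text), step)]
#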
import os
import re
import sys
import asyncio
import argparse
import itertools
from urllib.parse import urlparse
from typing import Dict, Any

# Third-party dependencies; fail early with an install hint if they are missing.
try:
    import httpx
    from bs4 import BeautifulSoup, Tag
except ImportError as e:
    print(f"Missing dependency: {e}")
    print("Install missing packages with: pip install httpx beautifulsoup4")
    sys.exit(1)
BASE_URL = "https://llmstxt.site/"
DOWNLOADS_LOG = "downloads.log"
FAILED_LOG = "failed-downloads.log"
CONCURRENT_DOWNLOADS = 10
OUTPUT_DIR = "./"
def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Download LLM documentation files from llmstxt.site",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python download_llms.py                        # Download to current directory
  python download_llms.py --output ./downloads   # Download to ./downloads
  python download_llms.py --output /tmp/llm-docs # Download to /tmp/llm-docs
  python download_llms.py --output ../docs       # Download to ../docs
        """
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        default=OUTPUT_DIR,
        help="Output directory for downloaded files (default: current directory)"
    )
    return parser.parse_args()
def sanitize_filename(name):
    """Sanitize a string to be a valid filename."""
    name = name.lower().replace(' ', '_')
    return re.sub(r'[^a-z0-9_.-]', '', name)
async def get_download_tasks(client: httpx.AsyncClient, config: Dict[str, Any]):
    """Fetch the main page and parse it to create a list of download tasks."""
    try:
        print(f"Fetching index from {BASE_URL}...")
        response = await client.get(BASE_URL)
        response.raise_for_status()
        print(f"Response status: {response.status_code}")
        print(f"Content-Type: {response.headers.get('content-type', 'unknown')}")
        print(f"Content-Encoding: {response.headers.get('content-encoding', 'none')}")
        print(f"Response size: {len(response.content)} bytes")
    except httpx.RequestError as e:
        print(f"Error fetching index: {e}", file=sys.stderr)
        return []
    try:
        text_content = response.text
        print(f"Successfully decoded response as text. Length: {len(text_content)}")
    except Exception as e:
        print(f"Failed to decode response as text: {e}")
        return []
    soup = BeautifulSoup(text_content, "html.parser")
    print(f"Parsed HTML successfully. Title: {soup.title.string if soup.title else 'No title'}")
    table = soup.find("table")
    if not table:
        print("Could not find the data table on the page.", file=sys.stderr)
        all_tables = soup.find_all("table")
        print(f"Found {len(all_tables)} tables total")
        divs_with_table_class = soup.find_all("div", attrs={"class": lambda x: x and "table" in str(x).lower()})
        print(f"Found {len(divs_with_table_class)} divs with 'table' in class name")
        main_content = soup.find("main") or soup.find("body") or soup
        if main_content:
            print("Main content structure (first 1000 chars):")
            print(main_content.get_text()[:1000])
        return []
    tasks = []
    tbody = table.find("tbody")
    if isinstance(tbody, Tag):
        rows = tbody.find_all("tr")
    elif isinstance(table, Tag):
        # No <tbody>: fall back to all rows, skipping the header row if present.
        all_rows = table.find_all("tr")
        rows = all_rows[1:] if len(all_rows) > 1 else all_rows
    else:
        print("Table element doesn't support find_all method")
        return []
    print(f"Found {len(rows)} data rows in table")
    for i, row in enumerate(rows):
        cols = row.find_all("td")
        if len(cols) < 4:
            print(f"Row {i}: Skipping row with only {len(cols)} columns")
            continue
        product_name = cols[0].text.strip()
        domain = cols[1].text.strip()
        llms_txt_url = cols[2].text.strip()
        tokens = cols[3].text.strip()
        llms_full_url = None
        if len(cols) >= 5:
            llms_full_url = cols[4].text.strip()
        sanitized_name = sanitize_filename(product_name)
        if llms_full_url:
            if not llms_full_url.startswith('http'):
                llms_full_url = f"https://{llms_full_url}"
            # Only process if the link ends with .txt or .md
            if not (llms_full_url.lower().endswith('.txt') or llms_full_url.lower().endswith('.md')):
                print(f"Row {i}: Skipping non-.txt/.md file: {llms_full_url}")
                continue
            full_tokens = cols[5].text.strip() if len(cols) >= 6 else tokens
            folder_name = os.path.join(config["output_dir"], sanitized_name)
            url_filename = os.path.basename(urlparse(llms_full_url).path)
            filename = os.path.join(folder_name, url_filename)
            tasks.append({
                "product_name": product_name,
                "url": llms_full_url,
                "filename": filename,
                "tokens": full_tokens,
                "is_full": True,
                "folder_name": folder_name,
                "priority": 1
            })
    print(f"Created {len(tasks)} download tasks (full-llm txt/md only)")
    tasks.sort(key=lambda x: x.get("priority", 3))
    return tasks
async def download_file(client: httpx.AsyncClient, task: Dict[str, Any], config: Dict[str, Any], progress_callback):
    """Downloads a single file (full-llm txt/md only)."""
    filename = task["filename"]
    url = task["url"]
    folder_name = task.get("folder_name", None)
    if folder_name:
        # exist_ok avoids a race if concurrent workers touch the same folder.
        os.makedirs(folder_name, exist_ok=True)
    if os.path.exists(filename):
        progress_callback(f"Skipped (exists): {filename}")
        return
    try:
        headers = {"Accept": "text/plain, */*;q=0.8"}
        async with client.stream("GET", url, headers=headers, timeout=60) as response:
            response.raise_for_status()
            content_type = response.headers.get("content-type", "")
            if "text/html" in content_type:
                raise ValueError(f"Invalid content type '{content_type}', expected text.")
            content = b""
            async for chunk in response.aiter_bytes():
                content += chunk
        with open(filename, "wb") as f:
            f.write(content)
        with open(config["downloads_log"], "a") as log:
            log.write(f"{task['product_name']}|{url}|{task['tokens']}\n")
        progress_callback(f"Downloaded: {filename}")
    except (httpx.RequestError, httpx.HTTPStatusError, ValueError, IOError) as e:
        with open(config["failed_log"], "a") as log:
            log.write(f"{url}|{filename}|{e}\n")
        progress_callback(f"Failed: {filename}")
        if os.path.exists(filename):
            os.remove(filename)
spinner = itertools.cycle(['|', '/', '-', '\\'])

def update_progress(processed_count, total_count, message):
    """Updates a single line in the console with a spinner animation."""
    percent = min(100, (processed_count / total_count) * 100) if total_count > 0 else 0
    try:
        terminal_width = os.get_terminal_size().columns
    except OSError:
        terminal_width = 80
    stats = f"{next(spinner)} {percent:.1f}% [{processed_count}/{total_count}] "
    available_width = terminal_width - len(stats) - 1
    if available_width < 10:
        available_width = 10
    if len(message) > available_width:
        message = message[:available_width - 3] + "..."
    status_line = f"\r{stats}{message.ljust(available_width)}\x1b[K"
    sys.stdout.write(status_line)
    sys.stdout.flush()
async def main():
    """Main function to orchestrate the download process."""
    args = parse_arguments()
    output_dir = os.path.abspath(args.output)
    config = {
        "output_dir": output_dir,
        "downloads_log": os.path.join(output_dir, DOWNLOADS_LOG),
        "failed_log": os.path.join(output_dir, FAILED_LOG),
        "base_url": BASE_URL
    }
    if not os.path.exists(config["output_dir"]):
        try:
            os.makedirs(config["output_dir"])
            print(f"Created output directory: {config['output_dir']}")
        except OSError as e:
            print(f"Error creating output directory '{config['output_dir']}': {e}", file=sys.stderr)
            sys.exit(1)
    open(config["downloads_log"], 'a').close()
    open(config["failed_log"], 'a').close()
    print(f"Output directory: {config['output_dir']}")
    print("Downloads will be organized in company-specific folders")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Cache-Control": "no-cache",
    }
    async with httpx.AsyncClient(headers=headers, follow_redirects=True) as client:
        tasks_to_run = await get_download_tasks(client, config)
        if not tasks_to_run:
            print("No download tasks found. Exiting.")
            return
        total_tasks = len(tasks_to_run)
        print(f"Found {total_tasks} files to potentially download.")
        processed_count = 0

        def progress_callback(message):
            update_progress(processed_count, total_tasks, message)

        semaphore = asyncio.Semaphore(CONCURRENT_DOWNLOADS)

        async def worker(task):
            nonlocal processed_count
            async with semaphore:
                await download_file(client, task, config, progress_callback)
                processed_count += 1
                completed_message = f"Completed: {os.path.basename(task['filename'])}"
                update_progress(processed_count, total_tasks, completed_message)

        download_coroutines = [worker(task) for task in tasks_to_run]
        await asyncio.gather(*download_coroutines)
    # Clean up per-product folders left empty (e.g. every download in them failed).
    for task in tasks_to_run:
        folder = task.get("folder_name")
        if folder and os.path.isdir(folder):
            if not any(os.scandir(folder)):
                try:
                    os.rmdir(folder)
                    print(f"Removed empty folder: {folder}")
                except Exception as e:
                    print(f"Failed to remove empty folder {folder}: {e}")
    print("\n\nDownload process finished.")
    print(f"Files downloaded to: {config['output_dir']}")
    print(f"Successful downloads logged in: {config['downloads_log']}")
    print(f"Failed downloads logged in: {config['failed_log']}")
if __name__ == "__main__":
    # Dependency availability is checked at import time above; if httpx or
    # beautifulsoup4 were missing, the script would already have exited with
    # an install hint.
    print("Dependencies check passed. Running the download script...")
    asyncio.run(main())