The llms.txt convention defines a simple format for exposing LLM-ready documentation. GitBook now automatically publishes this for supported projects, enabling easier integration with RAG pipelines. A growing index of public llms.txt files allows for scalable discovery and ingestion of structured sources. The small script below fetches every full-text document listed in that index and saves it locally.
#
# LLMs.txt Documentation Scraper for RAG
#
# Author: David Abutbul
# GitHub: https://github.com/abutbul
# Date: June 30, 2025
#
# Description:
# This script is a utility that streamlines the data ingestion phase of
# Retrieval-Augmented Generation (RAG) development. It helps users who need
# high-quality source documents by scraping the public llms.txt index at
# https://llmstxt.site/ and saving every linked full-text document to a
# local folder.
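#
# Typical invocation, assuming the file is saved as `download_llms.py` (the
# name used in the argparse examples further down):
#
#   python download_llms.py --output ./llm-docs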
#
# The RAG Development Process & the `llms.txt` Standard:
# A crucial and often difficult step in building a RAG system is sourcing
# relevant, well-structured documentation. The `llms.txt` standard, recently
# highlighted by GitBook's "LLM-ready docs" feature (https://gitbook.com/docs/llm-ready-docs),
# addresses this challenge. It proposes a simple, machine-readable file (`llms.txt`)
# at the root of a documentation site that lists all URLs intended for LLM consumption.
#
# This provides a standardized way to discover and ingest high-quality content.
# A community-maintained index of sites using the `llms.txt` standard can be
# found at: https://llmstxt.site/
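#
# For reference, an `llms.txt` file is plain Markdown: a title, an optional
# summary, and lists of links to the pages meant for LLM consumption. The
# sketch below is illustrative only, not copied from any real site:
#
#   # Example Project
#   > One-paragraph summary of the documentation set.
#
#   ## Docs
#   - [Getting started](https://example.com/docs/getting-started.md)
#   - [API reference](https://example.com/docs/api.md)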
#
# Current Features:
# - Scrapes the index table at https://llmstxt.site/ for sites publishing llms.txt.
# - Downloads the linked full-text (.txt / .md) document for each entry.
# - Stores the downloads in per-product folders inside a configurable output
#   directory, ready for RAG ingestion.
#
# Extensibility:
# - Integrate directly with a vector database for automatic embedding.
# - Add more sophisticated content cleaning and chunking strategies (a
#   commented sketch of a naive chunker follows this list).
# - Build out the full RAG pipeline (retrieval, generation) using the scraped data.
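#
# As a sketch of the chunking idea above (not wired into this script), a
# naive fixed-size splitter over the downloaded text could look like:
#
#   def chunk_text(text: str, max_chars: int = 2000, overlap: int = 200):
#       """Yield overlapping character windows suitable for embedding."""
#       step = max_chars - overlap
#       for start in range(0, len(text), step):
#           yield text[start:start + max_chars]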
#
import os
import re
import sys
import asyncio
import argparse
import itertools
from urllib.parse import urlparse
from typing import Dict, Any

# Third-party dependencies; fail early with an install hint if they are missing.
try:
    import httpx
    from bs4 import BeautifulSoup, Tag
except ImportError as e:
    print(f"Missing dependency: {e}", file=sys.stderr)
    print("Install missing packages with: pip install httpx beautifulsoup4", file=sys.stderr)
    sys.exit(1)

BASE_URL = "https://llmstxt.site/"
DOWNLOADS_LOG = "downloads.log"
FAILED_LOG = "failed-downloads.log"
CONCURRENT_DOWNLOADS = 10
OUTPUT_DIR = "./"


def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Download LLM documentation files from llmstxt.site",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python download_llms.py                       # Download to current directory
  python download_llms.py --output ./downloads  # Download to ./downloads
  python download_llms.py --output /tmp/llm-docs  # Download to /tmp/llm-docs
  python download_llms.py --output ../docs      # Download to ../docs
"""
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        default=OUTPUT_DIR,
        help="Output directory for downloaded files (default: current directory)"
    )
    return parser.parse_args()


def sanitize_filename(name):
    """Sanitize a string to be a valid filename."""
    name = name.lower().replace(' ', '_')
    return re.sub(r'[^a-z0-9_.-]', '', name)
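# Example (hypothetical input): sanitize_filename("My Product Docs")
# returns "my_product_docs".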


async def get_download_tasks(client: httpx.AsyncClient, config: Dict[str, Any]):
    """Fetch the main page and parse it to create a list of download tasks."""
    try:
        print(f"Fetching index from {BASE_URL}...")
        response = await client.get(BASE_URL)
        response.raise_for_status()
        print(f"Response status: {response.status_code}")
        print(f"Content-Type: {response.headers.get('content-type', 'unknown')}")
        print(f"Content-Encoding: {response.headers.get('content-encoding', 'none')}")
        print(f"Response size: {len(response.content)} bytes")
    except httpx.RequestError as e:
        print(f"Error fetching index: {e}", file=sys.stderr)
        return []
    try:
        text_content = response.text
        print(f"Successfully decoded response as text. Length: {len(text_content)}")
    except Exception as e:
        print(f"Failed to decode response as text: {e}")
        return []
    soup = BeautifulSoup(text_content, "html.parser")
    print(f"Parsed HTML successfully. Title: {soup.title.string if soup.title else 'No title'}")
    table = soup.find("table")
    if not table:
        print("Could not find the data table on the page.", file=sys.stderr)
        all_tables = soup.find_all("table")
        print(f"Found {len(all_tables)} tables total")
        divs_with_table_class = soup.find_all("div", attrs={"class": lambda x: x and "table" in str(x).lower()})
        print(f"Found {len(divs_with_table_class)} divs with 'table' in class name")
        main_content = soup.find("main") or soup.find("body") or soup
        if main_content:
            print("Main content structure (first 1000 chars):")
            print(main_content.get_text()[:1000])
        return []
    tasks = []
    tbody = table.find("tbody")
    if tbody and isinstance(tbody, Tag) and hasattr(tbody, 'find_all'):
        rows = tbody.find_all("tr")
    else:
        if isinstance(table, Tag) and hasattr(table, 'find_all'):
            all_rows = table.find_all("tr")
            rows = all_rows[1:] if len(all_rows) > 1 else all_rows
        else:
            print("Table element doesn't support find_all method")
            return []
    print(f"Found {len(rows)} data rows in table")
    for i, row in enumerate(rows):
        cols = row.find_all("td")
        if len(cols) < 4:
            print(f"Row {i}: Skipping row with only {len(cols)} columns")
            continue
        product_name = cols[0].text.strip()
        domain = cols[1].text.strip()
        llms_txt_url = cols[2].text.strip()
        tokens = cols[3].text.strip()
        llms_full_url = None
        if len(cols) >= 5:
            llms_full_url = cols[4].text.strip()
        sanitized_name = sanitize_filename(product_name)
        if llms_full_url and llms_full_url != "":
            if not llms_full_url.startswith('http'):
                llms_full_url = f"https://{llms_full_url}"
            # Only process if the link ends with .txt or .md
            if not (llms_full_url.lower().endswith('.txt') or llms_full_url.lower().endswith('.md')):
                print(f"Row {i}: Skipping non-.txt/.md file: {llms_full_url}")
                continue
            full_tokens = cols[5].text.strip() if len(cols) >= 6 else tokens
            folder_name = os.path.join(config["output_dir"], sanitized_name)
            url_filename = os.path.basename(urlparse(llms_full_url).path)
            filename = os.path.join(folder_name, url_filename)
            tasks.append({
                "product_name": product_name,
                "url": llms_full_url,
                "filename": filename,
                "tokens": full_tokens,
                "is_full": True,
                "folder_name": folder_name,
                "priority": 1
            })
    print(f"Created {len(tasks)} download tasks (full-llm txt/md only)")
    tasks.sort(key=lambda x: x.get("priority", 3))
    return tasks


async def download_file(client: httpx.AsyncClient, task: Dict[str, Any], config: Dict[str, Any], progress_callback):
    """Downloads a single file (full-llm txt only)."""
    filename = task["filename"]
    url = task["url"]
    folder_name = task.get("folder_name", None)
    if folder_name and not os.path.exists(folder_name):
        os.makedirs(folder_name)
    if os.path.exists(filename):
        progress_callback(f"Skipped (exists): {filename}")
        return
    try:
        headers = {"Accept": "text/plain, */*;q=0.8"}
        async with client.stream("GET", url, headers=headers, timeout=60) as response:
            response.raise_for_status()
            content_type = response.headers.get("content-type", "")
            if "text/html" in content_type:
                raise ValueError(f"Invalid content type '{content_type}', expected text.")
            content = b""
            async for chunk in response.aiter_bytes():
                content += chunk
        with open(filename, "wb") as f:
            f.write(content)
        with open(config["downloads_log"], "a") as log:
            log.write(f"{task['product_name']}|{url}|{task['tokens']}\n")
        progress_callback(f"Downloaded: {filename}")
    except (httpx.RequestError, httpx.HTTPStatusError, ValueError, IOError) as e:
        with open(config["failed_log"], "a") as log:
            log.write(f"{url}|{filename}|{e}\n")
        progress_callback(f"Failed: {filename}")
        if os.path.exists(filename):
            os.remove(filename)


spinner = itertools.cycle(['|', '/', '-', '\\'])


def update_progress(processed_count, total_count, message):
    """Updates a single line in the console with a spinner animation."""
    percent = min(100, (processed_count / total_count) * 100) if total_count > 0 else 0
    try:
        terminal_width = os.get_terminal_size().columns
    except OSError:
        terminal_width = 80
    stats = f"{next(spinner)} {percent:.1f}% [{processed_count}/{total_count}] "
    available_width = terminal_width - len(stats) - 1
    if available_width < 10:
        available_width = 10
    if len(message) > available_width:
        message = message[:available_width - 3] + "..."
    status_line = f"\r{stats}{message.ljust(available_width)}\x1b[K"
    sys.stdout.write(status_line)
    sys.stdout.flush()
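# Illustrative rendering (values are made up): the status line produced by
# update_progress looks roughly like
#   | 42.0% [21/50] Downloaded: /tmp/llm-docs/some_product/llms-full.txt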


async def main():
    """Main function to orchestrate the download process."""
    args = parse_arguments()
    output_dir = os.path.abspath(args.output)
    config = {
        "output_dir": output_dir,
        "downloads_log": os.path.join(output_dir, DOWNLOADS_LOG),
        "failed_log": os.path.join(output_dir, FAILED_LOG),
        "base_url": BASE_URL
    }
    if not os.path.exists(config["output_dir"]):
        try:
            os.makedirs(config["output_dir"])
            print(f"Created output directory: {config['output_dir']}")
        except OSError as e:
            print(f"Error creating output directory '{config['output_dir']}': {e}", file=sys.stderr)
            sys.exit(1)
    open(config["downloads_log"], 'a').close()
    open(config["failed_log"], 'a').close()
    print(f"Output directory: {config['output_dir']}")
    print("Downloads will be organized in company-specific folders")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Cache-Control": "no-cache",
    }
    async with httpx.AsyncClient(headers=headers, follow_redirects=True) as client:
        tasks_to_run = await get_download_tasks(client, config)
        if not tasks_to_run:
            print("No download tasks found. Exiting.")
            return
        total_tasks = len(tasks_to_run)
        print(f"Found {total_tasks} files to potentially download.")
        processed_count = 0

        def progress_callback(message):
            update_progress(processed_count, total_tasks, message)

        semaphore = asyncio.Semaphore(CONCURRENT_DOWNLOADS)

        async def worker(task):
            nonlocal processed_count
            async with semaphore:
                await download_file(client, task, config, progress_callback)
                processed_count += 1
                completed_message = f"Completed: {os.path.basename(task['filename'])}"
                update_progress(processed_count, total_tasks, completed_message)

        download_coroutines = [worker(task) for task in tasks_to_run]
        await asyncio.gather(*download_coroutines)
    for task in tasks_to_run:
        folder = task.get("folder_name")
        if folder and os.path.isdir(folder):
            if not any(os.scandir(folder)):
                try:
                    os.rmdir(folder)
                    print(f"Removed empty folder: {folder}")
                except Exception as e:
                    print(f"Failed to remove empty folder {folder}: {e}")
    print("\n\nDownload process finished.")
    print(f"Files downloaded to: {config['output_dir']}")
    print(f"Successful downloads logged in: {config['downloads_log']}")
    print(f"Failed downloads logged in: {config['failed_log']}")


if __name__ == "__main__":
    # Third-party imports are validated at the top of the file, so reaching
    # this point means the dependency check succeeded.
    print("Dependencies check passed. Running the download script...")
    asyncio.run(main())