@zvodd
Last active August 11, 2025 07:47
Pretty good "Index Of" Downloader

Build:

docker build -t indexof_downloader:latest .

Run:

docker run --rm -v "$PWD":/out indexof_downloader:latest "https://example.com/path"
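
For a quick local test (a sketch, not part of the gist's documented usage): serve any directory with the helper server further down, then point the container at it. SimpleHTTPRequestHandler's generated directory listing is plain <a href> links, so the crawler should treat it much like an Apache "Index of" page. On Linux, --network=host keeps localhost reachable from inside the container; Docker Desktop users would swap in host.docker.internal instead.

docker run --rm --network=host -v "$PWD":/out indexof_downloader:latest "http://localhost:8000/"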

Dockerfile:

FROM python:3.12-alpine
ENV PYTHONUNBUFFERED=1
RUN apk add --no-cache bash wget aria2
RUN pip install --no-cache-dir beautifulsoup4
COPY . /usr/src/indexof_downloader
WORKDIR /out
ENTRYPOINT ["python", "-u", "/usr/src/indexof_downloader/indexof_downloader.py"]

Helper script (a verbose HTTP server that prints incoming requests while serving the current directory):

#!/usr/bin/env python3
from http.server import HTTPServer, SimpleHTTPRequestHandler


class VerboseHandler(SimpleHTTPRequestHandler):
    def do_GET(self):
        print("\n=== Incoming Request Headers ===")
        for header, value in self.headers.items():
            print(f"{header}: {value}")
        print("================================\n")
        super().do_GET()  # serve the file/directory as normal

    def do_POST(self):
        print("\n=== Incoming Request Headers ===")
        for header, value in self.headers.items():
            print(f"{header}: {value}")
        print("================================\n")
        length = int(self.headers.get('Content-Length', 0))
        body = self.rfile.read(length) if length else b''
        print("=== Body ===")
        print(body.decode(errors="replace"))
        print("============\n")
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"OK\n")


if __name__ == "__main__":
    port = 8000
    httpd = HTTPServer(("", port), VerboseHandler)
    print(f"Serving on port {port} (Ctrl+C to stop)")
    httpd.serve_forever()
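
To exercise the helper (illustrative commands; the X-Debug header is just an arbitrary example), run the script and hit it with curl:

curl -H "X-Debug: 1" http://localhost:8000/
curl -X POST -d "hello=world" http://localhost:8000/

The GET prints the request headers and then serves the current directory like python -m http.server; the POST prints the headers and body and replies with "OK".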

indexof_downloader.py:

#!/usr/bin/env python3
import os
import sys
import shutil
import subprocess
from pathlib import Path
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from urllib.parse import urljoin, urlparse, unquote, quote
from bs4 import BeautifulSoup
from typing import List, Tuple
from time import sleep


class SimpleFetcher:
    def __init__(self, headers=None):
        self.headers = headers or {
            "User-Agent": "Wget/1.24.5",
            "Accept": "*/*",
            "Accept-Encoding": "identity",
        }

    def fetch_url(self, url):
        """Fetch a URL with the default headers and return raw bytes."""
        req = Request(url, headers=self.headers)
        try:
            with urlopen(req) as resp:
                return resp.read()
        except HTTPError as e:
            print(f"[HTTP ERROR] {e.code} - {e.reason} for {url}")
        except URLError as e:
            print(f"[URL ERROR] {e.reason} for {url}")
        return None


class DirectoryDownloader:
    def __init__(self, use_aria2: bool = True):
        self.download_queue = []
        self.use_aria2 = use_aria2 and self._check_aria2()
        self.temp_suffix = '.tmp'
        self.fetcher = SimpleFetcher()

    def _check_aria2(self) -> bool:
        """Check if aria2c is available on the system"""
        return shutil.which('aria2c') is not None

    def clean_file_name(self, filename) -> str:
        # Characters that are problematic on various filesystems
        invalid_chars = '<>:"/\\|?*'
        # Replace invalid characters with underscores
        for char in invalid_chars:
            filename = filename.replace(char, '_')
        return filename
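
    # Illustrative examples (not executed): clean_file_name sanitizes one path
    # component at a time, e.g. 'report?.pdf' -> 'report_.pdf'. Note that '/' is
    # also replaced, so callers split paths into components before sanitizing.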

    def get_links(self, url: str, file_location: str, server_root_parts=None) -> None:
        """Recursively crawl directory listings and build download queue.

        This version:
          - On the first call captures (scheme, netloc, base_path) so crawling is
            restricted to the starting subtree (prevents climbing to '/').
          - Tracks visited URLs to avoid revisits.
          - Normalizes hrefs for local filesystem usage.
        """
        try:
            # Initialize visited set on first use (keeps this change self-contained)
            if not hasattr(self, "_visited_urls"):
                self._visited_urls = set()

            parsed = urlparse(url)
            # Normalize the URL (remove query/fragment) for visited-check
            norm_url = parsed._replace(query="", fragment="").geturl()
            if norm_url in self._visited_urls:
                return
            self._visited_urls.add(norm_url)

            print(f"Crawling: {url}")

            # On initial invocation capture the server root + base path (directory subtree)
            if server_root_parts is None:
                # Determine a base path that represents the starting directory.
                # If the provided URL is a directory it should already end with '/',
                # otherwise derive the directory portion.
                path = parsed.path or "/"
                if not path.endswith("/"):
                    if "/" in path:
                        path = path[: path.rfind("/") + 1]
                    else:
                        path = "/"
                # Ensure base_path always ends with a slash for startswith checks
                base_path = path if path.endswith("/") else path + "/"
                server_root_parts = (parsed.scheme, parsed.netloc, base_path)

            html = self.fetcher.fetch_url(url)
            if not html:
                return

            soup = BeautifulSoup(html, "html.parser")

            # Ensure directory exists
            os.makedirs(file_location, exist_ok=True)

            for link in soup.find_all("a"):
                link_rel = link.get("href")
                if link_rel:
                    # Percent-encode spaces and other unsafe chars in the path part
                    # This keeps already-encoded %xx sequences intact
                    link_rel = quote(link_rel, safe="/%#?=&;:@")
                else:
                    continue

                # Skip parent directory references (explicit)
                if link_rel.startswith("../"):
                    continue
                # Skip fragments, javascript:, mailto:, tel:, and other non-http(s) schemes
                if link_rel.startswith("#"):
                    continue
                parsed_rel = urlparse(link_rel)
                if parsed_rel.scheme and parsed_rel.scheme not in ("http", "https"):
                    # e.g. javascript:, mailto:, ftp: -> skip
                    continue

                # Build absolute URL from the href relative to the current page
                full_link = urljoin(url, link_rel)
                parsed_link = urlparse(full_link)

                # Only follow links on the same host
                if (parsed_link.scheme, parsed_link.netloc) != (server_root_parts[0], server_root_parts[1]):
                    continue

                # Restrict crawling to the starting subtree by checking the path prefix
                link_path = parsed_link.path or "/"
                base_path = server_root_parts[2]
                if not base_path.endswith("/"):
                    base_path = base_path + "/"
                # If the link's path does not start with the base_path, skip it.
                if not link_path.startswith(base_path):
                    continue

                # Normalize the href text for use on local filesystem:
                #   - strip any leading slash so os.path.join doesn't treat it as absolute
                #   - preserve inner path segments so directory structure is recreated
                decoded_name = unquote(link_rel).lstrip("/")
                if decoded_name == "":
                    # defensive: skip empty names
                    continue

                # Build local location path for directories (sanitize path components)
                comps = [self.clean_file_name(p) for p in decoded_name.split("/") if p]
                location_name = os.path.join(file_location, *comps)

                if full_link.endswith("/"):
                    # Go slow
                    sleep(0.8)
                    # Directory — recurse into it
                    self.get_links(full_link, location_name, server_root_parts)
                else:
                    # File — queue it for download
                    # Use the final path component as the filename
                    filename_raw = comps[-1] if comps else os.path.basename(decoded_name)
                    filename_sanitized = self.clean_file_name(filename_raw)
                    self.download_queue.append((full_link, file_location, filename_sanitized))
        except Exception as e:
            print(f"Error crawling {url}: {e}")

    def _file_exists_and_complete(self, filepath: str, url: str) -> bool:
        """Check if file exists and appears to be complete"""
        if not os.path.exists(filepath):
            return False
        # Basic check - file has some size
        if os.path.getsize(filepath) == 0:
            return False
        # NOT TODO: Could add more sophisticated checks like:
        #   - HTTP HEAD request to compare file sizes
        #   - Checksum verification if available
        return True
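
    # Sketch of the HEAD-based size check mentioned above (illustrative only,
    # not wired in anywhere; assumes the server reports Content-Length):
    #
    #     def _remote_size(self, url: str):
    #         req = Request(url, headers=self.fetcher.headers, method="HEAD")
    #         try:
    #             with urlopen(req) as resp:
    #                 length = resp.headers.get("Content-Length")
    #                 return int(length) if length else None
    #         except (HTTPError, URLError, ValueError):
    #             return None
    #
    # A caller could then compare os.path.getsize(filepath) with the result.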

    def _download_with_aria2(self, url: str, directory: str, filename: str) -> bool:
        """Download file using aria2c"""
        temp_file = os.path.join(directory, filename + self.temp_suffix)
        final_file = os.path.join(directory, filename)

        # Remove any existing temp file
        if os.path.exists(temp_file):
            os.remove(temp_file)

        cmd = [
            'aria2c',
            '--continue=true',                # Resume downloads
            '--max-tries=5',                  # Retry failed downloads
            '--retry-wait=3',                 # Wait between retries
            '--timeout=30',                   # Connection timeout
            '--max-connection-per-server=4',  # Multiple connections
            '--split=4',                      # Split download into segments
            '--dir', directory,               # Download directory
            '--out', filename + self.temp_suffix,  # Output filename
            url
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            # Rename temp file to final name on success
            os.rename(temp_file, final_file)
            return True
        except subprocess.CalledProcessError as e:
            print(f"aria2c failed for {filename}: {e.stderr}")
            # Clean up temp file on failure
            # if os.path.exists(temp_file):
            #     os.remove(temp_file)
            return False

    def _download_with_wget(self, url: str, directory: str, filename: str) -> bool:
        """Download file using wget as fallback"""
        temp_file = os.path.join(directory, filename + self.temp_suffix)
        final_file = os.path.join(directory, filename)

        # Remove any existing temp file
        if os.path.exists(temp_file):
            os.remove(temp_file)

        cmd = [
            'wget',
            '--continue',    # Resume downloads
            '--tries=5',     # Retry attempts
            '--timeout=30',  # Timeout
            '--directory-prefix', directory,
            '--output-document', temp_file,
            url
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            # Rename temp file to final name on success
            os.rename(temp_file, final_file)
            return True
        except subprocess.CalledProcessError as e:
            print(f"wget failed for {filename}: {e.stderr}")
            # Clean up temp file on failure
            # if os.path.exists(temp_file):
            #     os.remove(temp_file)
            return False

    def download_file(self, url: str, directory: str, filename: str) -> bool:
        """Download a single file with resume capability"""
        final_file = os.path.join(directory, filename)

        # Skip if file already exists and appears complete
        if self._file_exists_and_complete(final_file, url):
            print(f"Skipping {filename} (already exists)")
            return True

        print(f"Downloading {filename}...")

        # Ensure directory exists
        os.makedirs(directory, exist_ok=True)

        # Try aria2 first, fallback to wget
        if self.use_aria2:
            success = self._download_with_aria2(url, directory, filename)
        else:
            success = self._download_with_wget(url, directory, filename)

        if success:
            print(f"✓ Downloaded {filename}")
        else:
            print(f"✗ Failed to download {filename}")
        return success

    def start_download(self) -> None:
        """Process the download queue"""
        if not self.download_queue:
            print("No files to download")
            return

        print(f"\nFound {len(self.download_queue)} files to download")
        print(f"Using {'aria2c' if self.use_aria2 else 'wget'} for downloads")

        successful = 0
        failed = 0
        for i, (url, directory, filename) in enumerate(self.download_queue, 1):
            print(f"\n[{i}/{len(self.download_queue)}]", end=" ")
            if self.download_file(url, directory, filename):
                successful += 1
            else:
                failed += 1

        print("\n\nDownload Summary:")
        print(f"Successful: {successful}")
        print(f"Failed: {failed}")
        print(f"Total: {len(self.download_queue)}")

    def clean_temp_files(self, base_directory: str) -> None:
        """Clean up any leftover temporary files"""
        temp_files = []
        for root, dirs, files in os.walk(base_directory):
            for file in files:
                if file.endswith(self.temp_suffix):
                    temp_files.append(os.path.join(root, file))

        if temp_files:
            print(f"\nCleaning up {len(temp_files)} temporary files...")
            for temp_file in temp_files:
                try:
                    os.remove(temp_file)
                    print(f"Removed: {temp_file}")
                except OSError as e:
                    print(f"Could not remove {temp_file}: {e}")


def main():
    print("Enhanced Directory Downloader")
    print("============================")

    # Check for aria2
    downloader = DirectoryDownloader()
    if downloader.use_aria2:
        print("✓ aria2c found - will use for faster downloads")
    else:
        print("! aria2c not found - falling back to wget")
        if not shutil.which('wget'):
            print("✗ wget also not found! Please install aria2c or wget")
            sys.exit(1)

    # Require the starting URL as the single command-line argument
    if len(sys.argv) < 2:
        print("Usage: indexof_downloader.py <url>")
        sys.exit(1)

    url = sys.argv[1]
    location = os.path.abspath(".")

    try:
        # Clean up any existing temp files first
        if os.path.exists(location):
            downloader.clean_temp_files(location)

        # Crawl the directory structure
        print(f"\nCrawling directory structure from: {url}")
        downloader.get_links(url, location)

        if not downloader.download_queue:
            print("No files found to download")
            return

        # Show what will be downloaded
        print("\nFiles to download:")
        for i, (url, directory, filename) in enumerate(downloader.download_queue[:10], 1):
            print(f"  {i}. {filename}")
        # if len(downloader.download_queue) > 10:
        #     print(f"  ... and {len(downloader.download_queue) - 10} more files")

        # Start downloads
        downloader.start_download()

    except KeyboardInterrupt:
        print("\n\nDownload interrupted by user")
        print("Temporary files will be cleaned up on next run")
    except Exception as e:
        print(f"\nError: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
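
The script can also be run outside Docker (a sketch based on the Dockerfile's dependencies: Python 3 with beautifulsoup4, plus aria2c or wget on PATH). It downloads into the current working directory:

pip install beautifulsoup4
python3 indexof_downloader.py "https://example.com/path/"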