#!/usr/bin/env python3
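"""Recursively crawl an HTTP directory listing and download the files it links to.

aria2c is used for downloads when it is available on the system; otherwise the
script falls back to wget. Files in progress are written with a '.tmp' suffix
and renamed once the download completes. Run the script with the root
directory-listing URL as its only argument; files are saved beneath the
current working directory, mirroring the remote directory structure.
"""
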
import os
import sys
import shutil
import subprocess
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from urllib.parse import urljoin, urlparse, unquote, quote
from bs4 import BeautifulSoup
from typing import List, Tuple
from time import sleep


class SimpleFetcher:
    def __init__(self, headers=None):
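        # The defaults below imitate wget's User-Agent and ask for an
        # uncompressed ("identity") response, presumably so directory servers
        # return plain listing HTML that fetch_url() can hand straight to the
        # parser. Callers may supply their own headers dict instead.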
        self.headers = headers or {
            "User-Agent": "Wget/1.24.5",
            "Accept": "*/*",
            "Accept-Encoding": "identity",
        }

    def fetch_url(self, url):
        """Fetch a URL with the default headers and return raw bytes, or None on error."""
        req = Request(url, headers=self.headers)
        try:
            with urlopen(req) as resp:
                return resp.read()
        except HTTPError as e:
            print(f"[HTTP ERROR] {e.code} - {e.reason} for {url}")
        except URLError as e:
            print(f"[URL ERROR] {e.reason} for {url}")
        return None


class DirectoryDownloader:
    def __init__(self, use_aria2: bool = True):
        # Each queue entry is a (url, directory, filename) tuple appended by get_links()
        self.download_queue: List[Tuple[str, str, str]] = []
        self.use_aria2 = use_aria2 and self._check_aria2()
        self.temp_suffix = '.tmp'
        self.fetcher = SimpleFetcher()

    def _check_aria2(self) -> bool:
        """Check if aria2c is available on the system"""
        return shutil.which('aria2c') is not None

    def clean_file_name(self, filename: str) -> str:
        # Characters that are problematic on various filesystems
        invalid_chars = '<>:"/\\|?*'
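        # Illustrative example (hypothetical name):
        #   clean_file_name('release: v1?.txt') -> 'release_ v1_.txt'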

        # Replace invalid characters with underscores
        for char in invalid_chars:
            filename = filename.replace(char, '_')

        return filename

    def get_links(self, url: str, file_location: str, server_root_parts=None) -> None:
        """Recursively crawl directory listings and build the download queue.

        This version:
        - On the first call, captures (scheme, netloc, base_path) so crawling
          is restricted to the starting subtree (prevents climbing to '/').
        - Tracks visited URLs to avoid revisiting pages.
        - Normalizes hrefs for local filesystem usage.
        """
        try:
            # Initialize the visited set lazily on first use
            if not hasattr(self, "_visited_urls"):
                self._visited_urls = set()

            parsed = urlparse(url)

            # Normalize the URL (remove query/fragment) for the visited-check
            norm_url = parsed._replace(query="", fragment="").geturl()
            if norm_url in self._visited_urls:
                return
            self._visited_urls.add(norm_url)

            print(f"Crawling: {url}")

            # On the initial invocation capture the server root + base path (directory subtree)
            if server_root_parts is None:
                # Determine a base path that represents the starting directory.
                # If the provided URL is a directory it should already end with '/',
                # otherwise derive the directory portion.
                path = parsed.path or "/"
                if not path.endswith("/"):
                    if "/" in path:
                        path = path[: path.rfind("/") + 1]
                    else:
                        path = "/"
                # Ensure base_path always ends with a slash for startswith checks
                base_path = path if path.endswith("/") else path + "/"
                server_root_parts = (parsed.scheme, parsed.netloc, base_path)

            html = self.fetcher.fetch_url(url)
            if not html:
                return

            soup = BeautifulSoup(html, "html.parser")

            # Ensure the local directory exists
            os.makedirs(file_location, exist_ok=True)

            for link in soup.find_all("a"):
                link_rel = link.get("href")
                if link_rel:
                    # Percent-encode spaces and other unsafe chars in the path part.
                    # This keeps already-encoded %xx sequences intact.
                    link_rel = quote(link_rel, safe="/%#?=&;:@")
                else:
                    continue

                # Skip parent directory references (explicit)
                if link_rel.startswith("../"):
                    continue

                # Skip fragments, javascript:, mailto:, tel:, and other non-http(s) schemes
                if link_rel.startswith("#"):
                    continue
                parsed_rel = urlparse(link_rel)
                if parsed_rel.scheme and parsed_rel.scheme not in ("http", "https"):
                    # e.g. javascript:, mailto:, ftp: -> skip
                    continue

                # Build an absolute URL from the href relative to the current page
                full_link = urljoin(url, link_rel)
                parsed_link = urlparse(full_link)

                # Only follow links on the same host
                if (parsed_link.scheme, parsed_link.netloc) != (server_root_parts[0], server_root_parts[1]):
                    continue

                # Restrict crawling to the starting subtree by checking the path prefix
                link_path = parsed_link.path or "/"
                base_path = server_root_parts[2]
                if not base_path.endswith("/"):
                    base_path = base_path + "/"
                # If the link's path does not start with the base_path, skip it.
                if not link_path.startswith(base_path):
                    continue

                # Normalize the href text for use on the local filesystem:
                # - strip any leading slash so os.path.join doesn't treat it as absolute
                # - preserve inner path segments so the directory structure is recreated
                decoded_name = unquote(link_rel).lstrip("/")
                if decoded_name == "":
                    # defensive: skip empty names
                    continue

                # Build the local path for directories (sanitize each path component)
                comps = [self.clean_file_name(p) for p in decoded_name.split("/") if p]
                location_name = os.path.join(file_location, *comps)

                if full_link.endswith("/"):
                    # Throttle directory requests a little
                    sleep(0.8)
                    # Directory: recurse into it
                    self.get_links(full_link, location_name, server_root_parts)
                else:
                    # File: queue it for download.
                    # Use the final path component as the filename.
                    filename_raw = comps[-1] if comps else os.path.basename(decoded_name)
                    filename_sanitized = self.clean_file_name(filename_raw)
                    self.download_queue.append((full_link, file_location, filename_sanitized))

        except Exception as e:
            print(f"Error crawling {url}: {e}")

    def _file_exists_and_complete(self, filepath: str, url: str) -> bool:
        """Check if the file exists and appears to be complete"""
        if not os.path.exists(filepath):
            return False

        # Basic check - the file has some size
        if os.path.getsize(filepath) == 0:
            return False

        # Deliberately left simple; more sophisticated checks could include:
        # - an HTTP HEAD request to compare file sizes
        # - checksum verification if available
        return True

    def _download_with_aria2(self, url: str, directory: str, filename: str) -> bool:
        """Download a file using aria2c"""
        temp_file = os.path.join(directory, filename + self.temp_suffix)
        final_file = os.path.join(directory, filename)

        # Remove any stale temp file from a previous attempt
        if os.path.exists(temp_file):
            os.remove(temp_file)

        cmd = [
            'aria2c',
            '--continue=true',                     # Resume downloads
            '--max-tries=5',                       # Retry failed downloads
            '--retry-wait=3',                      # Wait between retries
            '--timeout=30',                        # Connection timeout
            '--max-connection-per-server=4',       # Multiple connections
            '--split=4',                           # Split download into segments
            '--dir', directory,                    # Download directory
            '--out', filename + self.temp_suffix,  # Output filename (relative to --dir)
            url
        ]

        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
            # Rename the temp file to its final name on success
            os.rename(temp_file, final_file)
            return True
        except subprocess.CalledProcessError as e:
            print(f"aria2c failed for {filename}: {e.stderr}")
            # The partial temp file is left in place; clean_temp_files()
            # removes it on the next run.
            # if os.path.exists(temp_file):
            #     os.remove(temp_file)
            return False

    def _download_with_wget(self, url: str, directory: str, filename: str) -> bool:
        """Download a file using wget as a fallback"""
        temp_file = os.path.join(directory, filename + self.temp_suffix)
        final_file = os.path.join(directory, filename)

        # Remove any stale temp file from a previous attempt
        if os.path.exists(temp_file):
            os.remove(temp_file)

        cmd = [
            'wget',
            '--continue',       # Resume downloads
            '--tries=5',        # Retry attempts
            '--timeout=30',     # Timeout
            '--directory-prefix', directory,
            '--output-document', temp_file,
            url
        ]

        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
            # Rename the temp file to its final name on success
            os.rename(temp_file, final_file)
            return True
        except subprocess.CalledProcessError as e:
            print(f"wget failed for {filename}: {e.stderr}")
            # The partial temp file is left in place; clean_temp_files()
            # removes it on the next run.
            # if os.path.exists(temp_file):
            #     os.remove(temp_file)
            return False

    def download_file(self, url: str, directory: str, filename: str) -> bool:
        """Download a single file with resume capability"""
        final_file = os.path.join(directory, filename)

        # Skip if the file already exists and appears complete
        if self._file_exists_and_complete(final_file, url):
            print(f"Skipping {filename} (already exists)")
            return True

        print(f"Downloading {filename}...")

        # Ensure the target directory exists
        os.makedirs(directory, exist_ok=True)

        # Use aria2c if available, otherwise wget
        if self.use_aria2:
            success = self._download_with_aria2(url, directory, filename)
        else:
            success = self._download_with_wget(url, directory, filename)

        if success:
            print(f"✓ Downloaded {filename}")
        else:
            print(f"✗ Failed to download {filename}")

        return success

    def start_download(self) -> None:
        """Process the download queue"""
        if not self.download_queue:
            print("No files to download")
            return

        print(f"\nFound {len(self.download_queue)} files to download")
        print(f"Using {'aria2c' if self.use_aria2 else 'wget'} for downloads")

        successful = 0
        failed = 0

        for i, (url, directory, filename) in enumerate(self.download_queue, 1):
            print(f"\n[{i}/{len(self.download_queue)}]", end=" ")

            if self.download_file(url, directory, filename):
                successful += 1
            else:
                failed += 1

        print("\n\nDownload Summary:")
        print(f"Successful: {successful}")
        print(f"Failed: {failed}")
        print(f"Total: {len(self.download_queue)}")

    def clean_temp_files(self, base_directory: str) -> None:
        """Clean up any leftover temporary files"""
        temp_files = []
        for root, dirs, files in os.walk(base_directory):
            for file in files:
                if file.endswith(self.temp_suffix):
                    temp_files.append(os.path.join(root, file))

        if temp_files:
            print(f"\nCleaning up {len(temp_files)} temporary files...")
            for temp_file in temp_files:
                try:
                    os.remove(temp_file)
                    print(f"Removed: {temp_file}")
                except OSError as e:
                    print(f"Could not remove {temp_file}: {e}")


def main():
    print("Enhanced Directory Downloader")
    print("============================")

    # Check for aria2
    downloader = DirectoryDownloader()
    if downloader.use_aria2:
        print("✓ aria2c found - will use for faster downloads")
    else:
        print("! aria2c not found - falling back to wget")
        if not shutil.which('wget'):
            print("✗ wget also not found! Please install aria2c or wget")
            sys.exit(1)

    # Require the directory-listing URL as the only argument
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <directory-url>")
        sys.exit(1)

    url = sys.argv[1]
    location = os.path.abspath(".")

    try:
        # Clean up any existing temp files first
        if os.path.exists(location):
            downloader.clean_temp_files(location)

        # Crawl the directory structure
        print(f"\nCrawling directory structure from: {url}")
        downloader.get_links(url, location)

        if not downloader.download_queue:
            print("No files found to download")
            return

        # Show a preview of what will be downloaded (first 10 entries)
        print("\nFiles to download:")
        for i, (_, _, filename) in enumerate(downloader.download_queue[:10], 1):
            print(f"  {i}. {filename}")
        # if len(downloader.download_queue) > 10:
        #     print(f"  ... and {len(downloader.download_queue) - 10} more files")

        # Start downloads
        downloader.start_download()

    except KeyboardInterrupt:
        print("\n\nDownload interrupted by user")
        print("Temporary files will be cleaned up on next run")
    except Exception as e:
        print(f"\nError: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()