import argparse
import time
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, Optional
from urllib.parse import quote

import requests


class S3BucketDownloader:
    def __init__(self, bucket_url: str, root_dir: str = '', local_dir: str = 'downloaded',
                 max_files: Optional[int] = None, skip_existing: bool = True):
        """
        Initialize S3 bucket downloader using public HTTP access.

        Args:
            bucket_url: Base URL of the S3 bucket (e.g., 'https://bucket-name.s3.amazonaws.com')
            root_dir: Root directory within bucket to start from
            local_dir: Local directory to save files
            max_files: Maximum number of files to download (None = unlimited)
            skip_existing: Skip files that already exist locally
        """
        self.bucket_url = bucket_url.rstrip('/')
        self.root_dir = root_dir
        self.local_dir = local_dir
        self.max_files = max_files
        self.skip_existing = skip_existing
        self.files_downloaded = 0

        Path(local_dir).mkdir(parents=True, exist_ok=True)

    def create_s3_query_url(self, prefix: str = '', marker: Optional[str] = None) -> str:
        """
        Create S3 REST API query URL (mimics createS3QueryUrl from JS).
        """
        url = f"{self.bucket_url}?delimiter=/"

        if prefix:
            # Ensure prefix ends with /
            prefix = prefix.rstrip('/') + '/'
            url += f"&prefix={quote(prefix)}"

        if marker:
            url += f"&marker={quote(marker)}"

        return url
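
    # Example (illustrative, hypothetical bucket and prefix; not a URL used by this
    # script): create_s3_query_url(prefix='data/images/001') would return
    #   'https://bucket-name.s3.amazonaws.com?delimiter=/&prefix=data/images/001/'
    # Note that quote() leaves '/' unescaped by default, so prefixes keep their slashes.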

    def get_s3_data(self, prefix: str = '', marker: Optional[str] = None) -> Dict:
        """
        Fetch S3 bucket listing data with retry logic (mimics getS3Data from JS).
        """
        url = self.create_s3_query_url(prefix, marker)
        print(f"Fetching: {url}")

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                return self.parse_s3_xml(response.content)
            except requests.exceptions.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f" ⚠ Error (attempt {attempt + 1}/{max_retries}): {e}")
                    print(f" ⏳ Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f" ✗ Failed after {max_retries} attempts: {e}")
                    raise

    def parse_s3_xml(self, xml_content: bytes) -> Dict:
        """
        Parse S3 XML response with error handling (mimics getInfoFromS3Data from JS).
        """
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            print(f" ✗ XML Parse Error: {e}")
            print(f" 📄 Response preview: {xml_content[:500]}")
            raise

        # Define namespace
        ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}

        # Extract files
        files = []
        for content in root.findall('s3:Contents', ns):
            try:
                key_elem = content.find('s3:Key', ns)
                last_modified_elem = content.find('s3:LastModified', ns)
                size_elem = content.find('s3:Size', ns)

                if key_elem is not None and key_elem.text:
                    files.append({
                        'Key': key_elem.text,
                        'LastModified': last_modified_elem.text if last_modified_elem is not None else '',
                        'Size': int(size_elem.text) if size_elem is not None and size_elem.text else 0,
                        'Type': 'file'
                    })
            except (ValueError, AttributeError) as e:
                print(f" ⚠ Warning: Skipping malformed file entry: {e}")
                continue

        # Extract directories
        directories = []
        for prefix_elem in root.findall('s3:CommonPrefixes', ns):
            prefix = prefix_elem.find('s3:Prefix', ns)
            if prefix is not None and prefix.text:
                directories.append({
                    'Key': prefix.text,
                    'LastModified': '',
                    'Size': 0,
                    'Type': 'directory'
                })

        # Check for truncation (pagination)
        is_truncated_elem = root.find('s3:IsTruncated', ns)
        is_truncated = is_truncated_elem.text.lower() == 'true' if is_truncated_elem is not None else False

        next_marker = None
        if is_truncated:
            next_marker_elem = root.find('s3:NextMarker', ns)
            if next_marker_elem is not None:
                next_marker = next_marker_elem.text

        prefix_elem = root.find('s3:Prefix', ns)
        current_prefix = prefix_elem.text if prefix_elem is not None else ''

        return {
            'files': files,
            'directories': directories,
            'prefix': current_prefix,
            'nextMarker': next_marker
        }
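
    # For reference, the ListBucket (list-objects v1) response parsed above looks
    # roughly like the sketch below; element names come from the S3 API, while the
    # key and prefix values are purely illustrative:
    #
    #   <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    #     <Prefix>data/</Prefix>
    #     <IsTruncated>false</IsTruncated>
    #     <Contents>
    #       <Key>data/example.jpg</Key>
    #       <LastModified>2016-01-01T00:00:00.000Z</LastModified>
    #       <Size>12345</Size>
    #     </Contents>
    #     <CommonPrefixes>
    #       <Prefix>data/images/</Prefix>
    #     </CommonPrefixes>
    #   </ListBucketResult>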

    def download_file(self, key: str) -> bool:
        """
        Download a single file from S3.

        Returns:
            True if file was downloaded, False if skipped or failed
        """
        # Check max_files limit
        if self.max_files and self.files_downloaded >= self.max_files:
            return False

        # Create local file path
        local_path = Path(self.local_dir) / key
        local_path.parent.mkdir(parents=True, exist_ok=True)

        # Skip if file already exists and skip_existing is True
        if self.skip_existing and local_path.exists():
            print(f"⊘ Skipping (exists): {key}")
            return False

        print(f"Downloading: {key}")

        # Construct download URL
        file_url = f"{self.bucket_url}/{quote(key, safe='/')}"

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(file_url, stream=True, timeout=60)
                response.raise_for_status()

                with open(local_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

                file_size = bytes_to_human_readable(local_path.stat().st_size)
                print(f" ✓ Saved: {local_path} ({file_size})")
                self.files_downloaded += 1
                return True
            except requests.exceptions.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    print(f" ⚠ Error (attempt {attempt + 1}/{max_retries}): {e}")
                    print(f" ⏳ Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f" ✗ Failed after {max_retries} attempts: {e}")
                    return False
            except Exception as e:
                print(f" ✗ Unexpected error: {e}")
                return False

    def download_directory(self, prefix: str = '', marker: Optional[str] = None) -> None:
        """
        Recursively download all files in a directory (mimics the recursive getS3Data).
        """
        # Check if we've reached max_files limit
        if self.max_files and self.files_downloaded >= self.max_files:
            print(f"\n⚠ Reached maximum file limit ({self.max_files}). Stopping download.")
            return

        try:
            data = self.get_s3_data(prefix, marker)
        except Exception as e:
            print(f"⚠ Skipping directory due to error: {e}")
            return

        # Download all files in current directory
        for file_info in data['files']:
            if not self.download_file(file_info['Key']):
                # If download_file returned False due to max_files, stop
                if self.max_files and self.files_downloaded >= self.max_files:
                    return

        # Check limit before processing subdirectories
        if self.max_files and self.files_downloaded >= self.max_files:
            return

        # Recursively download subdirectories
        for dir_info in data['directories']:
            self.download_directory(dir_info['Key'])

            # Check limit after each subdirectory
            if self.max_files and self.files_downloaded >= self.max_files:
                return

        # Handle pagination (if response was truncated)
        if data['nextMarker']:
            print(f"📄 Fetching next page (marker: {data['nextMarker']})")
            self.download_directory(prefix, data['nextMarker'])
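
    # Traversal sketch (keys are illustrative): given a listing with files
    # data/a.jpg and data/b.jpg plus a CommonPrefix data/images/, the files are
    # downloaded first, then the method recurses into data/images/, and finally
    # any truncated page is fetched again via NextMarker.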

    def download_all(self) -> None:
        """
        Download entire bucket contents starting from root_dir.
        """
        print(f"🚀 Starting download from: {self.bucket_url}")
        print(f"📁 Saving to: {self.local_dir}")
        if self.root_dir:
            print(f"📂 Root directory: {self.root_dir}")
        if self.max_files:
            print(f"🔢 Maximum files: {self.max_files}")
        print(f"⏭ Skip existing: {self.skip_existing}")
        print("=" * 60)

        start_time = time.time()
        self.download_directory(self.root_dir)
        elapsed = time.time() - start_time

        print("=" * 60)
        print("✅ Download complete!")
        print(f"📊 Files downloaded: {self.files_downloaded}")
        print(f"⏱ Time elapsed: {elapsed:.2f}s")


def bytes_to_human_readable(size_bytes: int) -> str:
    """
    Convert bytes to human readable format (mimics bytesToHumanReadable from JS).
    """
    units = ['kB', 'MB', 'GB']
    size = size_bytes
    unit_index = -1

    while size > 1024 and unit_index < len(units) - 1:
        size = size / 1024
        unit_index += 1

    if unit_index == -1:
        return f"{size_bytes} B"
    return f"{max(size, 0.1):.1f} {units[unit_index]}"


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description='Download files from public S3 buckets (YFCC100M dataset)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Download the first 10 files
  python download_s3.py --max-files 10

  # Download from a specific directory
  python download_s3.py --root-dir data/images/001/ --max-files 5

  # Download all (WARNING: This is 100M images!)
  python download_s3.py --max-files 0

  # Resume a download (existing files are skipped by default)
  python download_s3.py --max-files 100
'''
    )

    parser.add_argument('--bucket-url',
                        default='https://multimedia-commons.s3.us-west-2.amazonaws.com',
                        help='S3 bucket URL (default: YFCC100M bucket)')
    parser.add_argument('--root-dir',
                        default='',
                        help='Root directory within bucket to start from (default: root)')
    parser.add_argument('--local-dir',
                        default='downloaded_bucket',
                        help='Local directory to save files (default: downloaded_bucket)')
    parser.add_argument('--max-files',
                        type=int,
                        default=10,
                        help='Maximum number of files to download (0=unlimited, default: 10)')
    parser.add_argument('--no-skip-existing',
                        action='store_true',
                        help='Re-download files that already exist locally')

    args = parser.parse_args()

    # Convert max_files=0 to None (unlimited)
    max_files = None if args.max_files == 0 else args.max_files
    skip_existing = not args.no_skip_existing

    # Create downloader and start
    downloader = S3BucketDownloader(
        bucket_url=args.bucket_url,
        root_dir=args.root_dir,
        local_dir=args.local_dir,
        max_files=max_files,
        skip_existing=skip_existing
    )

    try:
        downloader.download_all()
    except KeyboardInterrupt:
        print("\n\n⚠ Download interrupted by user")
        print(f"📊 Files downloaded before interruption: {downloader.files_downloaded}")
    except Exception as e:
        print(f"\n\n✗ Fatal error: {e}")
        print(f"📊 Files downloaded before error: {downloader.files_downloaded}")
        raise