import requests
from urllib.parse import urljoin, quote, unquote
from pathlib import Path
import xml.etree.ElementTree as ET
from typing import List, Dict, Optional
import time
import argparse

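# Note on approach: the downloader below talks to the bucket's anonymous HTTP
# endpoint using the S3 ListObjects REST API (a plain GET on the bucket URL with
# `delimiter`, `prefix`, and `marker` query parameters) and parses the XML it
# returns. No AWS credentials or boto3 are needed, but the bucket must allow
# public listing and reads.
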
class S3BucketDownloader:
    def __init__(self, bucket_url: str, root_dir: str = '', local_dir: str = 'downloaded',
                 max_files: Optional[int] = None, skip_existing: bool = True):
        """
        Initialize S3 bucket downloader using public HTTP access.

        Args:
            bucket_url: Base URL of the S3 bucket (e.g., 'https://bucket-name.s3.amazonaws.com')
            root_dir: Root directory within the bucket to start from
            local_dir: Local directory to save files
            max_files: Maximum number of files to download (None = unlimited)
            skip_existing: Skip files that already exist locally
        """
        self.bucket_url = bucket_url.rstrip('/')
        self.root_dir = root_dir
        self.local_dir = local_dir
        self.max_files = max_files
        self.skip_existing = skip_existing
        self.files_downloaded = 0

        Path(local_dir).mkdir(parents=True, exist_ok=True)

    def create_s3_query_url(self, prefix: str = '', marker: Optional[str] = None) -> str:
        """
        Create S3 REST API query URL (mimics createS3QueryUrl from JS).
        """
        url = f"{self.bucket_url}?delimiter=/"

        if prefix:
            # Ensure prefix ends with a single trailing slash
            prefix = prefix.rstrip('/') + '/'
            url += f"&prefix={quote(prefix)}"

        if marker:
            url += f"&marker={quote(marker)}"

        return url

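    # Example of a generated listing URL (hypothetical bucket and prefix):
    #   https://bucket-name.s3.amazonaws.com?delimiter=/&prefix=data/images/001/
    # With `delimiter=/`, S3 groups keys under the prefix into CommonPrefixes,
    # which this script treats as subdirectories.
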
    def get_s3_data(self, prefix: str = '', marker: Optional[str] = None) -> Dict:
        """
        Fetch S3 bucket listing data with retry logic (mimics getS3Data from JS).
        """
        url = self.create_s3_query_url(prefix, marker)
        print(f"Fetching: {url}")

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                return self.parse_s3_xml(response.content)
            except requests.exceptions.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f" ⚠ Error (attempt {attempt + 1}/{max_retries}): {e}")
                    print(f" ⏳ Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f" ✗ Failed after {max_retries} attempts: {e}")
                    raise

    def parse_s3_xml(self, xml_content: bytes) -> Dict:
        """
        Parse S3 XML response with error handling (mimics getInfoFromS3Data from JS).
        """
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            print(f" ✗ XML Parse Error: {e}")
            print(f" 📄 Response preview: {xml_content[:500]}")
            raise

        # Define namespace
        ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}

        # Extract files
        files = []
        for content in root.findall('s3:Contents', ns):
            try:
                key_elem = content.find('s3:Key', ns)
                last_modified_elem = content.find('s3:LastModified', ns)
                size_elem = content.find('s3:Size', ns)

                if key_elem is not None and key_elem.text:
                    files.append({
                        'Key': key_elem.text,
                        'LastModified': last_modified_elem.text if last_modified_elem is not None else '',
                        'Size': int(size_elem.text) if size_elem is not None and size_elem.text else 0,
                        'Type': 'file'
                    })
            except (ValueError, AttributeError) as e:
                print(f" ⚠ Warning: Skipping malformed file entry: {e}")
                continue

        # Extract directories
        directories = []
        for prefix_elem in root.findall('s3:CommonPrefixes', ns):
            prefix = prefix_elem.find('s3:Prefix', ns)
            if prefix is not None and prefix.text:
                directories.append({
                    'Key': prefix.text,
                    'LastModified': '',
                    'Size': 0,
                    'Type': 'directory'
                })

        # Check for truncation (pagination)
        is_truncated_elem = root.find('s3:IsTruncated', ns)
        is_truncated = is_truncated_elem.text.lower() == 'true' if is_truncated_elem is not None else False

        next_marker = None
        if is_truncated:
            next_marker_elem = root.find('s3:NextMarker', ns)
            if next_marker_elem is not None:
                next_marker = next_marker_elem.text

        prefix_elem = root.find('s3:Prefix', ns)
        current_prefix = prefix_elem.text if prefix_elem is not None else ''

        return {
            'files': files,
            'directories': directories,
            'prefix': current_prefix,
            'nextMarker': next_marker
        }

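    # The listing XML parsed above has (roughly) the following shape:
    #   <ListBucketResult>
    #     <Prefix>...</Prefix>
    #     <IsTruncated>true|false</IsTruncated>
    #     <NextMarker>...</NextMarker>   (only present when the listing is truncated)
    #     <Contents><Key>...</Key><LastModified>...</LastModified><Size>...</Size></Contents>
    #     <CommonPrefixes><Prefix>...</Prefix></CommonPrefixes>
    #   </ListBucketResult>
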
    def download_file(self, key: str) -> bool:
        """
        Download a single file from S3.

        Returns:
            True if the file was downloaded, False if skipped or failed
        """
        # Check max_files limit
        if self.max_files and self.files_downloaded >= self.max_files:
            return False

        # Create local file path
        local_path = Path(self.local_dir) / key
        local_path.parent.mkdir(parents=True, exist_ok=True)

        # Skip if file already exists and skip_existing is True
        if self.skip_existing and local_path.exists():
            print(f"⊘ Skipping (exists): {key}")
            return False

        print(f"Downloading: {key}")

        # Construct download URL
        file_url = f"{self.bucket_url}/{quote(key, safe='/')}"

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(file_url, stream=True, timeout=60)
                response.raise_for_status()

                with open(local_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

                file_size = bytes_to_human_readable(local_path.stat().st_size)
                print(f" ✓ Saved: {local_path} ({file_size})")
                self.files_downloaded += 1
                return True
            except requests.exceptions.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    print(f" ⚠ Error (attempt {attempt + 1}/{max_retries}): {e}")
                    print(f" ⏳ Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f" ✗ Failed after {max_retries} attempts: {e}")
                    return False
            except Exception as e:
                print(f" ✗ Unexpected error: {e}")
                return False

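    # Objects are fetched with plain GETs against "<bucket_url>/<key>" and
    # streamed to disk in 8 KiB chunks, with the same exponential backoff as
    # the listing requests above.
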
    def download_directory(self, prefix: str = '', marker: Optional[str] = None) -> None:
        """
        Recursively download all files in a directory (mimics the recursive getS3Data).
        """
        # Check if we've reached the max_files limit
        if self.max_files and self.files_downloaded >= self.max_files:
            print(f"\n⚠ Reached maximum file limit ({self.max_files}). Stopping download.")
            return

        try:
            data = self.get_s3_data(prefix, marker)
        except Exception as e:
            print(f"⚠ Skipping directory due to error: {e}")
            return

        # Download all files in the current directory
        for file_info in data['files']:
            if not self.download_file(file_info['Key']):
                # If download_file returned False due to max_files, stop
                if self.max_files and self.files_downloaded >= self.max_files:
                    return

        # Check limit before processing subdirectories
        if self.max_files and self.files_downloaded >= self.max_files:
            return

        # Recursively download subdirectories
        for dir_info in data['directories']:
            self.download_directory(dir_info['Key'])

            # Check limit after each subdirectory
            if self.max_files and self.files_downloaded >= self.max_files:
                return

        # Handle pagination (if the response was truncated)
        if data['nextMarker']:
            print(f"📄 Fetching next page (marker: {data['nextMarker']})")
            self.download_directory(prefix, data['nextMarker'])

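    # Traversal order: files in the current "directory" first, then each
    # CommonPrefixes entry recursively, then any remaining pages of the current
    # listing via the NextMarker returned by S3.
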
    def download_all(self) -> None:
        """
        Download entire bucket contents starting from root_dir.
        """
        print(f"🚀 Starting download from: {self.bucket_url}")
        print(f"📁 Saving to: {self.local_dir}")
        if self.root_dir:
            print(f"📂 Root directory: {self.root_dir}")
        if self.max_files:
            print(f"🔢 Maximum files: {self.max_files}")
        print(f"⏭ Skip existing: {self.skip_existing}")
        print("=" * 60)

        start_time = time.time()
        self.download_directory(self.root_dir)
        elapsed = time.time() - start_time

        print("=" * 60)
        print("✅ Download complete!")
        print(f"📊 Files downloaded: {self.files_downloaded}")
        print(f"⏱ Time elapsed: {elapsed:.2f}s")

def bytes_to_human_readable(size_bytes: int) -> str:
    """
    Convert bytes to a human-readable format (mimics bytesToHumanReadable from JS).
    """
    units = ['kB', 'MB', 'GB']
    size = size_bytes
    unit_index = -1

    while size > 1024 and unit_index < len(units) - 1:
        size = size / 1024
        unit_index += 1

    if unit_index == -1:
        return f"{size_bytes} B"
    return f"{max(size, 0.1):.1f} {units[unit_index]}"

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description='Download files from public S3 buckets (YFCC100M dataset)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Download the first 10 files
  python download_s3.py --max-files 10

  # Download from a specific directory
  python download_s3.py --root-dir data/images/001/ --max-files 5

  # Download everything (WARNING: this is 100M images!)
  python download_s3.py --max-files 0

  # Resume a download (existing files are skipped by default)
  python download_s3.py --max-files 100
'''
    )
    parser.add_argument('--bucket-url',
                        default='https://multimedia-commons.s3.us-west-2.amazonaws.com',
                        help='S3 bucket URL (default: YFCC100M bucket)')
    parser.add_argument('--root-dir',
                        default='',
                        help='Root directory within the bucket to start from (default: bucket root)')
    parser.add_argument('--local-dir',
                        default='downloaded_bucket',
                        help='Local directory to save files (default: downloaded_bucket)')
    parser.add_argument('--max-files',
                        type=int,
                        default=10,
                        help='Maximum number of files to download (0 = unlimited, default: 10)')
    parser.add_argument('--no-skip-existing',
                        action='store_true',
                        help='Re-download files that already exist locally')

    args = parser.parse_args()

    # Convert max_files=0 to None (unlimited)
    max_files = None if args.max_files == 0 else args.max_files
    skip_existing = not args.no_skip_existing

    # Create the downloader and start
    downloader = S3BucketDownloader(
        bucket_url=args.bucket_url,
        root_dir=args.root_dir,
        local_dir=args.local_dir,
        max_files=max_files,
        skip_existing=skip_existing
    )

    try:
        downloader.download_all()
    except KeyboardInterrupt:
        print("\n\n⚠ Download interrupted by user")
        print(f"📊 Files downloaded before interruption: {downloader.files_downloaded}")
    except Exception as e:
        print(f"\n\n✗ Fatal error: {e}")
        print(f"📊 Files downloaded before error: {downloader.files_downloaded}")
        raise
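
# Programmatic usage sketch (values are illustrative; the prefix below is only
# an example path and may not exist in the bucket):
#
#   downloader = S3BucketDownloader(
#       bucket_url='https://multimedia-commons.s3.us-west-2.amazonaws.com',
#       root_dir='data/images/001/',
#       local_dir='downloaded_bucket',
#       max_files=5,
#   )
#   downloader.download_all()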