Last active
June 23, 2025 09:16
-
-
Save hellojahid3/168d8bad8deb9675cd97f9cda5d38940 to your computer and use it in GitHub Desktop.
This Python application provides a feature-rich graphical user interface for downloading files from S3-compatible storage services (AWS S3, DigitalOcean Spaces, etc.). It uses boto3 for S3 operations and Tkinter for the GUI.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import boto3 | |
| import os | |
| import logging | |
| import sys | |
| import threading | |
| import tkinter as tk | |
| from tkinter import ttk, filedialog, messagebox | |
| from typing import Optional | |
| from botocore.exceptions import ClientError, NoCredentialsError | |
| from pathlib import Path | |
| import configparser | |
# Setup logging: every record goes both to the console and to a log file in
# the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        # Object keys are logged verbatim and may contain non-ASCII
        # characters, so do not depend on the platform default encoding.
        logging.FileHandler("s3_downloader.log", encoding="utf-8"),
    ],
)
# Module-wide logger used by the downloader and the GUI.
logger = logging.getLogger("s3_downloader")
class S3Downloader:
    """Download objects from an S3-compatible bucket into a local directory.

    ``config`` is a mapping with the keys ``endpoint_url``, ``access_key``,
    ``secret_key``, ``space_name`` (the bucket name) and ``local_directory``.
    An optional ``region`` key defaults to ``"us-east-1"``.

    Attributes:
        total_files: expected number of files; reported to the progress
            callback and set by the caller.
        downloaded_files: files handled so far (downloaded or skipped
            because they already exist locally).
        stop_download: set to True to make ``download_files`` stop early.
    """

    def __init__(self, config: dict):
        """Store the configuration and create the S3 client."""
        self.config = config
        self.s3_client = self._initialize_s3_client()
        self.total_files = 0
        self.downloaded_files = 0
        self.stop_download = False

    def _initialize_s3_client(self):
        """Create and return a boto3 S3 client for the configured endpoint.

        Any exception raised while building the client is logged and
        re-raised unchanged.
        """
        try:
            endpoint_url = self.config["endpoint_url"].strip()
            # A bare host name is accepted; HTTPS is assumed in that case.
            if not endpoint_url.startswith(("http://", "https://")):
                endpoint_url = "https://" + endpoint_url
            return boto3.client(
                "s3",
                endpoint_url=endpoint_url,
                aws_access_key_id=self.config["access_key"].strip(),
                aws_secret_access_key=self.config["secret_key"].strip(),
                region_name=self.config.get("region", "us-east-1"),
            )
        except Exception as e:
            logger.error(f"Failed to initialize S3 client: {e}")
            raise

    @staticmethod
    def _normalize_prefix(space_folder: str) -> str:
        """Return *space_folder* without surrounding whitespace or leading slashes."""
        return space_folder.strip().lstrip("/")

    @staticmethod
    def _safe_local_path(base_dir: str, key: str) -> Optional[str]:
        """Map an object key to a path inside *base_dir*.

        Returns the resolved local path, or None when the key would resolve
        to *base_dir* itself or to a location outside of it (for example a
        key containing ``..`` components).
        """
        base = os.path.realpath(base_dir)
        # Leading slashes would make os.path.join discard the base directory.
        candidate = os.path.realpath(os.path.join(base, key.lstrip("/")))
        try:
            inside = os.path.commonpath([base, candidate]) == base
        except ValueError:
            # Raised e.g. when the two paths are on different Windows drives.
            inside = False
        if not inside or candidate == base:
            return None
        return candidate

    def _iter_pages(self, bucket_name: str, prefix: str):
        """Return a ``list_objects_v2`` page iterator for the bucket and prefix."""
        paginator = self.s3_client.get_paginator("list_objects_v2")
        params = {"Bucket": bucket_name}
        # Only send a Prefix when a folder was actually requested.
        if prefix:
            params["Prefix"] = prefix
        return paginator.paginate(**params)

    def test_connection(self) -> tuple[bool, str]:
        """Check that the configured bucket exists and is accessible.

        Returns:
            ``(True, "Connection successful")`` on success, otherwise
            ``(False, message)`` describing the failure. No exception is
            propagated to the caller.
        """
        try:
            bucket_name = self.config["space_name"].strip()
            try:
                self.s3_client.head_bucket(Bucket=bucket_name)
                logger.info(f"Bucket '{bucket_name}' exists and is accessible")
                return True, "Connection successful"
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code == "404":
                    return False, f"Bucket '{bucket_name}' does not exist"
                if error_code == "403":
                    return False, f"Access denied to bucket '{bucket_name}'"
                return False, f"Error accessing bucket: {e}"
        except NoCredentialsError:
            return False, "Invalid credentials"
        except Exception as e:
            return False, f"Unexpected error: {e}"

    def count_files(self, space_folder: str = "") -> int:
        """Count the objects under *space_folder*, ignoring keys ending in ``/``.

        Returns:
            The number of files found, or 0 when listing fails (the error
            is logged instead of raised).
        """
        count = 0
        try:
            bucket_name = self.config["space_name"].strip()
            prefix = self._normalize_prefix(space_folder)
            logger.info(f"Counting files in bucket '{bucket_name}' with prefix '{prefix}'")
            for page in self._iter_pages(bucket_name, prefix):
                # Keys ending in "/" are directory placeholders, not files.
                count += sum(
                    1 for obj in page.get("Contents", []) if not obj["Key"].endswith("/")
                )
            logger.info(f"Found {count} files")
            return count
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            if error_code == "NoSuchBucket":
                logger.error(f"Bucket '{bucket_name}' does not exist")
            elif error_code == "AccessDenied":
                logger.error(f"Access denied to bucket '{bucket_name}'")
            else:
                logger.error(f"Error counting files: {e}")
            return 0
        except Exception as e:
            logger.error(f"Unexpected error counting files: {e}")
            return 0

    def download_files(self, space_folder: str = "", max_items: Optional[int] = None, progress_callback=None) -> int:
        """Download files from the S3-compatible storage.

        Files that already exist locally with the same size are skipped but
        still reported through *progress_callback*. Keys that would be
        written outside the local directory are skipped with a warning.

        Args:
            space_folder: The folder prefix in the bucket to download from.
            max_items: Maximum number of items to download.
            progress_callback: Called as
                ``progress_callback(downloaded_files, total_files, key)``
                after each file is downloaded or skipped as already present.

        Returns:
            Number of files actually downloaded by this call.

        Raises:
            Exception: any error other than a per-file ``ClientError`` is
                logged and re-raised.
        """
        downloaded_count = 0
        self.stop_download = False
        try:
            bucket_name = self.config["space_name"].strip()
            prefix = self._normalize_prefix(space_folder)
            logger.info(f"Starting download from bucket '{bucket_name}' with prefix '{prefix}'")
            for page in self._iter_pages(bucket_name, prefix):
                if self.stop_download:
                    logger.info("Download stopped by user")
                    break
                if "Contents" not in page:
                    logger.info(f"No objects found with prefix: {prefix}")
                    continue
                for obj in page["Contents"]:
                    if self.stop_download:
                        break
                    key = obj["Key"]
                    # If we've reached the max items, stop downloading.
                    if max_items is not None and downloaded_count >= max_items:
                        return downloaded_count
                    if key.endswith("/"):
                        logger.debug(f"Skipping directory entry: {key}")
                        continue
                    local_file_path = self._safe_local_path(self.config["local_directory"], key)
                    if local_file_path is None:
                        # Object keys are remote data; never let them escape
                        # the directory the user selected.
                        logger.warning(f"Skipping {key}: it would be written outside the download directory")
                        continue
                    os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
                    if os.path.exists(local_file_path) and os.path.getsize(local_file_path) == obj["Size"]:
                        logger.debug(f"File {key} already exists with correct size, skipping")
                        self.downloaded_files += 1
                        if progress_callback:
                            progress_callback(self.downloaded_files, self.total_files, key)
                        continue
                    logger.info(f"Downloading {key}...")
                    try:
                        self.s3_client.download_file(bucket_name, key, local_file_path)
                        downloaded_count += 1
                        self.downloaded_files += 1
                        if progress_callback:
                            progress_callback(self.downloaded_files, self.total_files, key)
                        logger.info(f"Downloaded {key} to {local_file_path}")
                    except ClientError as e:
                        # A single failed object must not abort the whole run.
                        logger.error(f"Error downloading {key}: {e}")
            return downloaded_count
        except Exception as e:
            logger.error(f"Error during download operation: {e}")
            raise
class S3DownloaderApp(tk.Tk):
    """Main window: collects S3 settings, runs downloads and shows progress."""

    # Settings file, resolved relative to the current working directory.
    CONFIG_FILE = "config.ini"

    def __init__(self):
        """Build the window, create all widgets and load any saved settings."""
        super().__init__()
        self.title("S3 Downloader")
        self.geometry("600x650")
        self.downloader = None
        self.download_thread = None
        self.create_widgets()
        self.load_config()

    def destroy(self):
        """Detach the GUI log handler before the widgets go away."""
        handler = getattr(self, "log_handler", None)
        if handler is not None:
            # Otherwise later log records would target a destroyed widget.
            logger.removeHandler(handler)
        super().destroy()

    def _add_entry_row(self, parent, row, label, show=None):
        """Add a label and a 40-character entry on *row*; return the entry's StringVar."""
        ttk.Label(parent, text=label).grid(row=row, column=0, sticky="w", padx=5, pady=5)
        var = tk.StringVar()
        options = {"textvariable": var, "width": 40}
        if show is not None:
            options["show"] = show
        ttk.Entry(parent, **options).grid(row=row, column=1, padx=5, pady=5)
        return var

    def _build_config(self) -> dict:
        """Return the downloader configuration taken from the form fields."""
        return {
            "space_name": self.space_name_var.get(),
            "endpoint_url": self.endpoint_url_var.get(),
            "access_key": self.access_key_var.get(),
            "secret_key": self.secret_key_var.get(),
            "local_directory": self.local_dir_var.get(),
        }

    def create_widgets(self):
        """Create the configuration form, the progress area and the log view."""
        # Input frame
        input_frame = ttk.LabelFrame(self, text="S3 Configuration")
        input_frame.pack(padx=10, pady=10, fill="x")
        self.space_name_var = self._add_entry_row(input_frame, 0, "Space/Bucket Name:")
        self.endpoint_url_var = self._add_entry_row(
            input_frame, 1, "Endpoint URL: \n(e.g. https://nyc3.digitaloceanspaces.com)"
        )
        self.access_key_var = self._add_entry_row(input_frame, 2, "Access Key:")
        self.secret_key_var = self._add_entry_row(input_frame, 3, "Secret Key:", show="*")
        self.folder_var = self._add_entry_row(input_frame, 4, "Remote Folder:\nDefault = /")
        # Local directory with a browse button
        ttk.Label(input_frame, text="Local Directory:").grid(row=5, column=0, sticky="w", padx=5, pady=5)
        self.local_dir_var = tk.StringVar(value=os.path.join(os.path.expanduser("~"), "Downloads"))
        local_dir_frame = ttk.Frame(input_frame)
        local_dir_frame.grid(row=5, column=1, padx=5, pady=5, sticky="w")
        ttk.Entry(local_dir_frame, textvariable=self.local_dir_var, width=30).pack(side="left")
        ttk.Button(local_dir_frame, text="Browse", command=self.browse_directory).pack(side="left", padx=5)
        # Action buttons
        button_frame = ttk.Frame(input_frame)
        button_frame.grid(row=6, column=0, columnspan=2, pady=10)
        self.test_btn = ttk.Button(button_frame, text="Test Connection", command=self.test_connection)
        self.test_btn.pack(side="left", padx=5)
        ttk.Button(button_frame, text="Save Configuration", command=self.save_config).pack(side="left", padx=5)
        self.download_btn = ttk.Button(button_frame, text="Start Download", command=self.start_download)
        self.download_btn.pack(side="left", padx=5)
        # Progress frame
        progress_frame = ttk.LabelFrame(self, text="Download Progress")
        progress_frame.pack(padx=10, pady=10, fill="both", expand=True)
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(padx=10, pady=10, fill="x")
        self.progress_label = ttk.Label(progress_frame, text="Ready to download")
        self.progress_label.pack(padx=10, pady=5)
        self.current_file_label = ttk.Label(progress_frame, text="")
        self.current_file_label.pack(padx=10, pady=5)
        # Log frame
        log_frame = ttk.LabelFrame(self, text="Download Log")
        log_frame.pack(padx=10, pady=10, fill="both", expand=True)
        # Read-only from the start; the log handler enables it only while writing.
        self.log_text = tk.Text(log_frame, height=8, width=70, state="disabled")
        # The scrollbar sits beside the text widget rather than inside it,
        # so it does not cover log content.
        scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
        scrollbar.pack(side="right", fill="y", padx=(0, 10), pady=10)
        self.log_text.pack(side="left", padx=(10, 0), pady=10, fill="both", expand=True)
        self.log_text.config(yscrollcommand=scrollbar.set)
        # Add handler to capture log messages
        self.log_handler = LogTextHandler(self.log_text)
        logger.addHandler(self.log_handler)

    def browse_directory(self):
        """Let the user pick the local target directory."""
        directory = filedialog.askdirectory(initialdir=self.local_dir_var.get())
        if directory:
            self.local_dir_var.set(directory)

    def test_connection(self):
        """Validate the form, test the S3 connection and report the result."""
        if not self.validate_inputs():
            return
        config = self._build_config()
        try:
            self.test_btn.config(text="Testing...", state="disabled")
            # The test below blocks the event loop; repaint the button first.
            self.update_idletasks()
            downloader = S3Downloader(config)
            success, message = downloader.test_connection()
            if success:
                messagebox.showinfo("Connection Test", f"✓ {message}")
                logger.info(f"Connection test successful: {message}")
            else:
                messagebox.showerror("Connection Test", f"✗ {message}")
                logger.error(f"Connection test failed: {message}")
        except Exception as e:
            error_msg = f"Connection test failed: {e}"
            messagebox.showerror("Connection Test", error_msg)
            logger.error(error_msg)
        finally:
            self.test_btn.config(text="Test Connection", state="normal")

    def load_config(self):
        """Fill the form from the ``default`` section of the config file, if present."""
        config_file = self.CONFIG_FILE
        if not os.path.exists(config_file):
            return
        try:
            # Interpolation is disabled so values containing "%" are read verbatim.
            parser = configparser.ConfigParser(interpolation=None)
            parser.read(config_file)
            if "default" in parser:
                section = parser["default"]
                self.space_name_var.set(section.get("space_name", ""))
                self.endpoint_url_var.set(section.get("endpoint_url", ""))
                self.access_key_var.set(section.get("access_key", ""))
                self.secret_key_var.set(section.get("secret_key", ""))
                self.local_dir_var.set(section.get("local_directory", self.local_dir_var.get()))
        except Exception as e:
            logger.error(f"Error loading configuration: {e}")

    def save_config(self):
        """Write the form values to the ``default`` section of the config file."""
        config_file = self.CONFIG_FILE
        try:
            # Interpolation is disabled so values containing "%" can be stored.
            parser = configparser.ConfigParser(interpolation=None)
            if os.path.exists(config_file):
                # Keep any other sections already present in the file.
                parser.read(config_file)
            if "default" not in parser:
                parser["default"] = {}
            section = parser["default"]
            section["space_name"] = self.space_name_var.get()
            section["endpoint_url"] = self.endpoint_url_var.get()
            section["access_key"] = self.access_key_var.get()
            section["secret_key"] = self.secret_key_var.get()
            section["local_directory"] = self.local_dir_var.get()
            with open(config_file, "w") as f:
                parser.write(f)
            # The file contains the secret key: restrict it to the owner
            # where the platform supports it (best effort).
            try:
                os.chmod(config_file, 0o600)
            except OSError as e:
                logger.warning(f"Could not restrict permissions on {config_file}: {e}")
            messagebox.showinfo("Configuration Saved", "Your configuration has been saved.")
        except Exception as e:
            logger.error(f"Error saving configuration: {e}")
            messagebox.showerror("Error", f"Failed to save configuration: {e}")

    def validate_inputs(self):
        """Return True when every required field is filled; otherwise show an error."""
        required = {
            "Space/Bucket Name": self.space_name_var.get(),
            "Endpoint URL": self.endpoint_url_var.get(),
            "Access Key": self.access_key_var.get(),
            "Secret Key": self.secret_key_var.get(),
            "Local Directory": self.local_dir_var.get(),
        }
        missing = [name for name, value in required.items() if not value.strip()]
        if missing:
            messagebox.showerror("Missing Information", f"Please fill in the following fields: {', '.join(missing)}")
            return False
        return True

    def start_download(self):
        """Start a download, or request a stop when one is already running."""
        # Stopping must work even if the form was edited in the meantime,
        # so this check comes before input validation.
        if self.download_thread and self.download_thread.is_alive():
            self.downloader.stop_download = True
            self.download_btn.config(text="Start Download")
            return
        if not self.validate_inputs():
            return
        config = self._build_config()
        try:
            # Ensure the local directory exists
            Path(config["local_directory"]).mkdir(parents=True, exist_ok=True)
            self.downloader = S3Downloader(config)
            # Test connection first
            success, message = self.downloader.test_connection()
            if not success:
                messagebox.showerror("Connection Error", f"Cannot connect to S3: {message}")
                return
            self.download_btn.config(text="Stop Download")
            self.progress_label.config(text="Counting files...")
            # Read the Tk variable here, on the GUI thread, and hand the
            # plain string to the worker.
            self.download_thread = threading.Thread(
                target=self.perform_download,
                args=(self.folder_var.get(),),
                daemon=True,
            )
            self.download_thread.start()
        except Exception as e:
            logger.error(f"Error starting download: {e}")
            messagebox.showerror("Error", f"Failed to start download: {e}")
            self.download_btn.config(text="Start Download")

    def perform_download(self, space_folder: Optional[str] = None):
        """Count and download the files; runs in the worker thread.

        Args:
            space_folder: Remote folder prefix. When omitted, the value of
                the "Remote Folder" field is used.

        All widget updates are scheduled with ``after`` instead of being
        applied directly from this thread.
        """
        downloader = self.downloader
        if space_folder is None:
            space_folder = self.folder_var.get()

        def set_status(text):
            self.after(0, lambda: self.progress_label.config(text=text))

        try:
            # Count files first
            downloader.total_files = downloader.count_files(space_folder)
            downloader.downloaded_files = 0
            total = downloader.total_files
            if total == 0:
                set_status("No files found to download")
                return
            if downloader.stop_download:
                # Stop was requested while counting; download_files() would
                # reset the flag, so honour the request here.
                set_status("Download stopped. Downloaded 0 files.")
                return
            set_status(f"Found {total} files. Starting download...")
            # Download files
            count = downloader.download_files(space_folder, progress_callback=self.update_progress)
            if downloader.stop_download:
                set_status(f"Download stopped. Downloaded {count} files.")
            else:
                set_status(f"Download complete! Downloaded {count} files.")
        except Exception as e:
            logger.error(f"Error during download: {e}")
            # Build the text now: "e" is unbound once the except block ends,
            # which is before the scheduled callback runs.
            error_text = f"Download failed: {e}"
            self.after(0, lambda: messagebox.showerror("Error", error_text))
        finally:
            self.after(0, lambda: self.download_btn.config(text="Start Download"))

    def update_progress(self, current, total, current_file):
        """Schedule a refresh of the progress bar and labels.

        Used as the downloader's progress callback, so it is invoked from
        the worker thread; the widget changes are deferred with ``after``.
        """
        progress = (current / total * 100) if total > 0 else 0

        def apply():
            self.progress_var.set(progress)
            self.progress_label.config(text=f"Downloading: {current} of {total} files ({progress:.1f}%)")
            self.current_file_label.config(text=f"Current file: {current_file}")

        self.after(0, apply)
class LogTextHandler(logging.Handler):
    """Logging handler that appends formatted records to a text widget."""

    def __init__(self, text_widget):
        """Remember the target widget and install the line format."""
        super().__init__()
        self.text_widget = text_widget
        self.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

    def emit(self, record):
        """Format *record* and schedule it to be appended to the widget.

        Failures (for example when the widget no longer exists) are passed
        to ``handleError`` so that a logging call never raises into the
        code that issued it.
        """
        try:
            msg = self.format(record)

            def append():
                # The widget is kept read-only; enable it only while writing.
                self.text_widget.configure(state="normal")
                self.text_widget.insert("end", msg + "\n")
                self.text_widget.see("end")
                self.text_widget.configure(state="disabled")

            # Schedule the update through the widget's event loop.
            self.text_widget.after(0, append)
        except Exception:
            self.handleError(record)
def main():
    """Create the application window and run its event loop."""
    S3DownloaderApp().mainloop()


if __name__ == "__main__":
    main()
Author
Updated code, with "TEST CONNECTION":
#!/usr/bin/env python3
import boto3
import os
import logging
import sys
import threading
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from typing import Optional
from botocore.exceptions import ClientError, NoCredentialsError
from pathlib import Path
import configparser
# Setup logging: every record goes both to the console and to a log file in
# the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        # Object keys are logged verbatim and may contain non-ASCII
        # characters, so do not depend on the platform default encoding.
        logging.FileHandler("s3_downloader.log", encoding="utf-8"),
    ],
)
# Module-wide logger used by the downloader and the GUI.
logger = logging.getLogger("s3_downloader")
class S3Downloader:
    """Download objects from an S3-compatible bucket into a local directory.

    ``config`` is a mapping with the keys ``endpoint_url``, ``access_key``,
    ``secret_key``, ``space_name`` (the bucket name) and ``local_directory``.
    An optional ``region`` key defaults to ``"us-east-1"``.

    Attributes:
        total_files: expected number of files; reported to the progress
            callback and set by the caller.
        downloaded_files: files handled so far (downloaded or skipped
            because they already exist locally).
        stop_download: set to True to make ``download_files`` stop early.
    """

    def __init__(self, config: dict):
        """Store the configuration and create the S3 client."""
        self.config = config
        self.s3_client = self._initialize_s3_client()
        self.total_files = 0
        self.downloaded_files = 0
        self.stop_download = False

    def _initialize_s3_client(self):
        """Create and return a boto3 S3 client for the configured endpoint.

        Any exception raised while building the client is logged and
        re-raised unchanged.
        """
        try:
            endpoint_url = self.config["endpoint_url"].strip()
            # A bare host name is accepted; HTTPS is assumed in that case.
            if not endpoint_url.startswith(("http://", "https://")):
                endpoint_url = "https://" + endpoint_url
            return boto3.client(
                "s3",
                endpoint_url=endpoint_url,
                aws_access_key_id=self.config["access_key"].strip(),
                aws_secret_access_key=self.config["secret_key"].strip(),
                region_name=self.config.get("region", "us-east-1"),
            )
        except Exception as e:
            logger.error(f"Failed to initialize S3 client: {e}")
            raise

    @staticmethod
    def _normalize_prefix(space_folder: str) -> str:
        """Return *space_folder* without surrounding whitespace or leading slashes."""
        return space_folder.strip().lstrip("/")

    @staticmethod
    def _safe_local_path(base_dir: str, key: str) -> Optional[str]:
        """Map an object key to a path inside *base_dir*.

        Returns the resolved local path, or None when the key would resolve
        to *base_dir* itself or to a location outside of it (for example a
        key containing ``..`` components).
        """
        base = os.path.realpath(base_dir)
        # Leading slashes would make os.path.join discard the base directory.
        candidate = os.path.realpath(os.path.join(base, key.lstrip("/")))
        try:
            inside = os.path.commonpath([base, candidate]) == base
        except ValueError:
            # Raised e.g. when the two paths are on different Windows drives.
            inside = False
        if not inside or candidate == base:
            return None
        return candidate

    def _iter_pages(self, bucket_name: str, prefix: str):
        """Return a ``list_objects_v2`` page iterator for the bucket and prefix."""
        paginator = self.s3_client.get_paginator("list_objects_v2")
        params = {"Bucket": bucket_name}
        # Only send a Prefix when a folder was actually requested.
        if prefix:
            params["Prefix"] = prefix
        return paginator.paginate(**params)

    def test_connection(self) -> tuple[bool, str]:
        """Check the credentials and that the configured bucket is accessible.

        The bucket listing is used as a credential probe only: an
        ``AccessDenied`` answer (keys that are not allowed to list buckets)
        does not fail the test, because the bucket itself is checked next.

        Returns:
            ``(True, "Connection successful")`` on success, otherwise
            ``(False, message)`` describing the failure. No exception is
            propagated to the caller.
        """
        try:
            try:
                self.s3_client.list_buckets()
                logger.info("Successfully connected to S3 service")
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code == "SignatureDoesNotMatch":
                    return False, "Invalid access key or secret key"
                if error_code == "InvalidAccessKeyId":
                    return False, "Invalid access key ID"
                if error_code != "AccessDenied":
                    return False, f"Connection error: {e}"
                logger.info("Listing buckets is not permitted for these keys; checking the bucket directly")
            # Check if the specific bucket exists
            bucket_name = self.config["space_name"].strip()
            try:
                self.s3_client.head_bucket(Bucket=bucket_name)
                logger.info(f"Bucket '{bucket_name}' exists and is accessible")
                return True, "Connection successful"
            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code == "404":
                    return False, f"Bucket '{bucket_name}' does not exist"
                if error_code == "403":
                    return False, f"Access denied to bucket '{bucket_name}'"
                return False, f"Error accessing bucket: {e}"
        except NoCredentialsError:
            return False, "Invalid credentials"
        except Exception as e:
            return False, f"Unexpected error: {e}"

    def count_files(self, space_folder: str = "") -> int:
        """Count the objects under *space_folder*, ignoring keys ending in ``/``.

        Returns:
            The number of files found, or 0 when listing fails (the error
            is logged instead of raised).
        """
        count = 0
        try:
            bucket_name = self.config["space_name"].strip()
            prefix = self._normalize_prefix(space_folder)
            logger.info(f"Counting files in bucket '{bucket_name}' with prefix '{prefix}'")
            for page in self._iter_pages(bucket_name, prefix):
                # Keys ending in "/" are directory placeholders, not files.
                count += sum(
                    1 for obj in page.get("Contents", []) if not obj["Key"].endswith("/")
                )
            logger.info(f"Found {count} files")
            return count
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            if error_code == "NoSuchBucket":
                logger.error(f"Bucket '{bucket_name}' does not exist")
            elif error_code == "AccessDenied":
                logger.error(f"Access denied to bucket '{bucket_name}'")
            else:
                logger.error(f"Error counting files: {e}")
            return 0
        except Exception as e:
            logger.error(f"Unexpected error counting files: {e}")
            return 0

    def download_files(self, space_folder: str = "", max_items: Optional[int] = None, progress_callback=None) -> int:
        """Download files from the S3-compatible storage.

        Files that already exist locally with the same size are skipped but
        still reported through *progress_callback*. Keys that would be
        written outside the local directory are skipped with a warning.

        Args:
            space_folder: The folder prefix in the bucket to download from.
            max_items: Maximum number of items to download.
            progress_callback: Called as
                ``progress_callback(downloaded_files, total_files, key)``
                after each file is downloaded or skipped as already present.

        Returns:
            Number of files actually downloaded by this call.

        Raises:
            Exception: any error other than a per-file ``ClientError`` is
                logged and re-raised.
        """
        downloaded_count = 0
        self.stop_download = False
        try:
            bucket_name = self.config["space_name"].strip()
            prefix = self._normalize_prefix(space_folder)
            logger.info(f"Starting download from bucket '{bucket_name}' with prefix '{prefix}'")
            for page in self._iter_pages(bucket_name, prefix):
                if self.stop_download:
                    logger.info("Download stopped by user")
                    break
                if "Contents" not in page:
                    logger.info(f"No objects found with prefix: {prefix}")
                    continue
                for obj in page["Contents"]:
                    if self.stop_download:
                        break
                    key = obj["Key"]
                    # If we've reached the max items, stop downloading.
                    if max_items is not None and downloaded_count >= max_items:
                        return downloaded_count
                    if key.endswith("/"):
                        logger.debug(f"Skipping directory entry: {key}")
                        continue
                    local_file_path = self._safe_local_path(self.config["local_directory"], key)
                    if local_file_path is None:
                        # Object keys are remote data; never let them escape
                        # the directory the user selected.
                        logger.warning(f"Skipping {key}: it would be written outside the download directory")
                        continue
                    os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
                    if os.path.exists(local_file_path) and os.path.getsize(local_file_path) == obj["Size"]:
                        logger.debug(f"File {key} already exists with correct size, skipping")
                        self.downloaded_files += 1
                        if progress_callback:
                            progress_callback(self.downloaded_files, self.total_files, key)
                        continue
                    logger.info(f"Downloading {key}...")
                    try:
                        self.s3_client.download_file(bucket_name, key, local_file_path)
                        downloaded_count += 1
                        self.downloaded_files += 1
                        if progress_callback:
                            progress_callback(self.downloaded_files, self.total_files, key)
                        logger.info(f"Downloaded {key} to {local_file_path}")
                    except ClientError as e:
                        # A single failed object must not abort the whole run.
                        logger.error(f"Error downloading {key}: {e}")
            return downloaded_count
        except Exception as e:
            logger.error(f"Error during download operation: {e}")
            raise
class S3DownloaderApp(tk.Tk):
    """Main window: collects S3 settings, runs downloads and shows progress."""

    # Settings file, resolved relative to the current working directory.
    CONFIG_FILE = "config.ini"

    def __init__(self):
        """Build the window, create all widgets and load any saved settings."""
        super().__init__()
        self.title("S3 Downloader")
        self.geometry("600x650")
        self.downloader = None
        self.download_thread = None
        self.create_widgets()
        self.load_config()

    def destroy(self):
        """Detach the GUI log handler before the widgets go away."""
        handler = getattr(self, "log_handler", None)
        if handler is not None:
            # Otherwise later log records would target a destroyed widget.
            logger.removeHandler(handler)
        super().destroy()

    def _add_entry_row(self, parent, row, label, show=None):
        """Add a label and a 40-character entry on *row*; return the entry's StringVar."""
        ttk.Label(parent, text=label).grid(row=row, column=0, sticky="w", padx=5, pady=5)
        var = tk.StringVar()
        options = {"textvariable": var, "width": 40}
        if show is not None:
            options["show"] = show
        ttk.Entry(parent, **options).grid(row=row, column=1, padx=5, pady=5)
        return var

    def _build_config(self) -> dict:
        """Return the downloader configuration taken from the form fields."""
        return {
            "space_name": self.space_name_var.get(),
            "endpoint_url": self.endpoint_url_var.get(),
            "access_key": self.access_key_var.get(),
            "secret_key": self.secret_key_var.get(),
            "local_directory": self.local_dir_var.get(),
        }

    def create_widgets(self):
        """Create the configuration form, the progress area and the log view."""
        # Input frame
        input_frame = ttk.LabelFrame(self, text="S3 Configuration")
        input_frame.pack(padx=10, pady=10, fill="x")
        self.space_name_var = self._add_entry_row(input_frame, 0, "Space/Bucket Name:")
        self.endpoint_url_var = self._add_entry_row(input_frame, 1, "Endpoint URL:")
        self.access_key_var = self._add_entry_row(input_frame, 2, "Access Key:")
        self.secret_key_var = self._add_entry_row(input_frame, 3, "Secret Key:", show="*")
        self.folder_var = self._add_entry_row(input_frame, 4, "Remote Folder:")
        # Local directory with a browse button
        ttk.Label(input_frame, text="Local Directory:").grid(row=5, column=0, sticky="w", padx=5, pady=5)
        self.local_dir_var = tk.StringVar(value=os.path.join(os.path.expanduser("~"), "Downloads"))
        local_dir_frame = ttk.Frame(input_frame)
        local_dir_frame.grid(row=5, column=1, padx=5, pady=5, sticky="w")
        ttk.Entry(local_dir_frame, textvariable=self.local_dir_var, width=30).pack(side="left")
        ttk.Button(local_dir_frame, text="Browse", command=self.browse_directory).pack(side="left", padx=5)
        # Action buttons
        button_frame = ttk.Frame(input_frame)
        button_frame.grid(row=6, column=0, columnspan=2, pady=10)
        self.test_btn = ttk.Button(button_frame, text="Test Connection", command=self.test_connection)
        self.test_btn.pack(side="left", padx=5)
        ttk.Button(button_frame, text="Save Configuration", command=self.save_config).pack(side="left", padx=5)
        self.download_btn = ttk.Button(button_frame, text="Start Download", command=self.start_download)
        self.download_btn.pack(side="left", padx=5)
        # Progress frame
        progress_frame = ttk.LabelFrame(self, text="Download Progress")
        progress_frame.pack(padx=10, pady=10, fill="both", expand=True)
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(padx=10, pady=10, fill="x")
        self.progress_label = ttk.Label(progress_frame, text="Ready to download")
        self.progress_label.pack(padx=10, pady=5)
        self.current_file_label = ttk.Label(progress_frame, text="")
        self.current_file_label.pack(padx=10, pady=5)
        # Log frame
        log_frame = ttk.LabelFrame(self, text="Download Log")
        log_frame.pack(padx=10, pady=10, fill="both", expand=True)
        # Read-only from the start; the log handler enables it only while writing.
        self.log_text = tk.Text(log_frame, height=8, width=70, state="disabled")
        # The scrollbar sits beside the text widget rather than inside it,
        # so it does not cover log content.
        scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
        scrollbar.pack(side="right", fill="y", padx=(0, 10), pady=10)
        self.log_text.pack(side="left", padx=(10, 0), pady=10, fill="both", expand=True)
        self.log_text.config(yscrollcommand=scrollbar.set)
        # Add handler to capture log messages
        self.log_handler = LogTextHandler(self.log_text)
        logger.addHandler(self.log_handler)

    def browse_directory(self):
        """Let the user pick the local target directory."""
        directory = filedialog.askdirectory(initialdir=self.local_dir_var.get())
        if directory:
            self.local_dir_var.set(directory)

    def test_connection(self):
        """Validate the form, test the S3 connection and report the result."""
        if not self.validate_inputs():
            return
        config = self._build_config()
        try:
            self.test_btn.config(text="Testing...", state="disabled")
            # The test below blocks the event loop; repaint the button first.
            self.update_idletasks()
            downloader = S3Downloader(config)
            success, message = downloader.test_connection()
            if success:
                messagebox.showinfo("Connection Test", f"✓ {message}")
                logger.info(f"Connection test successful: {message}")
            else:
                messagebox.showerror("Connection Test", f"✗ {message}")
                logger.error(f"Connection test failed: {message}")
        except Exception as e:
            error_msg = f"Connection test failed: {e}"
            messagebox.showerror("Connection Test", error_msg)
            logger.error(error_msg)
        finally:
            self.test_btn.config(text="Test Connection", state="normal")

    def load_config(self):
        """Fill the form from the ``default`` section of the config file, if present."""
        config_file = self.CONFIG_FILE
        if not os.path.exists(config_file):
            return
        try:
            # Interpolation is disabled so values containing "%" are read verbatim.
            parser = configparser.ConfigParser(interpolation=None)
            parser.read(config_file)
            if "default" in parser:
                section = parser["default"]
                self.space_name_var.set(section.get("space_name", ""))
                self.endpoint_url_var.set(section.get("endpoint_url", ""))
                self.access_key_var.set(section.get("access_key", ""))
                self.secret_key_var.set(section.get("secret_key", ""))
                self.local_dir_var.set(section.get("local_directory", self.local_dir_var.get()))
        except Exception as e:
            logger.error(f"Error loading configuration: {e}")

    def save_config(self):
        """Write the form values to the ``default`` section of the config file."""
        config_file = self.CONFIG_FILE
        try:
            # Interpolation is disabled so values containing "%" can be stored.
            parser = configparser.ConfigParser(interpolation=None)
            if os.path.exists(config_file):
                # Keep any other sections already present in the file.
                parser.read(config_file)
            if "default" not in parser:
                parser["default"] = {}
            section = parser["default"]
            section["space_name"] = self.space_name_var.get()
            section["endpoint_url"] = self.endpoint_url_var.get()
            section["access_key"] = self.access_key_var.get()
            section["secret_key"] = self.secret_key_var.get()
            section["local_directory"] = self.local_dir_var.get()
            with open(config_file, "w") as f:
                parser.write(f)
            # The file contains the secret key: restrict it to the owner
            # where the platform supports it (best effort).
            try:
                os.chmod(config_file, 0o600)
            except OSError as e:
                logger.warning(f"Could not restrict permissions on {config_file}: {e}")
            messagebox.showinfo("Configuration Saved", "Your configuration has been saved.")
        except Exception as e:
            logger.error(f"Error saving configuration: {e}")
            messagebox.showerror("Error", f"Failed to save configuration: {e}")

    def validate_inputs(self):
        """Return True when every required field is filled; otherwise show an error."""
        required = {
            "Space/Bucket Name": self.space_name_var.get(),
            "Endpoint URL": self.endpoint_url_var.get(),
            "Access Key": self.access_key_var.get(),
            "Secret Key": self.secret_key_var.get(),
            "Local Directory": self.local_dir_var.get(),
        }
        missing = [name for name, value in required.items() if not value.strip()]
        if missing:
            messagebox.showerror("Missing Information", f"Please fill in the following fields: {', '.join(missing)}")
            return False
        return True

    def start_download(self):
        """Start a download, or request a stop when one is already running."""
        # Stopping must work even if the form was edited in the meantime,
        # so this check comes before input validation.
        if self.download_thread and self.download_thread.is_alive():
            self.downloader.stop_download = True
            self.download_btn.config(text="Start Download")
            return
        if not self.validate_inputs():
            return
        config = self._build_config()
        try:
            # Ensure the local directory exists
            Path(config["local_directory"]).mkdir(parents=True, exist_ok=True)
            self.downloader = S3Downloader(config)
            # Test connection first
            success, message = self.downloader.test_connection()
            if not success:
                messagebox.showerror("Connection Error", f"Cannot connect to S3: {message}")
                return
            self.download_btn.config(text="Stop Download")
            self.progress_label.config(text="Counting files...")
            # Read the Tk variable here, on the GUI thread, and hand the
            # plain string to the worker.
            self.download_thread = threading.Thread(
                target=self.perform_download,
                args=(self.folder_var.get(),),
                daemon=True,
            )
            self.download_thread.start()
        except Exception as e:
            logger.error(f"Error starting download: {e}")
            messagebox.showerror("Error", f"Failed to start download: {e}")
            self.download_btn.config(text="Start Download")

    def perform_download(self, space_folder: Optional[str] = None):
        """Count and download the files; runs in the worker thread.

        Args:
            space_folder: Remote folder prefix. When omitted, the value of
                the "Remote Folder" field is used.

        All widget updates are scheduled with ``after`` instead of being
        applied directly from this thread.
        """
        downloader = self.downloader
        if space_folder is None:
            space_folder = self.folder_var.get()

        def set_status(text):
            self.after(0, lambda: self.progress_label.config(text=text))

        try:
            # Count files first
            downloader.total_files = downloader.count_files(space_folder)
            downloader.downloaded_files = 0
            total = downloader.total_files
            if total == 0:
                set_status("No files found to download")
                return
            if downloader.stop_download:
                # Stop was requested while counting; download_files() would
                # reset the flag, so honour the request here.
                set_status("Download stopped. Downloaded 0 files.")
                return
            set_status(f"Found {total} files. Starting download...")
            # Download files
            count = downloader.download_files(space_folder, progress_callback=self.update_progress)
            if downloader.stop_download:
                set_status(f"Download stopped. Downloaded {count} files.")
            else:
                set_status(f"Download complete! Downloaded {count} files.")
        except Exception as e:
            logger.error(f"Error during download: {e}")
            # Build the text now: "e" is unbound once the except block ends,
            # which is before the scheduled callback runs.
            error_text = f"Download failed: {e}"
            self.after(0, lambda: messagebox.showerror("Error", error_text))
        finally:
            self.after(0, lambda: self.download_btn.config(text="Start Download"))

    def update_progress(self, current, total, current_file):
        """Schedule a refresh of the progress bar and labels.

        Used as the downloader's progress callback, so it is invoked from
        the worker thread; the widget changes are deferred with ``after``.
        """
        progress = (current / total * 100) if total > 0 else 0

        def apply():
            self.progress_var.set(progress)
            self.progress_label.config(text=f"Downloading: {current} of {total} files ({progress:.1f}%)")
            self.current_file_label.config(text=f"Current file: {current_file}")

        self.after(0, apply)
class LogTextHandler(logging.Handler):
    """Logging handler that appends formatted records to a text widget."""

    def __init__(self, text_widget):
        """Remember the target widget and install the line format."""
        super().__init__()
        self.text_widget = text_widget
        self.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

    def emit(self, record):
        """Format *record* and schedule it to be appended to the widget.

        Failures (for example when the widget no longer exists) are passed
        to ``handleError`` so that a logging call never raises into the
        code that issued it.
        """
        try:
            msg = self.format(record)

            def append():
                # The widget is kept read-only; enable it only while writing.
                self.text_widget.configure(state="normal")
                self.text_widget.insert("end", msg + "\n")
                self.text_widget.see("end")
                self.text_widget.configure(state="disabled")

            # Schedule the update through the widget's event loop.
            self.text_widget.after(0, append)
        except Exception:
            self.handleError(record)
def main():
    """Create the application window and run its event loop."""
    S3DownloaderApp().mainloop()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Run
Just install Python, then install the boto3 package:
then,