Skip to content

Instantly share code, notes, and snippets.

@hellojahid3
Last active June 23, 2025 09:16
Show Gist options
  • Select an option

  • Save hellojahid3/168d8bad8deb9675cd97f9cda5d38940 to your computer and use it in GitHub Desktop.

Select an option

Save hellojahid3/168d8bad8deb9675cd97f9cda5d38940 to your computer and use it in GitHub Desktop.
This Python application provides a feature-rich graphical user interface for downloading files from S3-compatible storage services (AWS S3, DigitalOcean Spaces, etc.). It uses boto3 for S3 operations and Tkinter for the GUI.
#!/usr/bin/env python3
import boto3
import os
import logging
import sys
import threading
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from typing import Optional
from botocore.exceptions import ClientError, NoCredentialsError
from pathlib import Path
import configparser
# Setup logging: every record goes to the console and to s3_downloader.log
# in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        # Explicit UTF-8 so that object keys containing non-ASCII characters
        # are written intact whatever the platform's default encoding is.
        logging.FileHandler("s3_downloader.log", encoding="utf-8"),
    ],
)
logger = logging.getLogger("s3_downloader")
class S3Downloader:
    """Class to handle downloading files from S3-compatible storage"""

    def __init__(self, config: dict):
        """Initialize the downloader with configuration

        Args:
            config: Connection settings. The code reads the keys
                "endpoint_url", "access_key", "secret_key", "space_name",
                "local_directory" and, optionally, "region".
        """
        self.config = config
        self.s3_client = self._initialize_s3_client()
        self.total_files = 0
        self.downloaded_files = 0
        self.stop_download = False

    def _initialize_s3_client(self):
        """Create and return an S3 client

        The endpoint gets an "https://" scheme when it has none, and the
        region falls back to "us-east-1". Any failure is logged and
        re-raised.
        """
        try:
            # Clean up endpoint URL
            endpoint_url = self.config["endpoint_url"].strip()
            if not endpoint_url.startswith(('http://', 'https://')):
                endpoint_url = 'https://' + endpoint_url
            return boto3.client(
                's3',
                endpoint_url=endpoint_url,
                aws_access_key_id=self.config["access_key"].strip(),
                aws_secret_access_key=self.config["secret_key"].strip(),
                region_name=self.config.get("region", "us-east-1"),
            )
        except Exception as e:
            logger.error(f"Failed to initialize S3 client: {e}")
            raise

    def test_connection(self) -> tuple[bool, str]:
        """Test the connection to the S3 service

        Issues a HEAD request on the configured bucket.

        Returns:
            (True, "Connection successful") when the bucket answers,
            otherwise (False, reason).
        """
        try:
            bucket_name = self.config["space_name"].strip()
            self.s3_client.head_bucket(Bucket=bucket_name)
        except NoCredentialsError:
            return False, "Invalid credentials"
        except ClientError as e:
            # All service errors are classified in this one handler; the
            # HTTP-status codes ('404', '403') are what the original matched.
            error_code = e.response.get('Error', {}).get('Code', '')
            if error_code in ('404', 'NoSuchBucket'):
                return False, f"Bucket '{bucket_name}' does not exist"
            if error_code in ('403', 'AccessDenied'):
                return False, f"Access denied to bucket '{bucket_name}'"
            if error_code == 'SignatureDoesNotMatch':
                return False, "Invalid access key or secret key"
            if error_code == 'InvalidAccessKeyId':
                return False, "Invalid access key ID"
            return False, f"Error accessing bucket: {e}"
        except Exception as e:
            return False, f"Unexpected error: {e}"
        logger.info(f"Bucket '{bucket_name}' exists and is accessible")
        return True, "Connection successful"

    @staticmethod
    def _clean_prefix(space_folder: str) -> str:
        """Strip surrounding whitespace and one leading slash from a prefix."""
        prefix = space_folder.strip()
        return prefix[1:] if prefix.startswith('/') else prefix

    def _iter_pages(self, bucket_name: str, prefix: str):
        """Return the list_objects_v2 page iterator for the bucket/prefix."""
        list_kwargs = {"Bucket": bucket_name}
        # Use no Prefix argument at all if no folder was specified
        if prefix:
            list_kwargs["Prefix"] = prefix
        return self.s3_client.get_paginator('list_objects_v2').paginate(**list_kwargs)

    @staticmethod
    def _resolve_local_path(local_directory: str, key: str) -> Optional[str]:
        """Map an object key to a path below local_directory, or None if it escapes."""
        root = os.path.abspath(local_directory)
        # A leading slash would make os.path.join throw the root away.
        candidate = os.path.abspath(os.path.join(root, key.lstrip('/')))
        try:
            inside = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # commonpath cannot compare the two paths, so treat it as outside.
            inside = False
        if not inside or candidate == root:
            return None
        return candidate

    def count_files(self, space_folder: str = '') -> int:
        """Count the number of files in the bucket/folder

        Keys ending in '/' are not counted. Every failure is logged and
        reported as 0.
        """
        try:
            bucket_name = self.config["space_name"].strip()
            prefix = self._clean_prefix(space_folder)
            logger.info(f"Counting files in bucket '{bucket_name}' with prefix '{prefix}'")
            count = 0
            for page in self._iter_pages(bucket_name, prefix):
                # Count only files, not directories
                count += sum(
                    1 for obj in page.get('Contents', ())
                    if not obj['Key'].endswith('/')
                )
            logger.info(f"Found {count} files")
            return count
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'NoSuchBucket':
                logger.error(f"Bucket '{bucket_name}' does not exist")
            elif error_code == 'AccessDenied':
                logger.error(f"Access denied to bucket '{bucket_name}'")
            else:
                logger.error(f"Error counting files: {e}")
            return 0
        except Exception as e:
            logger.error(f"Unexpected error counting files: {e}")
            return 0

    def download_files(self, space_folder: str = '', max_items: Optional[int] = None, progress_callback=None) -> int:
        """
        Download files from the S3-compatible storage

        Files whose local copy already has the listed size are skipped but
        still reported as progress. Keys that would be written outside the
        local directory are skipped with an error log entry. A failure on one
        file is logged and the run continues with the next file.

        Args:
            space_folder: The folder prefix in the bucket to download from
            max_items: Maximum number of items to download
            progress_callback: Callback function to update progress; called
                with (downloaded_files, total_files, key)
        Returns:
            Number of files downloaded
        Raises:
            Exception: Failures outside the per-file handling are logged and
                re-raised.
        """
        downloaded_count = 0
        self.stop_download = False
        try:
            bucket_name = self.config["space_name"].strip()
            prefix = self._clean_prefix(space_folder)
            logger.info(f"Starting download from bucket '{bucket_name}' with prefix '{prefix}'")

            for page in self._iter_pages(bucket_name, prefix):
                if self.stop_download:
                    break
                if 'Contents' not in page:
                    logger.info(f"No objects found with prefix: {prefix}")
                    continue

                for obj in page['Contents']:
                    if self.stop_download:
                        break
                    # If we've reached the max items, stop downloading
                    if max_items is not None and downloaded_count >= max_items:
                        return downloaded_count

                    key = obj['Key']
                    if key.endswith('/'):
                        logger.debug(f"Skipping directory entry: {key}")
                        continue

                    local_file_path = self._resolve_local_path(self.config["local_directory"], key)
                    if local_file_path is None:
                        logger.error(f"Skipping {key}: it does not map to a file inside the download directory")
                        continue

                    try:
                        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
                        if os.path.exists(local_file_path) and os.path.getsize(local_file_path) == obj['Size']:
                            logger.debug(f"File {key} already exists with correct size, skipping")
                        else:
                            logger.info(f"Downloading {key}...")
                            self.s3_client.download_file(bucket_name, key, local_file_path)
                            downloaded_count += 1
                            logger.info(f"Downloaded {key} to {local_file_path}")
                    except (ClientError, OSError) as e:
                        # One bad object must not end the whole run.
                        logger.error(f"Error downloading {key}: {e}")
                        continue

                    self.downloaded_files += 1
                    if progress_callback:
                        progress_callback(self.downloaded_files, self.total_files, key)

            if self.stop_download:
                logger.info("Download stopped by user")
            return downloaded_count
        except Exception as e:
            logger.error(f"Error during download operation: {e}")
            raise
class S3DownloaderApp(tk.Tk):
    """Tkinter window for configuring and running an S3Downloader."""

    def __init__(self):
        """Create the window, build the widgets and load saved settings."""
        super().__init__()
        self.title("S3 Downloader")
        self.geometry("600x650")
        self.downloader = None
        self.download_thread = None
        self.create_widgets()
        self.load_config()

    def create_widgets(self):
        """Build the configuration form, the progress area and the log view.

        Also attaches a LogTextHandler to the module logger so that log
        records are shown in the log view.
        """
        cell = {"padx": 5, "pady": 5}

        # Input frame
        input_frame = ttk.LabelFrame(self, text="S3 Configuration")
        input_frame.pack(padx=10, pady=10, fill="x")

        # One grid row per plain text setting:
        # (label text, attribute that receives the StringVar, extra Entry options)
        text_rows = (
            ("Space/Bucket Name:", "space_name_var", {}),
            ("Endpoint URL: \n(e.g. https://nyc3.digitaloceanspaces.com)", "endpoint_url_var", {}),
            ("Access Key:", "access_key_var", {}),
            ("Secret Key:", "secret_key_var", {"show": "*"}),
            ("Remote Folder:\nDefault = /", "folder_var", {}),
        )
        for row, (caption, attr_name, entry_options) in enumerate(text_rows):
            ttk.Label(input_frame, text=caption).grid(row=row, column=0, sticky="w", **cell)
            variable = tk.StringVar()
            setattr(self, attr_name, variable)
            ttk.Entry(input_frame, textvariable=variable, width=40, **entry_options).grid(row=row, column=1, **cell)

        # Local Directory
        ttk.Label(input_frame, text="Local Directory:").grid(row=5, column=0, sticky="w", **cell)
        self.local_dir_var = tk.StringVar(value=os.path.join(os.path.expanduser("~"), "Downloads"))
        local_dir_frame = ttk.Frame(input_frame)
        local_dir_frame.grid(row=5, column=1, sticky="w", **cell)
        ttk.Entry(local_dir_frame, textvariable=self.local_dir_var, width=30).pack(side="left")
        ttk.Button(local_dir_frame, text="Browse", command=self.browse_directory).pack(side="left", padx=5)

        # Button frame: test connection, save config, start/stop download
        button_frame = ttk.Frame(input_frame)
        button_frame.grid(row=6, column=0, columnspan=2, pady=10)
        self.test_btn = ttk.Button(button_frame, text="Test Connection", command=self.test_connection)
        self.test_btn.pack(side="left", padx=5)
        ttk.Button(button_frame, text="Save Configuration", command=self.save_config).pack(side="left", padx=5)
        self.download_btn = ttk.Button(button_frame, text="Start Download", command=self.start_download)
        self.download_btn.pack(side="left", padx=5)

        # Progress frame: bar, status line and current file
        progress_frame = ttk.LabelFrame(self, text="Download Progress")
        progress_frame.pack(padx=10, pady=10, fill="both", expand=True)
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(padx=10, pady=10, fill="x")
        self.progress_label = ttk.Label(progress_frame, text="Ready to download")
        self.progress_label.pack(padx=10, pady=5)
        self.current_file_label = ttk.Label(progress_frame, text="")
        self.current_file_label.pack(padx=10, pady=5)

        # Log frame: text widget with a scrollbar
        log_frame = ttk.LabelFrame(self, text="Download Log")
        log_frame.pack(padx=10, pady=10, fill="both", expand=True)
        self.log_text = tk.Text(log_frame, height=8, width=70)
        self.log_text.pack(padx=10, pady=10, fill="both", expand=True)
        scrollbar = ttk.Scrollbar(self.log_text, command=self.log_text.yview)
        scrollbar.pack(side="right", fill="y")
        self.log_text.config(yscrollcommand=scrollbar.set)

        # Add handler to capture log messages
        self.log_handler = LogTextHandler(self.log_text)
        logger.addHandler(self.log_handler)

    def browse_directory(self):
        """Let the user pick the local directory; an empty result changes nothing."""
        directory = filedialog.askdirectory(initialdir=self.local_dir_var.get())
        if directory:
            self.local_dir_var.set(directory)

    def _current_config(self) -> dict:
        """Collect the connection settings currently entered in the form."""
        return {
            "space_name": self.space_name_var.get(),
            "endpoint_url": self.endpoint_url_var.get(),
            "access_key": self.access_key_var.get(),
            "secret_key": self.secret_key_var.get(),
            "local_directory": self.local_dir_var.get(),
        }

    def test_connection(self):
        """Run a connection test with the form values and show the result."""
        if not self.validate_inputs():
            return
        config = self._current_config()
        try:
            self.test_btn.config(text="Testing...", state="disabled")
            # The check below blocks the event loop; flush pending redraws
            # first so the "Testing..." state is actually painted.
            self.update_idletasks()
            downloader = S3Downloader(config)
            success, message = downloader.test_connection()
            if success:
                messagebox.showinfo("Connection Test", f"✓ {message}")
                logger.info(f"Connection test successful: {message}")
            else:
                messagebox.showerror("Connection Test", f"✗ {message}")
                logger.error(f"Connection test failed: {message}")
        except Exception as e:
            error_msg = f"Connection test failed: {e}"
            messagebox.showerror("Connection Test", error_msg)
            logger.error(error_msg)
        finally:
            self.test_btn.config(text="Test Connection", state="normal")

    def load_config(self):
        """Fill the form from the "default" section of config.ini, if present.

        Errors are logged, not raised.
        """
        config_file = "config.ini"
        if not os.path.exists(config_file):
            return
        try:
            parser = configparser.ConfigParser()
            parser.read(config_file)
            if "default" in parser:
                section = parser["default"]
                self.space_name_var.set(section.get("space_name", ""))
                self.endpoint_url_var.set(section.get("endpoint_url", ""))
                self.access_key_var.set(section.get("access_key", ""))
                self.secret_key_var.set(section.get("secret_key", ""))
                self.local_dir_var.set(section.get("local_directory", self.local_dir_var.get()))
        except Exception as e:
            logger.error(f"Error loading configuration: {e}")

    def save_config(self):
        """Write the form values to the "default" section of config.ini.

        Content already in the file is read first and kept. Because the file
        holds the secret key, owner-only permissions are requested for it.
        """
        config_file = "config.ini"
        try:
            parser = configparser.ConfigParser()
            if os.path.exists(config_file):
                parser.read(config_file)
            if "default" not in parser:
                parser["default"] = {}
            section = parser["default"]
            section["space_name"] = self.space_name_var.get()
            section["endpoint_url"] = self.endpoint_url_var.get()
            section["access_key"] = self.access_key_var.get()
            section["secret_key"] = self.secret_key_var.get()
            section["local_directory"] = self.local_dir_var.get()
            # The mode passed to os.open only applies to a newly created
            # file; chmod covers a file that already existed.
            fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, "w") as f:
                os.chmod(config_file, 0o600)
                parser.write(f)
            messagebox.showinfo("Configuration Saved", "Your configuration has been saved.")
        except Exception as e:
            logger.error(f"Error saving configuration: {e}")
            messagebox.showerror("Error", f"Failed to save configuration: {e}")

    def validate_inputs(self):
        """Return True when all required fields are filled in.

        Otherwise an error dialog names the blank fields and False is
        returned. The remote folder is not a required field.
        """
        required = {
            "Space/Bucket Name": self.space_name_var.get(),
            "Endpoint URL": self.endpoint_url_var.get(),
            "Access Key": self.access_key_var.get(),
            "Secret Key": self.secret_key_var.get(),
            "Local Directory": self.local_dir_var.get(),
        }
        missing = [k for k, v in required.items() if not v.strip()]
        if missing:
            messagebox.showerror("Missing Information", f"Please fill in the following fields: {', '.join(missing)}")
            return False
        return True

    def start_download(self):
        """Start a download in a background thread, or stop the running one."""
        # A running download has to stay stoppable even if a required field
        # was blanked in the meantime, so handle the stop request first.
        if self.download_thread and self.download_thread.is_alive():
            self.downloader.stop_download = True
            self.download_btn.config(text="Start Download")
            return
        if not self.validate_inputs():
            return
        config = self._current_config()
        try:
            # Ensure the local directory exists; inside the try so that a
            # failure is reported to the user.
            Path(config["local_directory"]).mkdir(parents=True, exist_ok=True)
            self.downloader = S3Downloader(config)
            # Test connection first
            success, message = self.downloader.test_connection()
            if not success:
                messagebox.showerror("Connection Error", f"Cannot connect to S3: {message}")
                return
            # Clear what the previous run left on screen.
            self.progress_var.set(0)
            self.current_file_label.config(text="")
            self.download_btn.config(text="Stop Download")
            self.progress_label.config(text="Counting files...")
            # Read the folder here so the worker does not have to query a
            # Tk variable from its own thread.
            self.download_thread = threading.Thread(
                target=self.perform_download,
                args=(self.folder_var.get(),),
                daemon=True,
            )
            self.download_thread.start()
        except Exception as e:
            logger.error(f"Error starting download: {e}")
            messagebox.showerror("Error", f"Failed to start download: {e}")
            self.download_btn.config(text="Start Download")

    def perform_download(self, space_folder=None):
        """Count and download the files; target of the download thread.

        Args:
            space_folder: Remote folder prefix. When None, the value is read
                from the form.

        Widget updates are scheduled with self.after instead of being made
        directly.
        """
        def reset_button():
            self.download_btn.config(text="Start Download")

        def show_status(text):
            self.after(0, lambda: self.progress_label.config(text=text))

        try:
            folder = self.folder_var.get() if space_folder is None else space_folder
            downloader = self.downloader
            # Count files first
            downloader.total_files = downloader.count_files(folder)
            downloader.downloaded_files = 0
            if downloader.total_files == 0:
                show_status("No files found to download")
                self.after(0, reset_button)
                return
            show_status(f"Found {downloader.total_files} files. Starting download...")
            count = downloader.download_files(folder, progress_callback=self.update_progress)
            if downloader.stop_download:
                show_status(f"Download stopped. Downloaded {count} files.")
            else:
                show_status(f"Download complete! Downloaded {count} files.")
            self.after(0, reset_button)
        except Exception as e:
            logger.error(f"Error during download: {e}")
            # Build the text now: `e` is unbound when this except block ends,
            # so a callback that referenced it later would raise NameError.
            error_text = f"Download failed: {e}"
            self.after(0, lambda: messagebox.showerror("Error", error_text))
            self.after(0, reset_button)

    def update_progress(self, current, total, current_file):
        """Schedule an update of the progress bar and labels.

        Args:
            current: Number of files handled so far.
            total: Total number of files; 0 yields 0 percent.
            current_file: Key of the file that was just handled.
        """
        progress = (current / total * 100) if total > 0 else 0

        def apply():
            self.progress_var.set(progress)
            self.progress_label.config(text=f"Downloading: {current} of {total} files ({progress:.1f}%)")
            self.current_file_label.config(text=f"Current file: {current_file}")

        # perform_download passes this method as the progress callback, so it
        # runs in the worker thread; hand the widget changes to the event
        # loop the same way perform_download does.
        self.after(0, apply)
class LogTextHandler(logging.Handler):
    """Logging handler that appends formatted records to a Tk text widget."""

    def __init__(self, text_widget):
        """Remember the target widget and install the line format."""
        super().__init__()
        self.text_widget = text_widget
        self.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

    def emit(self, record):
        """Format the record and schedule it to be appended to the widget.

        A failure while formatting or scheduling is passed to handleError()
        instead of being raised into the code that logged the message.
        """
        try:
            msg = self.format(record)

            def append():
                # The widget is switched to "normal" only for the insert and
                # back to "disabled" afterwards.
                self.text_widget.configure(state="normal")
                self.text_widget.insert("end", msg + "\n")
                self.text_widget.see("end")
                self.text_widget.configure(state="disabled")

            # Schedule to run in the main thread
            self.text_widget.after(0, append)
        except Exception:
            self.handleError(record)
def main():
    """Main entry point: create the application window and run its event loop."""
    app = S3DownloaderApp()
    app.mainloop()


if __name__ == "__main__":
    main()
@hellojahid3
Copy link
Author

Run

Install Python, then install the boto3 package:

pip install boto3

then,

python3 s3-bucket-downloader.py

@ImonAwesome
Copy link

Updated code with a "Test Connection" feature:

#!/usr/bin/env python3

import boto3
import os
import logging
import sys
import threading
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from typing import Optional
from botocore.exceptions import ClientError, NoCredentialsError
from pathlib import Path
import configparser

# Setup logging: every record goes to the console and to s3_downloader.log
# in the current working directory.
logging.basicConfig(
  level=logging.INFO,
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  handlers=[
    logging.StreamHandler(),
    # Explicit UTF-8 so that object keys containing non-ASCII characters
    # are written intact whatever the platform's default encoding is.
    logging.FileHandler("s3_downloader.log", encoding="utf-8"),
  ]
)
logger = logging.getLogger("s3_downloader")

class S3Downloader:
  """Class to handle downloading files from S3-compatible storage"""
  
  def __init__(self, config: dict):
    """Initialize the downloader with configuration"""
    self.config = config
    self.s3_client = self._initialize_s3_client()
    self.total_files = 0
    self.downloaded_files = 0
    self.stop_download = False
    
  def _initialize_s3_client(self):
    """Build a boto3 S3 client from the stored configuration.

    The endpoint gets an "https://" scheme when it has none, the credentials
    are stripped of surrounding whitespace, and the region falls back to
    "us-east-1" when the config has no "region" entry.

    Raises:
      Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
      settings = self.config
      endpoint = settings["endpoint_url"].strip()
      has_scheme = endpoint.startswith(('http://', 'https://'))
      if not has_scheme:
        endpoint = f"https://{endpoint}"

      client_options = {
        "endpoint_url": endpoint,
        "aws_access_key_id": settings["access_key"].strip(),
        "aws_secret_access_key": settings["secret_key"].strip(),
        "region_name": settings.get("region", "us-east-1"),
      }
      return boto3.client('s3', **client_options)
    except Exception as e:
      logger.error(f"Failed to initialize S3 client: {e}")
      raise
  
  def test_connection(self) -> tuple[bool, str]:
    """Test the connection to the S3 service

    Only a HEAD request on the configured bucket is issued. Listing all
    buckets first would make credentials that are limited to this one bucket
    fail the test even though the bucket itself is accessible.

    Returns:
      (True, "Connection successful") when the bucket answers, otherwise
      (False, reason) with a human-readable explanation.
    """
    try:
      bucket_name = self.config["space_name"].strip()
      self.s3_client.head_bucket(Bucket=bucket_name)
    except NoCredentialsError:
      return False, "Invalid credentials"
    except ClientError as e:
      # All service errors are classified here. The HTTP-status codes
      # ('404', '403') are what the original matched for head_bucket; the
      # named codes are accepted too in case the service supplies them.
      error_code = e.response.get('Error', {}).get('Code', '')
      if error_code in ('404', 'NoSuchBucket'):
        return False, f"Bucket '{bucket_name}' does not exist"
      if error_code in ('403', 'AccessDenied'):
        return False, f"Access denied to bucket '{bucket_name}'"
      if error_code == 'SignatureDoesNotMatch':
        return False, "Invalid access key or secret key"
      if error_code == 'InvalidAccessKeyId':
        return False, "Invalid access key ID"
      return False, f"Error accessing bucket: {e}"
    except Exception as e:
      return False, f"Unexpected error: {e}"

    logger.info(f"Bucket '{bucket_name}' exists and is accessible")
    return True, "Connection successful"
  
  def count_files(self, space_folder: str = '') -> int:
    """Count the number of files in the bucket/folder"""
    count = 0
    try:
      bucket_name = self.config["space_name"].strip()
      prefix = space_folder.strip()
      
      # Remove leading slash from prefix if present
      if prefix.startswith('/'):
        prefix = prefix[1:]
        
      logger.info(f"Counting files in bucket '{bucket_name}' with prefix '{prefix}'")
      
      paginator = self.s3_client.get_paginator('list_objects_v2')
      
      # Use empty prefix if no folder specified
      if prefix:
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
      else:
        page_iterator = paginator.paginate(Bucket=bucket_name)
      
      for page in page_iterator:
        if 'Contents' in page:
          # Count only files, not directories
          count += sum(1 for obj in page['Contents'] if not obj['Key'].endswith('/'))
      
      logger.info(f"Found {count} files")
      return count
      
    except ClientError as e:
      error_code = e.response['Error']['Code']
      if error_code == 'NoSuchBucket':
        logger.error(f"Bucket '{bucket_name}' does not exist")
      elif error_code == 'AccessDenied':
        logger.error(f"Access denied to bucket '{bucket_name}'")
      else:
        logger.error(f"Error counting files: {e}")
      return 0
    except Exception as e:
      logger.error(f"Unexpected error counting files: {e}")
      return 0
  
  def download_files(self, space_folder: str = '', max_items: Optional[int] = None, progress_callback=None) -> int:
    """
    Download files from the S3-compatible storage
    
    Args:
      space_folder: The folder prefix in the bucket to download from
      max_items: Maximum number of items to download
      progress_callback: Callback function to update progress
      
    Returns:
      Number of files downloaded
    """
    downloaded_count = 0
    self.stop_download = False
    
    try:
      bucket_name = self.config["space_name"].strip()
      prefix = space_folder.strip()
      
      # Remove leading slash from prefix if present
      if prefix.startswith('/'):
        prefix = prefix[1:]
        
      logger.info(f"Starting download from bucket '{bucket_name}' with prefix '{prefix}'")
      
      paginator = self.s3_client.get_paginator('list_objects_v2')
      
      if prefix:
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
      else:
        page_iterator = paginator.paginate(Bucket=bucket_name)
      
      for page in page_iterator:
        if self.stop_download:
          logger.info("Download stopped by user")
          break
          
        if 'Contents' not in page:
          logger.info(f"No objects found with prefix: {prefix}")
          continue
        
        for obj in page['Contents']:
          if self.stop_download:
            break
            
          key = obj['Key']
          
          # If we've reached the max items, stop downloading
          if max_items is not None and downloaded_count >= max_items:
            return downloaded_count
          
          if key.endswith('/'):
            logger.debug(f"Skipping directory entry: {key}")
            continue
            
          local_file_path = os.path.join(self.config["local_directory"], key)
          local_dir = os.path.dirname(local_file_path)
          
          os.makedirs(local_dir, exist_ok=True)
          
          if os.path.exists(local_file_path) and os.path.getsize(local_file_path) == obj['Size']:
            logger.debug(f"File {key} already exists with correct size, skipping")
            self.downloaded_files += 1
            if progress_callback:
              progress_callback(self.downloaded_files, self.total_files, key)
            continue
          
          logger.info(f"Downloading {key}...")
          try:
            self.s3_client.download_file(
              bucket_name, 
              key, 
              local_file_path
            )
            downloaded_count += 1
            self.downloaded_files += 1
            if progress_callback:
              progress_callback(self.downloaded_files, self.total_files, key)
            logger.info(f"Downloaded {key} to {local_file_path}")
          except ClientError as e:
            logger.error(f"Error downloading {key}: {e}")
            
      return downloaded_count
        
    except Exception as e:
      logger.error(f"Error during download operation: {e}")
      raise

class S3DownloaderApp(tk.Tk):
  def __init__(self):
    """Create the main window, build its widgets and load saved settings."""
    super().__init__()

    # Nothing has been started yet.
    self.downloader, self.download_thread = None, None

    self.title("S3 Downloader")
    self.geometry("600x650")

    self.create_widgets()
    self.load_config()
    
  def create_widgets(self):
    """Build the configuration form, the progress area and the log view.

    Also attaches a LogTextHandler to the module logger so that log records
    are shown in the log view.
    """
    cell = {"padx": 5, "pady": 5}

    # Input frame
    input_frame = ttk.LabelFrame(self, text="S3 Configuration")
    input_frame.pack(padx=10, pady=10, fill="x")

    # One grid row per plain text setting:
    # (label text, attribute that receives the StringVar, extra Entry options)
    text_rows = (
      ("Space/Bucket Name:", "space_name_var", {}),
      ("Endpoint URL:", "endpoint_url_var", {}),
      ("Access Key:", "access_key_var", {}),
      ("Secret Key:", "secret_key_var", {"show": "*"}),
      ("Remote Folder:", "folder_var", {}),
    )
    for row, (caption, attr_name, entry_options) in enumerate(text_rows):
      ttk.Label(input_frame, text=caption).grid(row=row, column=0, sticky="w", **cell)
      variable = tk.StringVar()
      setattr(self, attr_name, variable)
      ttk.Entry(input_frame, textvariable=variable, width=40, **entry_options).grid(row=row, column=1, **cell)

    # Local Directory: entry plus Browse button share one grid cell
    ttk.Label(input_frame, text="Local Directory:").grid(row=5, column=0, sticky="w", **cell)
    default_dir = os.path.join(os.path.expanduser("~"), "Downloads")
    self.local_dir_var = tk.StringVar(value=default_dir)
    local_dir_frame = ttk.Frame(input_frame)
    local_dir_frame.grid(row=5, column=1, sticky="w", **cell)
    ttk.Entry(local_dir_frame, textvariable=self.local_dir_var, width=30).pack(side="left")
    ttk.Button(local_dir_frame, text="Browse", command=self.browse_directory).pack(side="left", padx=5)

    # Button frame: (attribute to keep the button in or None, text, command)
    button_frame = ttk.Frame(input_frame)
    button_frame.grid(row=6, column=0, columnspan=2, pady=10)
    button_specs = (
      ("test_btn", "Test Connection", self.test_connection),
      (None, "Save Configuration", self.save_config),
      ("download_btn", "Start Download", self.start_download),
    )
    for attr_name, caption, handler in button_specs:
      button = ttk.Button(button_frame, text=caption, command=handler)
      if attr_name is not None:
        setattr(self, attr_name, button)
      button.pack(side="left", padx=5)

    # Progress frame: bar, status line and current file
    progress_frame = ttk.LabelFrame(self, text="Download Progress")
    progress_frame.pack(padx=10, pady=10, fill="both", expand=True)

    self.progress_var = tk.DoubleVar()
    self.progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=100)
    self.progress_bar.pack(padx=10, pady=10, fill="x")

    self.progress_label = ttk.Label(progress_frame, text="Ready to download")
    self.progress_label.pack(padx=10, pady=5)

    self.current_file_label = ttk.Label(progress_frame, text="")
    self.current_file_label.pack(padx=10, pady=5)

    # Log frame: text widget with a scrollbar
    log_frame = ttk.LabelFrame(self, text="Download Log")
    log_frame.pack(padx=10, pady=10, fill="both", expand=True)

    self.log_text = tk.Text(log_frame, height=8, width=70)
    self.log_text.pack(padx=10, pady=10, fill="both", expand=True)

    scrollbar = ttk.Scrollbar(self.log_text, command=self.log_text.yview)
    scrollbar.pack(side="right", fill="y")
    self.log_text.config(yscrollcommand=scrollbar.set)

    # Add handler to capture log messages
    self.log_handler = LogTextHandler(self.log_text)
    logger.addHandler(self.log_handler)
    
  def browse_directory(self):
    """Let the user pick the local directory through a folder dialog.

    The dialog starts at the directory currently in the field. An empty
    result leaves the field unchanged.
    """
    chosen = filedialog.askdirectory(initialdir=self.local_dir_var.get())
    if not chosen:
      return
    self.local_dir_var.set(chosen)
      
  def test_connection(self):
    """Run a connection test with the current form values and show the result.

    The outcome is reported in a message box and written to the log. The
    button is disabled while the test runs and restored afterwards.
    """
    if not self.validate_inputs():
      return

    config = {
      "space_name": self.space_name_var.get(),
      "endpoint_url": self.endpoint_url_var.get(),
      "access_key": self.access_key_var.get(),
      "secret_key": self.secret_key_var.get(),
      "local_directory": self.local_dir_var.get()
    }

    try:
      self.test_btn.config(text="Testing...", state="disabled")
      # The check below blocks the event loop; flush pending redraws first
      # so the "Testing..." state is actually painted.
      self.update_idletasks()

      downloader = S3Downloader(config)
      success, message = downloader.test_connection()

      if success:
        messagebox.showinfo("Connection Test", f"✓ {message}")
        logger.info(f"Connection test successful: {message}")
      else:
        messagebox.showerror("Connection Test", f"✗ {message}")
        logger.error(f"Connection test failed: {message}")

    except Exception as e:
      error_msg = f"Connection test failed: {e}"
      messagebox.showerror("Connection Test", error_msg)
      logger.error(error_msg)
    finally:
      self.test_btn.config(text="Test Connection", state="normal")
      
  def load_config(self):
    config_file = "config.ini"
    if os.path.exists(config_file):
      try:
        parser = configparser.ConfigParser()
        parser.read(config_file)
        if "default" in parser:
          section = parser["default"]
          self.space_name_var.set(section.get("space_name", ""))
          self.endpoint_url_var.set(section.get("endpoint_url", ""))
          self.access_key_var.set(section.get("access_key", ""))
          self.secret_key_var.set(section.get("secret_key", ""))
          self.local_dir_var.set(section.get("local_directory", self.local_dir_var.get()))
      except Exception as e:
        logger.error(f"Error loading configuration: {e}")
  
  def save_config(self):
    """Write the current form values to the "default" section of config.ini.

    Content already in the file is read first and kept. Because the file
    holds the secret key, owner-only permissions are requested for it. The
    outcome is shown in a message box; failures are also logged.
    """
    config_file = "config.ini"
    try:
      parser = configparser.ConfigParser()
      if os.path.exists(config_file):
        parser.read(config_file)

      if "default" not in parser:
        parser["default"] = {}

      section = parser["default"]
      section["space_name"] = self.space_name_var.get()
      section["endpoint_url"] = self.endpoint_url_var.get()
      section["access_key"] = self.access_key_var.get()
      section["secret_key"] = self.secret_key_var.get()
      section["local_directory"] = self.local_dir_var.get()

      # The mode passed to os.open only applies to a newly created file;
      # chmod covers a file that already existed.
      fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
      with os.fdopen(fd, "w") as f:
        os.chmod(config_file, 0o600)
        parser.write(f)

      messagebox.showinfo("Configuration Saved", "Your configuration has been saved.")
    except Exception as e:
      logger.error(f"Error saving configuration: {e}")
      messagebox.showerror("Error", f"Failed to save configuration: {e}")
  
  def validate_inputs(self):
    required = {
      "Space/Bucket Name": self.space_name_var.get(),
      "Endpoint URL": self.endpoint_url_var.get(),
      "Access Key": self.access_key_var.get(),
      "Secret Key": self.secret_key_var.get(),
      "Local Directory": self.local_dir_var.get()
    }
    
    missing = [k for k, v in required.items() if not v.strip()]
    
    if missing:
      messagebox.showerror("Missing Information", f"Please fill in the following fields: {', '.join(missing)}")
      return False
      
    return True
  
  def start_download(self):
    if not self.validate_inputs():
      return
      
    if self.download_thread and self.download_thread.is_alive():
      self.downloader.stop_download = True
      self.download_btn.config(text="Start Download")
      return
      
    config = {
      "space_name": self.space_name_var.get(),
      "endpoint_url": self.endpoint_url_var.get(),
      "access_key": self.access_key_var.get(),
      "secret_key": self.secret_key_var.get(),
      "local_directory": self.local_dir_var.get()
    }
    
    # Ensure the local directory exists
    Path(config["local_directory"]).mkdir(parents=True, exist_ok=True)
    
    try:
      self.downloader = S3Downloader(config)
      
      # Test connection first
      success, message = self.downloader.test_connection()
      if not success:
        messagebox.showerror("Connection Error", f"Cannot connect to S3: {message}")
        return
      
      # Start download in a separate thread
      self.download_btn.config(text="Stop Download")
      self.progress_label.config(text="Counting files...")
      
      self.download_thread = threading.Thread(target=self.perform_download)
      self.download_thread.daemon = True
      self.download_thread.start()
    except Exception as e:
      logger.error(f"Error starting download: {e}")
      messagebox.showerror("Error", f"Failed to start download: {e}")
      self.download_btn.config(text="Start Download")
  
  def perform_download(self):
    try:
      # Count files first
      self.downloader.total_files = self.downloader.count_files(self.folder_var.get())
      self.downloader.downloaded_files = 0
      
      if self.downloader.total_files == 0:
        self.after(0, lambda: self.progress_label.config(text="No files found to download"))
        self.after(0, lambda: self.download_btn.config(text="Start Download"))
        return
        
      self.after(0, lambda: self.progress_label.config(text=f"Found {self.downloader.total_files} files. Starting download..."))
      
      # Download files
      count = self.downloader.download_files(
        self.folder_var.get(), 
        progress_callback=self.update_progress
      )
      
      if self.downloader.stop_download:
        self.after(0, lambda: self.progress_label.config(text=f"Download stopped. Downloaded {count} files."))
      else:
        self.after(0, lambda: self.progress_label.config(text=f"Download complete! Downloaded {count} files."))
      
      self.after(0, lambda: self.download_btn.config(text="Start Download"))
    except Exception as e:
      logger.error(f"Error during download: {e}")
      self.after(0, lambda: messagebox.showerror("Error", f"Download failed: {e}"))
      self.after(0, lambda: self.download_btn.config(text="Start Download"))
  
  def update_progress(self, current, total, current_file):
    progress = (current / total * 100) if total > 0 else 0
    self.progress_var.set(progress)
    self.progress_label.config(text=f"Downloading: {current} of {total} files ({progress:.1f}%)")
    self.current_file_label.config(text=f"Current file: {current_file}")

class LogTextHandler(logging.Handler):
  """Logging handler that appends formatted records to a Tk text widget."""

  def __init__(self, text_widget):
    """Remember the target widget and install the line format."""
    super().__init__()
    self.text_widget = text_widget
    self.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

  def emit(self, record):
    """Format the record and schedule it to be appended to the widget.

    A failure while formatting or scheduling is passed to handleError()
    instead of being raised into the code that logged the message.
    """
    try:
      msg = self.format(record)

      def append():
        # The widget is switched to "normal" only for the insert and back
        # to "disabled" afterwards.
        self.text_widget.configure(state="normal")
        self.text_widget.insert("end", msg + "\n")
        self.text_widget.see("end")
        self.text_widget.configure(state="disabled")

      # Schedule to run in the main thread
      self.text_widget.after(0, append)
    except Exception:
      self.handleError(record)

def main():
  """Create the application window and run its event loop."""
  S3DownloaderApp().mainloop()


# Start the GUI only when the file is executed as a script.
if __name__ == "__main__":
  main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment