Last active
November 11, 2025 12:11
-
-
Save mistivia/73c52af782c6236f311470d717b4b100 to your computer and use it in GitHub Desktop.
vibe-coded s3 upload gui
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # pip install boto3 Pillow pyperclip | |
| # --- 默认配置(可以为空,用户将在 GUI 中输入) --- | |
| BUCKET = '' | |
| REGION = '' | |
| ACCESS_KEY = '' | |
| SECRET_KEY = '' | |
| ENDPOINT = '' | |
| PREFIX = 'myuploads/' | |
| VISIT_ENDPOINT = '' | |
| # --------------------------------------------- | |
| import tkinter as tk | |
| from tkinter import filedialog, ttk | |
| import boto3 | |
| from botocore.exceptions import NoCredentialsError, ClientError | |
| import os | |
| from PIL import Image, ImageGrab | |
| import time | |
| import uuid | |
| import pyperclip | |
| # --- S3 上传核心逻辑 --- | |
| def upload_file_to_s3(s3_client, file_path, bucket_name, s3_object_name, extra_args = {}): | |
| """ | |
| 将文件上传到 S3 兼容的对象存储。 | |
| """ | |
| try: | |
| s3_client.upload_file(file_path, bucket_name, s3_object_name, ExtraArgs=extra_args) | |
| # 返回一个简单的成功消息,URL 的构建将在 GUI 层面完成 | |
| return True, "上传成功!" | |
| except NoCredentialsError: | |
| return False, "错误:未找到或凭证无效。请检查 Access Key 和 Secret Key。" | |
| except ClientError as e: | |
| error_code = e.response.get("Error", {}).get("Code") | |
| error_msg = e.response.get("Error", {}).get("Message") | |
| if error_code == 'NoSuchBucket': | |
| return False, f"错误:存储桶 '{bucket_name}' 不存在或无权访问。" | |
| return False, f"S3 客户端错误 ({error_code}): {error_msg}" | |
| except Exception as e: | |
| return False, f"发生未知错误: {e}" | |
| # --- GUI 逻辑 --- | |
| class S3UploaderApp: | |
| def __init__(self, master): | |
| self.master = master | |
| master.title("S3 上传工具") | |
| master.geometry("550x450") | |
| self.visit_endpoint_var = tk.StringVar(value=VISIT_ENDPOINT) | |
| self.endpoint_var = tk.StringVar(value=ENDPOINT) | |
| self.access_key_var = tk.StringVar(value=ACCESS_KEY) | |
| self.secret_key_var = tk.StringVar(value=SECRET_KEY) | |
| self.region_var = tk.StringVar(value=REGION) | |
| self.bucket_var = tk.StringVar(value=BUCKET) | |
| self.prefix_var = tk.StringVar(value=PREFIX) | |
| self.setup_ui() | |
| def setup_ui(self): | |
| main_frame = ttk.Frame(self.master, padding="10") | |
| main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) | |
| self.master.columnconfigure(0, weight=1) | |
| self.master.rowconfigure(0, weight=1) | |
| config_frame = ttk.LabelFrame(main_frame, text="S3 配置", padding="10") | |
| config_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10)) | |
| config_frame.columnconfigure(1, weight=1) | |
| ttk.Label(config_frame, text="Endpoint URL:").grid(row=0, column=0, sticky="w", padx=5, pady=2) | |
| ttk.Entry(config_frame, textvariable=self.endpoint_var).grid(row=0, column=1, sticky="ew", padx=5, pady=2) | |
| ttk.Label(config_frame, text="Access Key ID:").grid(row=1, column=0, sticky="w", padx=5, pady=2) | |
| ttk.Entry(config_frame, textvariable=self.access_key_var, show="*").grid(row=1, column=1, sticky="ew", padx=5, pady=2) | |
| ttk.Label(config_frame, text="Secret Access Key:").grid(row=2, column=0, sticky="w", padx=5, pady=2) | |
| ttk.Entry(config_frame, textvariable=self.secret_key_var, show="*").grid(row=2, column=1, sticky="ew", padx=5, pady=2) | |
| ttk.Label(config_frame, text="Region:").grid(row=3, column=0, sticky="w", padx=5, pady=2) | |
| ttk.Entry(config_frame, textvariable=self.region_var).grid(row=3, column=1, sticky="ew", padx=5, pady=2) | |
| ttk.Label(config_frame, text="Bucket Name:").grid(row=4, column=0, sticky="w", padx=5, pady=2) | |
| ttk.Entry(config_frame, textvariable=self.bucket_var).grid(row=4, column=1, sticky="ew", padx=5, pady=2) | |
| ttk.Label(config_frame, text="文件前缀 (路径):").grid(row=5, column=0, sticky="w", padx=5, pady=2) | |
| ttk.Entry(config_frame, textvariable=self.prefix_var).grid(row=5, column=1, sticky="ew", padx=5, pady=2) | |
| ttk.Label(config_frame, text="Visit Endpoint URL:").grid(row=6, column=0, sticky="w", padx=5, pady=2) | |
| ttk.Entry(config_frame, textvariable=self.visit_endpoint_var).grid(row=6, column=1, sticky="ew", padx=5, pady=2) | |
| action_frame = ttk.LabelFrame(main_frame, text="操作", padding="10") | |
| action_frame.grid(row=1, column=0, sticky="ew", pady=10) | |
| action_frame.columnconfigure(0, weight=1) | |
| action_frame.columnconfigure(1, weight=1) | |
| action_frame.columnconfigure(2, weight=1) | |
| self.img_button = ttk.Button(action_frame, text="上传剪贴板图片", command=self.upload_clipboard_image) | |
| self.img_button.grid(row=0, column=0, padx=5, pady=5, sticky="ew") | |
| self.img_button = ttk.Button(action_frame, text="上传剪贴板文本", command=self.upload_clipboard_text) | |
| self.img_button.grid(row=0, column=1, padx=5, pady=5, sticky="ew") | |
| self.file_button = ttk.Button(action_frame, text="上传本地文件", command=self.upload_local_file) | |
| self.file_button.grid(row=0, column=2, padx=5, pady=5, sticky="ew") | |
| self.status_label = ttk.Label(main_frame, text="请先完成 S3 配置,然后选择操作...", wraplength=500, justify=tk.LEFT) | |
| self.status_label.grid(row=2, column=0, sticky="w", pady=10) | |
| main_frame.columnconfigure(0, weight=1) | |
| def _create_s3_client(self): | |
| region = self.region_var.get() or None | |
| access_key = self.access_key_var.get() or None | |
| secret_key = self.secret_key_var.get() or None | |
| endpoint = self.endpoint_var.get() or None | |
| try: | |
| return boto3.client( | |
| 's3', | |
| region_name=region, | |
| aws_access_key_id=access_key, | |
| aws_secret_access_key=secret_key, | |
| endpoint_url=endpoint | |
| ) | |
| except Exception as e: | |
| self.update_status(f"创建 S3 客户端失败: {e}", is_success=False) | |
| return None | |
| # --- [ 新增 ] --- | |
| def _generate_http_url(self, bucket, object_name): | |
| """根据配置生成可公开访问的 HTTP(S) URL""" | |
| visit_endpoint = self.visit_endpoint_var.get() | |
| return f"{visit_endpoint}/{object_name}" | |
| # --- [ 修改 ] --- | |
| def update_status(self, message, is_success=True, url=None): | |
| """更新状态栏颜色和文本,并处理 URL 复制""" | |
| color = "darkgreen" if is_success else "red" | |
| display_message = message | |
| if is_success and url: | |
| self.master.clipboard_clear() | |
| self.master.clipboard_append(url) | |
| display_message = f"{message}\nURL: {url}\n(链接已复制到剪贴板)" | |
| self.status_label.config(text=display_message, foreground=color) | |
| def generate_s3_name_for_image(self, extension): | |
| timestamp = int(time.time()) | |
| prefix = self.prefix_var.get() | |
| if prefix and not prefix.endswith('/'): | |
| prefix += '/' | |
| return f"{prefix}{uuid.uuid4().hex[:8]}image_{timestamp}{extension}" | |
| def generate_s3_name_for_text(self, extension): | |
| timestamp = int(time.time()) | |
| prefix = self.prefix_var.get() | |
| if prefix and not prefix.endswith('/'): | |
| prefix += '/' | |
| return f"{prefix}{uuid.uuid4().hex[:8]}text_{timestamp}{extension}" | |
| # --- [ 修改 ] --- | |
| def upload_local_file(self): | |
| bucket = self.bucket_var.get() | |
| if not bucket: | |
| self.update_status("错误:请输入存储桶名称 (Bucket Name)。", False) | |
| return | |
| file_path = filedialog.askopenfilename() | |
| if not file_path: | |
| return | |
| s3_client = self._create_s3_client() | |
| if not s3_client: | |
| return | |
| filename = os.path.basename(file_path) | |
| random_prefix = uuid.uuid4().hex[:8] | |
| # 将随机前缀和原始文件名组合 | |
| randomized_filename = f"{random_prefix}-{filename}" | |
| prefix = self.prefix_var.get() | |
| if prefix and not prefix.endswith('/'): | |
| prefix += '/' | |
| s3_object_name = f"{prefix}{randomized_filename}" | |
| self.update_status(f"正在上传文件 ({filename})...", True) | |
| self.master.update_idletasks() | |
| success, message = upload_file_to_s3(s3_client, file_path, bucket, s3_object_name) | |
| http_url = None | |
| if success: | |
| http_url = self._generate_http_url(bucket, s3_object_name) | |
| self.update_status(message, success, url=http_url) | |
| # --- [ 修改 ] --- | |
| def upload_clipboard_image(self): | |
| bucket = self.bucket_var.get() | |
| if not bucket: | |
| self.update_status("错误:请输入存储桶名称 (Bucket Name)。", False) | |
| return | |
| s3_client = self._create_s3_client() | |
| if not s3_client: | |
| return | |
| try: | |
| img = ImageGrab.grabclipboard() | |
| except Exception as e: | |
| self.update_status(f"无法访问剪贴板: {e},请检查权限。", False) | |
| return | |
| if not isinstance(img, Image.Image): | |
| self.update_status("错误:剪贴板中没有有效的图像数据。", False) | |
| return | |
| s3_object_name = self.generate_s3_name_for_image(".jpg") | |
| temp_file = "temp_clipboard_image.jpg" | |
| try: | |
| if img.mode == 'RGBA': | |
| img = img.convert('RGB') | |
| img.save(temp_file, "JPEG") | |
| self.update_status(f"正在上传图片 ({s3_object_name})...", True) | |
| self.master.update_idletasks() | |
| success, message = upload_file_to_s3(s3_client, temp_file, bucket, s3_object_name) | |
| http_url = None | |
| if success: | |
| http_url = self._generate_http_url(bucket, s3_object_name) | |
| self.update_status(message, success, url=http_url) | |
| finally: | |
| if os.path.exists(temp_file): | |
| os.remove(temp_file) | |
| def upload_clipboard_text(self): | |
| bucket = self.bucket_var.get() | |
| if not bucket: | |
| self.update_status("错误:请输入存储桶名称 (Bucket Name)。", False) | |
| return | |
| s3_client = self._create_s3_client() | |
| if not s3_client: | |
| return | |
| # 1. 获取剪贴板文本 | |
| try: | |
| text_content = pyperclip.paste() | |
| except Exception as e: | |
| self.update_status(f"无法访问剪贴板: {e},请检查权限。", False) | |
| return | |
| # 2. 检查内容类型 | |
| if not text_content or not isinstance(text_content, str): | |
| self.update_status("错误:剪贴板中没有有效的文本数据。", False) | |
| return | |
| text_content = text_content.replace('\r\n', '\n') | |
| # 3. 生成 S3 文件名 (.txt 扩展名) | |
| # 假设我们有一个通用的生成文件名的方法 | |
| s3_object_name = self.generate_s3_name_for_text(".txt") | |
| temp_file = "temp_clipboard_text.txt" | |
| try: | |
| # 4. 保存临时文件(使用 UTF-8 编码支持中文) | |
| try: | |
| with open(temp_file, 'w', encoding='utf-8') as f: | |
| f.write(text_content) | |
| except Exception as e: | |
| self.update_status(f"无法保存临时文本文件: {e}", False) | |
| return | |
| # 5. 上传 | |
| self.update_status(f"正在上传文本 ({s3_object_name})...", True) | |
| # 确保 UI 更新 | |
| if hasattr(self, 'master') and self.master: | |
| self.master.update_idletasks() | |
| extra_args = { | |
| 'ContentType': 'text/plain; charset=utf-8' | |
| } | |
| success, message = upload_file_to_s3(s3_client, temp_file, bucket, s3_object_name, extra_args) | |
| http_url = None | |
| if success: | |
| http_url = self._generate_http_url(bucket, s3_object_name) | |
| self.update_status(message, success, url=http_url) | |
| finally: | |
| # 清理临时文件 | |
| if os.path.exists(temp_file): | |
| os.remove(temp_file) | |
| if __name__ == "__main__": | |
| root = tk.Tk() | |
| app = S3UploaderApp(root) | |
| root.mainloop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment