Skip to content

Instantly share code, notes, and snippets.

@mistivia
Last active November 11, 2025 12:11
Show Gist options
  • Select an option

  • Save mistivia/73c52af782c6236f311470d717b4b100 to your computer and use it in GitHub Desktop.

Select an option

Save mistivia/73c52af782c6236f311470d717b4b100 to your computer and use it in GitHub Desktop.
vibe-coded s3 upload gui
# pip install boto3 Pillow pyperclip
# --- 默认配置(可以为空,用户将在 GUI 中输入) ---
BUCKET = ''
REGION = ''
ACCESS_KEY = ''
SECRET_KEY = ''
ENDPOINT = ''
PREFIX = 'myuploads/'
VISIT_ENDPOINT = ''
# ---------------------------------------------
import tkinter as tk
from tkinter import filedialog, ttk
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
import os
from PIL import Image, ImageGrab
import time
import uuid
import pyperclip
# --- S3 上传核心逻辑 ---
def upload_file_to_s3(s3_client, file_path, bucket_name, s3_object_name, extra_args = {}):
"""
将文件上传到 S3 兼容的对象存储。
"""
try:
s3_client.upload_file(file_path, bucket_name, s3_object_name, ExtraArgs=extra_args)
# 返回一个简单的成功消息,URL 的构建将在 GUI 层面完成
return True, "上传成功!"
except NoCredentialsError:
return False, "错误:未找到或凭证无效。请检查 Access Key 和 Secret Key。"
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code")
error_msg = e.response.get("Error", {}).get("Message")
if error_code == 'NoSuchBucket':
return False, f"错误:存储桶 '{bucket_name}' 不存在或无权访问。"
return False, f"S3 客户端错误 ({error_code}): {error_msg}"
except Exception as e:
return False, f"发生未知错误: {e}"
# --- GUI 逻辑 ---
class S3UploaderApp:
def __init__(self, master):
self.master = master
master.title("S3 上传工具")
master.geometry("550x450")
self.visit_endpoint_var = tk.StringVar(value=VISIT_ENDPOINT)
self.endpoint_var = tk.StringVar(value=ENDPOINT)
self.access_key_var = tk.StringVar(value=ACCESS_KEY)
self.secret_key_var = tk.StringVar(value=SECRET_KEY)
self.region_var = tk.StringVar(value=REGION)
self.bucket_var = tk.StringVar(value=BUCKET)
self.prefix_var = tk.StringVar(value=PREFIX)
self.setup_ui()
def setup_ui(self):
main_frame = ttk.Frame(self.master, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
self.master.columnconfigure(0, weight=1)
self.master.rowconfigure(0, weight=1)
config_frame = ttk.LabelFrame(main_frame, text="S3 配置", padding="10")
config_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
config_frame.columnconfigure(1, weight=1)
ttk.Label(config_frame, text="Endpoint URL:").grid(row=0, column=0, sticky="w", padx=5, pady=2)
ttk.Entry(config_frame, textvariable=self.endpoint_var).grid(row=0, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(config_frame, text="Access Key ID:").grid(row=1, column=0, sticky="w", padx=5, pady=2)
ttk.Entry(config_frame, textvariable=self.access_key_var, show="*").grid(row=1, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(config_frame, text="Secret Access Key:").grid(row=2, column=0, sticky="w", padx=5, pady=2)
ttk.Entry(config_frame, textvariable=self.secret_key_var, show="*").grid(row=2, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(config_frame, text="Region:").grid(row=3, column=0, sticky="w", padx=5, pady=2)
ttk.Entry(config_frame, textvariable=self.region_var).grid(row=3, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(config_frame, text="Bucket Name:").grid(row=4, column=0, sticky="w", padx=5, pady=2)
ttk.Entry(config_frame, textvariable=self.bucket_var).grid(row=4, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(config_frame, text="文件前缀 (路径):").grid(row=5, column=0, sticky="w", padx=5, pady=2)
ttk.Entry(config_frame, textvariable=self.prefix_var).grid(row=5, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(config_frame, text="Visit Endpoint URL:").grid(row=6, column=0, sticky="w", padx=5, pady=2)
ttk.Entry(config_frame, textvariable=self.visit_endpoint_var).grid(row=6, column=1, sticky="ew", padx=5, pady=2)
action_frame = ttk.LabelFrame(main_frame, text="操作", padding="10")
action_frame.grid(row=1, column=0, sticky="ew", pady=10)
action_frame.columnconfigure(0, weight=1)
action_frame.columnconfigure(1, weight=1)
action_frame.columnconfigure(2, weight=1)
self.img_button = ttk.Button(action_frame, text="上传剪贴板图片", command=self.upload_clipboard_image)
self.img_button.grid(row=0, column=0, padx=5, pady=5, sticky="ew")
self.img_button = ttk.Button(action_frame, text="上传剪贴板文本", command=self.upload_clipboard_text)
self.img_button.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.file_button = ttk.Button(action_frame, text="上传本地文件", command=self.upload_local_file)
self.file_button.grid(row=0, column=2, padx=5, pady=5, sticky="ew")
self.status_label = ttk.Label(main_frame, text="请先完成 S3 配置,然后选择操作...", wraplength=500, justify=tk.LEFT)
self.status_label.grid(row=2, column=0, sticky="w", pady=10)
main_frame.columnconfigure(0, weight=1)
def _create_s3_client(self):
region = self.region_var.get() or None
access_key = self.access_key_var.get() or None
secret_key = self.secret_key_var.get() or None
endpoint = self.endpoint_var.get() or None
try:
return boto3.client(
's3',
region_name=region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
endpoint_url=endpoint
)
except Exception as e:
self.update_status(f"创建 S3 客户端失败: {e}", is_success=False)
return None
# --- [ 新增 ] ---
def _generate_http_url(self, bucket, object_name):
"""根据配置生成可公开访问的 HTTP(S) URL"""
visit_endpoint = self.visit_endpoint_var.get()
return f"{visit_endpoint}/{object_name}"
# --- [ 修改 ] ---
def update_status(self, message, is_success=True, url=None):
"""更新状态栏颜色和文本,并处理 URL 复制"""
color = "darkgreen" if is_success else "red"
display_message = message
if is_success and url:
self.master.clipboard_clear()
self.master.clipboard_append(url)
display_message = f"{message}\nURL: {url}\n(链接已复制到剪贴板)"
self.status_label.config(text=display_message, foreground=color)
def generate_s3_name_for_image(self, extension):
timestamp = int(time.time())
prefix = self.prefix_var.get()
if prefix and not prefix.endswith('/'):
prefix += '/'
return f"{prefix}{uuid.uuid4().hex[:8]}image_{timestamp}{extension}"
def generate_s3_name_for_text(self, extension):
timestamp = int(time.time())
prefix = self.prefix_var.get()
if prefix and not prefix.endswith('/'):
prefix += '/'
return f"{prefix}{uuid.uuid4().hex[:8]}text_{timestamp}{extension}"
# --- [ 修改 ] ---
def upload_local_file(self):
bucket = self.bucket_var.get()
if not bucket:
self.update_status("错误:请输入存储桶名称 (Bucket Name)。", False)
return
file_path = filedialog.askopenfilename()
if not file_path:
return
s3_client = self._create_s3_client()
if not s3_client:
return
filename = os.path.basename(file_path)
random_prefix = uuid.uuid4().hex[:8]
# 将随机前缀和原始文件名组合
randomized_filename = f"{random_prefix}-{filename}"
prefix = self.prefix_var.get()
if prefix and not prefix.endswith('/'):
prefix += '/'
s3_object_name = f"{prefix}{randomized_filename}"
self.update_status(f"正在上传文件 ({filename})...", True)
self.master.update_idletasks()
success, message = upload_file_to_s3(s3_client, file_path, bucket, s3_object_name)
http_url = None
if success:
http_url = self._generate_http_url(bucket, s3_object_name)
self.update_status(message, success, url=http_url)
# --- [ 修改 ] ---
def upload_clipboard_image(self):
bucket = self.bucket_var.get()
if not bucket:
self.update_status("错误:请输入存储桶名称 (Bucket Name)。", False)
return
s3_client = self._create_s3_client()
if not s3_client:
return
try:
img = ImageGrab.grabclipboard()
except Exception as e:
self.update_status(f"无法访问剪贴板: {e},请检查权限。", False)
return
if not isinstance(img, Image.Image):
self.update_status("错误:剪贴板中没有有效的图像数据。", False)
return
s3_object_name = self.generate_s3_name_for_image(".jpg")
temp_file = "temp_clipboard_image.jpg"
try:
if img.mode == 'RGBA':
img = img.convert('RGB')
img.save(temp_file, "JPEG")
self.update_status(f"正在上传图片 ({s3_object_name})...", True)
self.master.update_idletasks()
success, message = upload_file_to_s3(s3_client, temp_file, bucket, s3_object_name)
http_url = None
if success:
http_url = self._generate_http_url(bucket, s3_object_name)
self.update_status(message, success, url=http_url)
finally:
if os.path.exists(temp_file):
os.remove(temp_file)
def upload_clipboard_text(self):
bucket = self.bucket_var.get()
if not bucket:
self.update_status("错误:请输入存储桶名称 (Bucket Name)。", False)
return
s3_client = self._create_s3_client()
if not s3_client:
return
# 1. 获取剪贴板文本
try:
text_content = pyperclip.paste()
except Exception as e:
self.update_status(f"无法访问剪贴板: {e},请检查权限。", False)
return
# 2. 检查内容类型
if not text_content or not isinstance(text_content, str):
self.update_status("错误:剪贴板中没有有效的文本数据。", False)
return
text_content = text_content.replace('\r\n', '\n')
# 3. 生成 S3 文件名 (.txt 扩展名)
# 假设我们有一个通用的生成文件名的方法
s3_object_name = self.generate_s3_name_for_text(".txt")
temp_file = "temp_clipboard_text.txt"
try:
# 4. 保存临时文件(使用 UTF-8 编码支持中文)
try:
with open(temp_file, 'w', encoding='utf-8') as f:
f.write(text_content)
except Exception as e:
self.update_status(f"无法保存临时文本文件: {e}", False)
return
# 5. 上传
self.update_status(f"正在上传文本 ({s3_object_name})...", True)
# 确保 UI 更新
if hasattr(self, 'master') and self.master:
self.master.update_idletasks()
extra_args = {
'ContentType': 'text/plain; charset=utf-8'
}
success, message = upload_file_to_s3(s3_client, temp_file, bucket, s3_object_name, extra_args)
http_url = None
if success:
http_url = self._generate_http_url(bucket, s3_object_name)
self.update_status(message, success, url=http_url)
finally:
# 清理临时文件
if os.path.exists(temp_file):
os.remove(temp_file)
if __name__ == "__main__":
root = tk.Tk()
app = S3UploaderApp(root)
root.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment