Created
November 12, 2025 20:40
-
-
Save dqureshiumar/1ff71c5643ff531f5d41a6ad88f84ef2 to your computer and use it in GitHub Desktop.
Image Conversion from JPG/PNG to WebP with size reduction
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from PIL import Image | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
# ====== CONFIGURATION ======
# Module-level settings read by the functions below.

# Default value of compress_image's ``target_kb``: the size ceiling, in KB,
# that the quality-reduction loop tries to reach for each output file.
TARGET_KB = 100  # desired max size per image (in KB)

# File-name suffixes selected by get_all_images; matched against the
# lower-cased file name, so the check is case-insensitive.
FORMATS = (".jpg", ".jpeg", ".png")  # image formats to process

# Passed as ``format=`` when saving; its lower-cased form is also used as the
# output file extension (e.g. "WEBP" -> ".webp").
CONVERT_TO = "WEBP"  # output format (WEBP, JPEG, etc.)

# When true, the source file is removed after conversion (only if the output
# path differs from the input path).
DELETE_ORIGINAL = False  # set True to remove old images

# Size of the thread pool used by process_all_images.
MAX_WORKERS = 8  # number of parallel threads

# Images wider than this are scaled down to this width before encoding;
# a falsy value (0) disables resizing.
MAX_WIDTH = 1920  # optional resizing limit (0 = no resize)
# ============================
def compress_image(input_path, output_path, target_kb=TARGET_KB):
    """Convert one image to ``CONVERT_TO`` and shrink it toward ``target_kb``.

    The image is decoded as RGB, optionally scaled down so its width does not
    exceed ``MAX_WIDTH``, and then re-encoded at decreasing quality (starting
    at 95, in steps of 5) until the file on disk is no larger than
    ``target_kb`` kilobytes or the quality floor of 10 is reached.

    Args:
        input_path: Path of the source image.
        output_path: Path the converted image is written to.
        target_kb: Maximum desired output size in kilobytes.

    Returns:
        None. Progress and failures are reported with ``print``; any
        exception is caught and logged so that one bad file does not abort
        a batch run.
    """
    try:
        # Use a context manager so the source file handle is released
        # deterministically instead of whenever the image object is collected.
        # NOTE(review): convert("RGB") discards any alpha channel, so
        # transparent PNGs lose their transparency - confirm this is intended.
        with Image.open(input_path) as source:
            img = source.convert("RGB")

        # Optional resize if width is larger than MAX_WIDTH.
        if MAX_WIDTH and img.width > MAX_WIDTH:
            ratio = MAX_WIDTH / img.width
            # Clamp to 1 px: a very wide, very short image would otherwise
            # round down to a height of 0, which cannot be resized to.
            new_height = max(1, int(img.height * ratio))
            img = img.resize((MAX_WIDTH, new_height))
            print(f"↘️ Resized {input_path} to {MAX_WIDTH}x{new_height}")

        quality = 95
        step = 5
        while True:
            img.save(output_path, format=CONVERT_TO, optimize=True, quality=quality)
            size_kb = os.path.getsize(output_path) / 1024
            # Stop once the target is met or quality cannot go any lower;
            # ``quality`` then still holds the value used for the last save.
            if size_kb <= target_kb or quality <= 10:
                break
            quality -= step

        print(f"✅ {input_path} → {output_path} ({size_kb:.1f} KB, q={quality})")
        if size_kb > target_kb:
            # The loop ended on the quality floor, not on the size target.
            print(
                f"⚠️ {output_path} is still above the {target_kb} KB target "
                f"at minimum quality"
            )

        if DELETE_ORIGINAL and input_path != output_path:
            os.remove(input_path)
            print(f"🗑️ Deleted original: {input_path}")
    except Exception as e:
        # Per-file boundary: report and continue with the remaining files.
        print(f"❌ Error processing {input_path}: {e}")
def get_all_images(root_dir):
    """Return the paths of all image files found beneath ``root_dir``.

    The directory tree is walked with ``os.walk``; a file is included when
    its lower-cased name ends with one of the suffixes in ``FORMATS``.
    Each result is the containing directory joined with the file name.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(root_dir)
        for name in names
        if name.lower().endswith(FORMATS)
    ]
def _unique_output_path(input_path, extension, claimed):
    """Return an output path for ``input_path`` that no other input has claimed.

    The default is the input path with its suffix replaced by ``extension``.
    If that path is already in ``claimed`` (e.g. ``a.jpg`` and ``a.png`` both
    mapping to ``a.webp``), the source suffix is appended to the base name,
    followed by a counter if needed. The chosen path is recorded in
    ``claimed`` (keyed through ``os.path.normcase``).
    """
    base_name, source_ext = os.path.splitext(input_path)
    candidate = f"{base_name}.{extension}"
    if os.path.normcase(candidate) in claimed:
        tag = source_ext.lstrip(".").lower()
        candidate = f"{base_name}_{tag}.{extension}"
        counter = 2
        while os.path.normcase(candidate) in claimed:
            candidate = f"{base_name}_{tag}_{counter}.{extension}"
            counter += 1
    claimed.add(os.path.normcase(candidate))
    return candidate


def process_all_images(root_dir):
    """Convert every image under ``root_dir`` using a pool of worker threads.

    Args:
        root_dir: Directory that is searched recursively for images.

    Returns:
        None. Progress is reported with ``print``. If ``root_dir`` is not an
        existing directory, an error is printed and nothing is processed.
    """
    # Without this guard a mistyped path is reported as "Found 0 images".
    if not os.path.isdir(root_dir):
        print(f"❌ Not a directory: {root_dir}")
        return

    # Sorted so that collision handling below is deterministic across runs.
    all_images = sorted(get_all_images(root_dir))
    print(f"📸 Found {len(all_images)} images under {root_dir}")

    extension = CONVERT_TO.lower()
    claimed_outputs = set()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = []
        for input_path in all_images:
            # Give each input its own output file so that two workers never
            # write the same path at the same time.
            output_path = _unique_output_path(input_path, extension, claimed_outputs)
            futures.append(executor.submit(compress_image, input_path, output_path))

        for future in as_completed(futures):
            # Safety net for anything that escapes the worker function.
            try:
                future.result()
            except Exception as e:
                print(f"⚠️ Thread error: {e}")

    print("\n✨ Done! All images processed.")
if __name__ == "__main__":
    # Script entry point: ask for the root folder and convert everything in it.
    # Whitespace is trimmed first; otherwise a stray leading/trailing space
    # would keep strip('"') from removing the quotes around a pasted path.
    folder = input("Enter the root folder path: ").strip().strip('"')
    process_all_images(folder)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment