Skip to content

Instantly share code, notes, and snippets.

@dqureshiumar
Created November 12, 2025 20:40
Show Gist options
  • Select an option

  • Save dqureshiumar/1ff71c5643ff531f5d41a6ad88f84ef2 to your computer and use it in GitHub Desktop.

Select an option

Save dqureshiumar/1ff71c5643ff531f5d41a6ad88f84ef2 to your computer and use it in GitHub Desktop.
Image Conversion from JPG/PNG to WebP with size reduction
import os
from PIL import Image
from concurrent.futures import ThreadPoolExecutor, as_completed
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Desired maximum size of each converted image, in kilobytes.
TARGET_KB: int = 100

# File extensions (compared case-insensitively) that are picked up for
# processing.
FORMATS: tuple = (".jpg", ".jpeg", ".png")

# Output format name handed to the image encoder (WEBP, JPEG, etc.); its
# lower-cased form is also used as the output file extension.
CONVERT_TO: str = "WEBP"

# Set to True to remove each source image once it has been converted.
DELETE_ORIGINAL: bool = False

# Number of parallel worker threads.
MAX_WORKERS: int = 8

# Images wider than this many pixels are scaled down to it (0 = no resize).
MAX_WIDTH: int = 1920

# ---------------------------------------------------------------------------
def compress_image(input_path, output_path, target_kb=TARGET_KB):
    """Re-encode one image as CONVERT_TO, lowering quality until it fits.

    The image is optionally scaled down to MAX_WIDTH (aspect ratio kept) and
    then saved repeatedly at decreasing quality (95 down to 10, in steps of 5)
    until the file on disk is no larger than ``target_kb`` or the quality
    floor is reached. When DELETE_ORIGINAL is set and the output is a
    different path, the source file is removed after the final save.

    Args:
        input_path: Path of the source image.
        output_path: Path the converted image is written to.
        target_kb: Maximum desired output size in kilobytes.

    Returns:
        None. Any error is caught and reported on stdout so that a single bad
        file does not abort a batch run.
    """
    try:
        # Open through a context manager so the source file handle is released
        # deterministically before the original is (optionally) deleted.
        with Image.open(input_path) as source:
            has_alpha = (
                source.mode in ("RGBA", "LA", "PA")
                or "transparency" in source.info
            )
            # Keep transparency when the target format can store it; a blanket
            # RGB conversion would silently discard the alpha channel.
            keep_alpha = has_alpha and CONVERT_TO.upper() in ("WEBP", "PNG")
            # convert() returns a new, fully loaded image, so it stays usable
            # after the source file is closed.
            img = source.convert("RGBA" if keep_alpha else "RGB")

        # Optional resize if the image is wider than MAX_WIDTH.
        if MAX_WIDTH and img.width > MAX_WIDTH:
            ratio = MAX_WIDTH / img.width
            # Clamp to 1 so extremely wide, short images never get height 0.
            new_height = max(1, int(img.height * ratio))
            img = img.resize((MAX_WIDTH, new_height))
            print(f"↘️ Resized {input_path} to {MAX_WIDTH}x{new_height}")

        # Try qualities 95, 90, ..., 10 and stop at the first one that fits.
        for quality in range(95, 5, -5):
            img.save(output_path, format=CONVERT_TO, optimize=True, quality=quality)
            size_kb = os.path.getsize(output_path) / 1024
            if size_kb <= target_kb:
                break

        if size_kb <= target_kb:
            print(f"✅ {input_path} → {output_path} ({size_kb:.1f} KB, q={quality})")
        else:
            # The quality floor was reached without meeting the target; say so
            # instead of reporting an unqualified success.
            print(
                f"⚠️ {input_path} → {output_path} ({size_kb:.1f} KB, q={quality}) "
                f"is still above the {target_kb} KB target"
            )

        if DELETE_ORIGINAL and input_path != output_path:
            os.remove(input_path)
            print(f"🗑️ Deleted original: {input_path}")
    except Exception as e:
        # Worker boundary: report and continue with the remaining images.
        print(f"❌ Error processing {input_path}: {e}")
def get_all_images(root_dir):
    """Return the paths of all files under ``root_dir`` matching FORMATS.

    The directory tree is walked recursively; a file qualifies when its
    lower-cased name ends with one of the extensions in FORMATS.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(root_dir)
        for name in names
        if name.lower().endswith(FORMATS)
    ]
def process_all_images(root_dir):
    """Convert every matching image under ``root_dir`` using a thread pool.

    Each source image is written next to itself with the extension derived
    from CONVERT_TO. When several sources would map to the same output path
    (for example ``photo.jpg`` and ``photo.png``), later ones get the source
    extension appended to the base name (``photo_png.webp``) so that two
    workers never write the same file.

    Args:
        root_dir: Directory that is searched recursively for images.
    """
    all_images = get_all_images(root_dir)
    print(f"📸 Found {len(all_images)} images under {root_dir}")

    target_ext = f".{CONVERT_TO.lower()}"
    claimed_outputs = set()

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = []
        # Sorted so that collision handling does not depend on the order in
        # which the directory walk happened to return the files.
        for input_path in sorted(all_images):
            base_name, source_ext = os.path.splitext(input_path)
            output_path = f"{base_name}{target_ext}"

            # Disambiguate outputs already claimed by another source in this
            # run; otherwise the two conversions would overwrite each other.
            tag = source_ext.lstrip(".").lower()
            attempt = 0
            while output_path in claimed_outputs:
                attempt += 1
                suffix = tag if attempt == 1 else f"{tag}_{attempt}"
                output_path = f"{base_name}_{suffix}{target_ext}"
            claimed_outputs.add(output_path)

            futures.append(executor.submit(compress_image, input_path, output_path))

        for future in as_completed(futures):
            # Surface anything a worker raised without stopping the batch.
            try:
                future.result()
            except Exception as e:
                print(f"⚠️ Thread error: {e}")

    print("\n✨ Done! All images processed.")
if __name__ == "__main__":
    # Pasted paths often carry surrounding whitespace or quotes (single or
    # double); remove them before using the value as a path.
    folder = input("Enter the root folder path: ").strip(" \t\r\n\"'")
    # Fail loudly on a bad path instead of silently reporting zero images.
    if not os.path.isdir(folder):
        raise SystemExit(f"❌ Not a directory: {folder!r}")
    process_all_images(folder)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment