Created
May 4, 2025 23:53
-
-
Save ph33nx/a9032c7466677ba603110c09aca3cc8b to your computer and use it in GitHub Desktop.
Lightning‑fast Python CLI Script to bulk convert any image format (PNG, JPEG, WebP, BMP, TIFF, GIF…) into compressed, progressive JPEGs, bulk renamed, with all metadata stripped and every CPU core engaged. Ideal for web performance, SEO, e‑commerce product galleries, and batch uploads.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
bulk_img_convert_and_rename.py – Bulk Image Convert & Rename Python Script

Production‑ready command‑line utility for web developers, photographers, SEO
specialists and e‑commerce teams who need to bulk convert and bulk rename
thousands of images before pushing to GitHub, S3, CDN or serving them on
high‑performance websites.

Core features:
  - Bulk image convert: reads every format Pillow supports (PNG, JPEG, WebP,
    BMP, TIFF, GIF, etc.) and compresses them into small, progressive JPEGs
    optimised for web delivery.
  - Automatic bulk rename: each output is saved as img_<n>.jpg, ready for CDN
    caching and static‑site generators.
  - One‑click metadata stripping: removes EXIF, GPS, ICC and all other heavy
    data to protect privacy and improve SEO scores.
  - Multi‑core performance: saturates every CPU core for fast throughput.

Default workflow:
  1. Recursively scan the supplied directory for images.
  2. Prompt once: Delete original files after conversion? [Y/n]
     (press Enter or y to delete, n to keep).
  3. Convert, optimise, rename: ready for immediate deployment.

Command‑line synopsis:
  python bulk_img_convert_and_rename.py <directory> [options]

Optional arguments:
  -q, --quality   JPEG quality 1‑95 (default 85, balanced quality and size)
  --shuffle       Randomise processing order to reduce I/O bursts on SSDs
  -w, --workers   Number of parallel processes (default: all CPU cores)

Author:  ph33nx <https://github.com/ph33nx>
License: MIT (commercial and non‑commercial use permitted)
Version: 1.0.0
""" | |
from __future__ import annotations | |
import argparse | |
import collections | |
import os | |
import random | |
import sys | |
from concurrent.futures import ProcessPoolExecutor, as_completed | |
from pathlib import Path | |
from typing import Iterable, List | |
# ---------------------------------------------------------------------------
# Pillow import (auto‑installs if missing)
# ---------------------------------------------------------------------------
# The import happens eagerly at module load. When Pillow is absent it is
# installed with pip into the interpreter running this script
# (sys.executable) and the import is retried; check_call raises
# CalledProcessError if the install fails, which aborts the script.
try:
    from PIL import Image  # type: ignore
except ImportError:  # pragma: no cover
    import subprocess

    print("Pillow not found; installing…", file=sys.stderr)
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pillow"])
    from PIL import Image  # type: ignore
# --------------------------------------------------------------------------- | |
# Helpers | |
# --------------------------------------------------------------------------- | |
def collect_image_files(root: Path) -> List[Path]:
    """Recursively gather every file under *root* that Pillow can open.

    Each regular file is probed with ``Image.open`` and closed again straight
    away, so detection is based on what Pillow recognises rather than on the
    filename extension.

    Args:
        root: Directory to scan; all nested sub-directories are included.

    Returns:
        Paths of the files Pillow could open, in the order ``Path.rglob``
        yields them. Unreadable and oversized files are skipped.
    """
    matches: List[Path] = []
    for path in root.rglob("*"):
        if not path.is_file():
            continue
        try:
            with Image.open(path):
                matches.append(path)
        except OSError:
            # Not an image Pillow can read, or the file itself is unreadable.
            continue
        except Image.DecompressionBombError as exc:
            # Pillow raises this (it is not an OSError) for images far beyond
            # its pixel limit; skip the file instead of aborting the scan.
            print(f"Skipping oversized image: {path} ({exc})", file=sys.stderr)
    return matches
def plan_destinations(sources: Iterable[Path]) -> dict[Path, Path]:
    """Pre‑compute destination filenames to avoid collisions in parallel.

    Every source is assigned ``img_<n>.jpg`` inside its own directory. A
    separate counter is kept per directory and only ever moves forward; any
    number whose file already exists on disk, or which has already been
    handed out during this call, is skipped.

    Args:
        sources: Paths of the files that will be converted.

    Returns:
        Mapping of each source path to its planned destination path, in the
        order the sources were supplied.
    """
    counters: dict[Path, int] = collections.defaultdict(int)
    # Destinations already handed out. A set keeps the "is it taken?" check
    # O(1) instead of scanning every planned value for each candidate.
    reserved: set[Path] = set()
    mapping: dict[Path, Path] = {}
    for src in sources:
        directory = src.parent
        counters[directory] += 1
        candidate = directory / f"img_{counters[directory]}.jpg"
        while candidate in reserved or candidate.exists():
            counters[directory] += 1
            candidate = directory / f"img_{counters[directory]}.jpg"
        reserved.add(candidate)
        mapping[src] = candidate
    return mapping
# --------------------------------------------------------------------------- | |
# Conversion worker (executes in a separate process) | |
# --------------------------------------------------------------------------- | |
def _convert_one(src: str, dest: str, quality: int, keep_original: bool) -> str:
    """Re-encode the image at *src* as a progressive JPEG written to *dest*.

    Runs inside a worker process, so paths are passed as plain strings and
    every failure is reported through the return value instead of raising.

    Args:
        src: Path of the image to read.
        dest: Path the JPEG is written to.
        quality: JPEG quality passed to Pillow's encoder.
        keep_original: When false, *src* is deleted after a successful save.

    Returns:
        *dest* on success, otherwise the sentinel ``"ERROR::<src>::<message>"``.
    """
    from PIL import Image  # local import inside subprocess

    try:
        with Image.open(src) as opened:
            # Any mode other than RGB or greyscale is converted to RGB first.
            needs_rgb = opened.mode not in ("RGB", "L")
            frame = opened.convert("RGB") if needs_rgb else opened
            frame.info.clear()  # strip metadata explicitly
            frame.save(dest, "JPEG", quality=quality, optimize=True, progressive=True)
        # The source is closed at this point, so it is safe to delete.
        if not keep_original:
            os.remove(src)
    except Exception as exc:  # pragma: no cover
        return f"ERROR::{src}::{exc}"
    return dest
# --------------------------------------------------------------------------- | |
# CLI & main logic | |
# --------------------------------------------------------------------------- | |
def _bounded_int(label: str, low: int, high=None):
    """Build an argparse ``type=`` callable accepting integers in [low, high].

    *high* may be ``None`` for "no upper bound". Rejected values raise
    ``argparse.ArgumentTypeError`` so argparse prints a usage error.
    """

    def convert(text: str) -> int:
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"{label} must be an integer, got {text!r}"
            ) from None
        if value < low or (high is not None and value > high):
            bounds = f"at least {low}" if high is None else f"between {low} and {high}"
            raise argparse.ArgumentTypeError(f"{label} must be {bounds}, got {value}")
        return value

    return convert


def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        Namespace with ``folder`` (Path), ``quality`` (int, 1-95),
        ``shuffle`` (bool) and ``workers`` (int, at least 1).

    Raises:
        SystemExit: Raised by argparse on invalid or out-of-range arguments,
            and for ``-h``/``--help``.
    """
    p = argparse.ArgumentParser(description="Bulk image convert & rename – compress and optimise for web.")
    p.add_argument("folder", type=Path, help="Root directory to scan.")
    p.add_argument(
        "-q",
        "--quality",
        # Enforce the range advertised by the metavar and the module docs.
        type=_bounded_int("quality", 1, 95),
        default=85,
        metavar="1‑95",
        help="JPEG quality (default 85).",
    )
    p.add_argument("--shuffle", action="store_true", help="Shuffle processing order (disabled by default).")
    p.add_argument(
        "-w",
        "--workers",
        # ProcessPoolExecutor rejects max_workers <= 0 with a ValueError;
        # catch it here so the user gets a usage error instead of a traceback.
        type=_bounded_int("workers", 1),
        default=os.cpu_count() or 1,
        help="Worker processes (default: all cores).",
    )
    return p.parse_args(argv)
def ask_delete_prompt() -> bool:
    """Ask whether the originals should be deleted after conversion.

    Returns:
        False only when the answer is "n" or "no" (surrounding whitespace and
        letter case are ignored). Any other answer, an empty line, or a
        closed stdin (EOF) counts as yes and returns True.
    """
    try:
        answer = input("Delete original files after conversion? [Y/n] ")
    except EOFError:  # non‑interactive shells / redirected input
        return True
    declined = answer.strip().lower() in {"n", "no"}
    return not declined
def main(argv: List[str] | None = None) -> None:
    """Run the CLI: scan, confirm deletion, then convert in parallel.

    Args:
        argv: Argument strings without the program name. ``None`` means
            "use ``sys.argv[1:]``"; an explicitly empty list is parsed as-is.

    Raises:
        SystemExit: If the arguments are invalid or the folder is not a
            directory.
    """
    # Test for None explicitly: ``argv or sys.argv[1:]`` would silently
    # replace an intentionally empty argument list with the process arguments.
    args = parse_args(sys.argv[1:] if argv is None else argv)
    root = args.folder.expanduser().resolve()
    if not root.is_dir():
        sys.exit(f"Error: '{root}' is not a directory.")

    sources = collect_image_files(root)
    if not sources:
        print("No images found – nothing to do.")
        return

    delete_originals = ask_delete_prompt()
    if args.shuffle:
        random.shuffle(sources)

    # Destinations are planned up front, in this process, so that parallel
    # workers never compete for the same output filename.
    mapping = plan_destinations(sources)
    print(f"Processing {len(mapping)} file(s) using {args.workers} worker(s)…")

    converted = failed = 0
    keep_original = not delete_originals
    with ProcessPoolExecutor(max_workers=args.workers) as pool:
        # Each future maps back to its source path for error reporting.
        futures = {
            pool.submit(_convert_one, str(s), str(d), args.quality, keep_original): s
            for s, d in mapping.items()
        }
        for fut in as_completed(futures):
            result = fut.result()
            if result.startswith("ERROR::"):
                failed += 1
                # Take the path from the futures map rather than splitting the
                # sentinel: split("::", 2)[:2] yields ("ERROR", src), which
                # made every failure print the literal word "ERROR".
                src = futures[fut]
                prefix = f"ERROR::{src}::"
                reason = result[len(prefix):] if result.startswith(prefix) else result
                print(f"✘ Failed: {src} ({reason})")
            else:
                converted += 1
                print(f"✔ {result}")

    print(f"Done. Converted: {converted}. Failed: {failed}.")
# Run the CLI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment