Skip to content

Instantly share code, notes, and snippets.

@astoeckel
Last active April 27, 2019 17:08
Show Gist options
  • Save astoeckel/d655bb4c0d922356b03f86a5e5a6a133 to your computer and use it in GitHub Desktop.
Save astoeckel/d655bb4c0d922356b03f86a5e5a6a133 to your computer and use it in GitHub Desktop.
Mirrors a source directory containing images to a target directory and shrinks the images in the process
#!/usr/bin/env python3
################################################################################
# imgshrink.py #
# Mirrors a directory containing photos in smaller resolution #
# (c) 2019 Andreas Stöckel, licensed under the GPLv3 #
################################################################################
import math
import multiprocessing
import os
import random
import sys
import subprocess
import tempfile
import base64
# Initialize the logger. basicConfig configures the root logger so that
# messages of level INFO and above are printed in the form
# "[LEVEL] timestamp: message"; "logger" is the module-level logger used by
# the rest of this script.
import logging
logging.basicConfig(format='[%(levelname)s] %(asctime)s: %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
################################################################################
# DEFAULT PARAMETERS #
################################################################################
# Supported file extensions. Compared against the lower-cased extension of
# each file found in the source directory; all other files are ignored.
FILE_EXTS = [".png", ".jpg"]
JPEG_QUALITY = 85 # Default for the --quality command line option
JPEG_SUBSAMPLING = "4:4:4" # Default for the --subsampling command line option
# Default for the --width/--height command line options. Uncomment one of the
# alternatives below to change the default target resolution.
RES_W, RES_H = 1920, 1080 # Target resolution (Full HD)
#RES_W, RES_H = 2560, 1440 # Target resolution (Quad HD)
#RES_W, RES_H = 1920 * 2, 1080 * 2 # Target resolution (4K)
# Base64-encoded JPEG that is written to the target location whenever a source
# image cannot be converted. The source directory scan compares existing
# target files against these exact bytes to detect earlier failures and to
# schedule the corresponding source file for another attempt.
ERROR_IMAGE = base64.b64decode("""
/9j/4AAQSkZJRgABAQEASABIAAD/2wBDAAMCAgMCAgMDAwMEAwMEBQgFBQQEBQoHBwYIDAoMDAsK
CwsNDhIQDQ4RDgsLEBYQERMUFRUVDA8XGBYUGBIUFRT/wAALCABAAEABAREA/8QAGQABAAMBAQAA
AAAAAAAAAAAAAAcICQYE/8QALhAAAQIFAgMHBAMAAAAAAAAAAQIDAAQFBhEHEggJIRMUIjFBUWEj
MkKRcXKi/9oACAEBAAA/AMqoQjqrU0ova+0Fdt2hXbgQOm6l016YH+EmPRdmjGoFhSvebmse4rel
847aqUp+WR+1pAjjYQhCLn8sam6Tt6gXlcerjNDVRaDSkTUs9X9qmG3i6ACltXRxZHknCjnyGYt1
qFzmdNLLmjTLBsao3RJS300TDrqKZLqSPLs07Fr2/wBkJPxEi8MHM3084obpaset269aVfqSVNys
pPOom5Sc6EloObU+IgHwqQAfIEkgGo3Na4Lbe0YfpWpljU9ukUGsTZkqlSpdO1mWmilS0ONJHRCF
hCwUjABSMfdgZ1QhCGTCLHcv3Ru49XeKGyFUSXeEjb9Tlq1Up9KT2csww6lzClehWUhCR6lXsDjR
XnM6i0mi8PlDtBx5tyt1qrtzDMvnxJYZSordx6DcpCfncfYxi5CEIRIWgeiNwcQ+qdFsa2209+qD
n1JhwHs5VlPVx5ePxSnJ+TgDqRG1NXqmk3K14cmmJZkTVSfBDLOUiers6E9VrP4oGRk/ahJAAJIC
sV9eddrq4itR6jeV3TneahMnYyw3kMyjIJ2MtJz4UJyfkkkkkkkx5CEIRpHySaPLP6uahVNbaVTU
tRWpdtZHVKXHgVAfyW0/qI75v9yz9X4tnqbMTC1ydKpEozLMlXhbCwXFED3JX1PwPYRSCEIQhE6c
K9/68af1WvTWhknXJqefZabqZotCTVClvcot7wppzZk7sHAzg+0cxxFXRqheWpczVdX5apyt5vS7
QdRVqYKe8Wkp2tnsQ2gAYHQ7euPWIyhCEIRozydNU7M0xr+pzl4XbQ7VbnJaQTLLrVRZlA8Uqe3B
BcUN2MjOPLI94iTmm3xbmoXFRM1a169TLjpZpEo0J2kzbcyyVgKynegkZGeozFQYQhCEIQhCP//Z
""")
################################################################################
# COMMAND LINE ARGUMENT PARSING #
################################################################################
def check_subsampling(s):
    """
    argparse "type" callback validating a JPEG chroma subsampling specifier.

    Returns s unchanged if it names one of the supported subsampling modes and
    raises argparse.ArgumentTypeError otherwise.
    """
    supported = ("4:4:4", "4:2:2", "4:1:1", "4:2:0")
    if s in supported:
        return s
    raise argparse.ArgumentTypeError("subsampling must be one of {" + ", ".join(supported) + "}")
def pos_int(i):
    """
    argparse "type" callback accepting strictly positive integers only.

    Converts i with int() (a ValueError from the conversion propagates to the
    caller) and raises argparse.ArgumentTypeError if the result is not greater
    than zero. Returns the converted integer.
    """
    value = int(i)
    if value > 0:
        return value
    raise argparse.ArgumentTypeError("all integers must be strictly positive (> 0)")
import argparse

# Describe the command line interface; defaults come from the constants above
parser = argparse.ArgumentParser()
parser.add_argument('source_dir', help='source directory')
parser.add_argument('target_dir', help='target directory')
parser.add_argument('--width', help='target image width in pixels', type=pos_int, default=RES_W)
parser.add_argument('--height', help='target image height in pixels', type=pos_int, default=RES_H)
parser.add_argument('--quality', help='JPEG quality setting', type=pos_int, default=JPEG_QUALITY)
parser.add_argument('--subsampling', help='JPEG chroma subsampling (e.g., 4:4:4, 4:2:2, 4:1:1)', type=check_subsampling, default=JPEG_SUBSAMPLING)
parser.add_argument('--suffix', help="Suffix that is going to be attached to files", type=str, default="small")
args = parser.parse_args()

# Make sure that the source and target directories are directories
source_dir, target_dir = args.source_dir, args.target_dir
if not os.path.isdir(source_dir):
    logger.error("Given source directory \"{}\" does not exist or is not a directory!".format(source_dir))
    sys.exit(1)
if os.path.exists(target_dir) and (not os.path.isdir(target_dir)):
    # This branch is only reached if the path exists, so do not claim that it
    # might be missing
    logger.error("Given target directory \"{}\" exists, but is not a directory!".format(target_dir))
    sys.exit(1)

# Canonicalise the paths (this also works for a target that does not exist yet)
source_dir = os.path.realpath(source_dir)
target_dir = os.path.realpath(target_dir)

# Make sure target_dir is not inside source_dir. commonpath compares whole path
# components using the platform's separator, and it also handles a source
# directory that is the file system root.
try:
    common_dir = os.path.commonpath([source_dir, target_dir])
    target_in_source = os.path.normcase(common_dir) == os.path.normcase(source_dir)
except ValueError:
    # Raised if the two paths have no common ancestor at all (e.g., different
    # drives on Windows); in that case they cannot be nested
    target_in_source = False
if target_in_source:
    logger.error("Target directory is part of the source directory!")
    sys.exit(1)

# Only create the target directory once all checks have passed; otherwise a
# rejected invocation would leave a new directory inside the source tree
os.makedirs(target_dir, exist_ok=True)
################################################################################
# SOURCE DIRECTORY SCANNING #
################################################################################
# Compute the target filename suffix, i.e. "_<suffix>.jpg", or just ".jpg" if
# an empty suffix was given on the command line
target_suffix = ""
if args.suffix:
    target_suffix = "_" + args.suffix
target_suffix += ".jpg"

# Walk the root directory
logger.info("Scanning for new files.")
existing_source_files = []  # Extension-less paths of all supported source images
files_to_be_processed = []  # (source_file, target_file) pairs that need conversion
for root, _dirs, files in os.walk(source_dir):
    for file in sorted(files):
        # Compute the source filename and skip unsupported file types
        source_file = os.path.join(root, file)
        source_stem, source_file_ext = os.path.splitext(source_file)
        if source_file_ext.lower() not in FILE_EXTS:
            continue
        existing_source_files.append(source_stem)

        # Compute the target filename.
        # NOTE: source files that only differ in their extension map to the
        # same target file.
        target_file = os.path.join(target_dir, os.path.relpath(source_file, source_dir))
        target_file = os.path.splitext(target_file)[0] + target_suffix

        # Check whether the target file already exists, and if yes, whether the
        # source file has been updated or the last conversion attempt failed
        if not os.path.exists(target_file):
            files_to_be_processed.append((source_file, target_file))
        elif os.path.isfile(target_file):
            source_stat = os.stat(source_file)
            target_stat = os.stat(target_file)
            needs_update = source_stat.st_mtime > target_stat.st_mtime
            # Only read the target if its size matches the error placeholder.
            # A single flag ensures that a file which is both outdated and a
            # placeholder is queued once rather than twice.
            if (not needs_update) and target_stat.st_size == len(ERROR_IMAGE):
                with open(target_file, 'rb') as f:
                    needs_update = (ERROR_IMAGE == f.read())
            if needs_update:
                files_to_be_processed.append((source_file, target_file))
################################################################################
# TARGET DIRECTORY SCANNING #
################################################################################
# Turn the list of source files into a set for fast membership tests
existing_source_files = set(existing_source_files)
suffix_length = len(target_suffix)
for root, _dirs, files in os.walk(target_dir):
    for name in sorted(files):
        target_file = os.path.join(root, name)
        # Only files carrying the target suffix are managed by this script
        if target_file.endswith(target_suffix):
            # Map the target file back into the source directory and strip the
            # suffix to obtain the extension-less source filename
            mirrored_file = os.path.join(source_dir, os.path.relpath(target_file, target_dir))
            source_file = mirrored_file[:-suffix_length]
            # A target without a matching source image is stale: the source
            # has been deleted, so delete the target as well
            if source_file not in existing_source_files:
                logger.info("Removing file {}".format(target_file))
                os.remove(target_file)
################################################################################
# IMAGE PROCESSING #
################################################################################
def random_string(strlen=6):
    """
    Returns a string of strlen characters, each drawn at random from the
    digits and lower-case ASCII letters. Used as part of the temporary
    filename an image is written to.
    """
    alphabet = "0123456789abcdefghijklmnopqrstuvwxyz"
    chars = []
    for _ in range(strlen):
        chars.append(random.choice(alphabet))
    return ''.join(chars)
def compute_resize_to_area(w, h, A):
    """
    For an image of width w and height h, returns the (width, height) of an
    image that has the same aspect ratio but covers an area of A pixels. Both
    values are truncated to integers.
    """
    ratio = w / h
    scaled_w = math.sqrt(A * ratio)
    scaled_h = math.sqrt(A / ratio)
    return int(scaled_w), int(scaled_h)
def compute_resize_fit(w, h, rw, rh):
    """
    For an image of width w and height h, returns the (width, height) of an
    image with the same aspect ratio that fits into a rectangle of size
    rw x rh. The result is rounded to integers, and each side is at least one
    pixel long, even for images with an extreme aspect ratio.
    """
    # Rescale the longest side of the source image to the length of the
    # corresponding target rectangle side
    if w >= h:
        sw, sh = rw, rw * (h / w)
    else:
        sw, sh = rh * (w / h), rh

    # Rescale the image such that none of the sides is larger than the target
    # rectangle
    if sw > rw:
        scale = rw / sw
        sw, sh = sw * scale, sh * scale
    if sh > rh:
        scale = rh / sh
        sw, sh = sw * scale, sh * scale

    # Rounding may collapse the short side of a very elongated image to zero,
    # which is not a valid image dimension; clamp to one pixel
    return max(1, int(round(sw))), max(1, int(round(sh)))
def exec_process(args):
    """
    Executes a child process and returns everything it wrote to stdout as a
    string.

    args is the argument list passed to subprocess.Popen. If the child exits
    with a non-zero return code, an exception is raised whose message contains
    the text the child wrote to stderr.
    """
    # Open the child process with the given arguments
    child = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Fetch stderr and stdout; communicate() also waits for the child to exit
    stdout, stderr = child.communicate()

    # Raise an exception in case the return code is not zero. Both streams are
    # bytes and have to be decoded before they can be combined with a str;
    # undecodable bytes are replaced so the real error is never masked by a
    # decoding error.
    if child.returncode != 0:
        raise RuntimeError("Error while executing subprocess\n" + stderr.decode("utf-8", errors="replace"))
    return stdout.decode("utf-8", errors="replace")
def read_image_size(image_file):
    """
    Runs the external "identify" program on image_file with the output format
    "%w:%h" and returns the two reported numbers as a list of integers
    [width, height].
    """
    output = exec_process(["identify", "-ping", "-format", "%w:%h", image_file])
    return [int(part) for part in output.split(':')]
def resize_image(source_file, target_file, rw, rh, quality, subsampling):
    """
    Converts source_file to target_file using the external "convert" program,
    shrinking the image such that it fits into the rectangle rw x rh. The
    quality and subsampling values are passed on as the "-quality" and
    "-sampling-factor" options.
    """
    # Read the image size
    w, h = read_image_size(source_file)

    # Only images that reach the target rectangle in at least one dimension
    # are shrunk; smaller images keep their original size
    if w >= rw or h >= rh:
        tar_w, tar_h = compute_resize_fit(w, h, rw, rh)
    else:
        tar_w, tar_h = w, h

    # Assemble the command line and run the conversion
    geometry = "{}x{}".format(tar_w, tar_h)
    command = [
        "convert",
        "-limit", "thread", "1",
        source_file,
        "-filter", "Lanczos2Sharp",
        "-resize", geometry,
        "-quality", str(quality),
        "-sampling-factor", str(subsampling),
        target_file,
    ]
    exec_process(command)
def process_file(pair):
    """
    Processes a single file.

    pair is a (source_file, target_file) tuple. The source image is resized
    into a temporary file located next to the target, which is then moved into
    place. If the conversion fails, the error is logged and the ERROR_IMAGE
    placeholder is stored as the target instead. Always returns None.
    """
    # Fetch the source and target file
    source_file, target_file = pair

    # Create the target directory
    target_file_dir = os.path.dirname(target_file)
    os.makedirs(target_file_dir, exist_ok=True)

    # Assemble a temporary target filename. It is located in the same directory
    # as the target so the final move never crosses a file system boundary.
    target_tmp_file = os.path.join(target_file_dir, '.tmp_' + random_string() + '_' + os.path.basename(target_file))

    # Try to load the source and resize it
    try:
        try:
            resize_image(source_file, target_tmp_file,
                         rw=args.width,
                         rh=args.height,
                         quality=args.quality,
                         subsampling=args.subsampling)
        except Exception as e:
            # Name the offending file; the message of the underlying exception
            # alone does not necessarily identify it
            logger.error("Error while processing \"{}\": {}".format(source_file, e))

            # Print a default "error" image
            with open(target_tmp_file, 'wb') as f:
                f.write(ERROR_IMAGE)

        # Move the temporary file to the target file. In contrast to
        # os.rename, os.replace overwrites an existing target on all platforms.
        os.replace(target_tmp_file, target_file)
    finally:
        # Make sure the temporary file is deleted. After a successful move it
        # no longer exists, so a failure here is expected and ignored.
        try:
            os.remove(target_tmp_file)
        except OSError:
            pass
    return None
# Print the total number of files
n_total = len(files_to_be_processed)
if n_total == 0:
    logger.info("No new files found, done.")
else:
    logger.info("Processing {} new files.".format(n_total))

    # Distribute the processing to all CPU cores. The context manager makes
    # sure that the worker processes are shut down once all results have been
    # consumed, or if processing is aborted by an exception.
    # NOTE(review): the pool is created at module level; with the "spawn" start
    # method the workers re-import this module -- confirm that only "fork"
    # platforms are targeted.
    with multiprocessing.Pool() as pool:
        for i, _ in enumerate(pool.imap_unordered(process_file, files_to_be_processed)):
            # Report the progress every 100 files
            if (i + 1) % 100 == 0:
                logger.info("Processed {} of {} files.".format(i + 1, n_total))
    logger.info("Done.")
# Delete empty directories in the target folder
def remove_empty_dirs(dir_, del_top_level=False):
    """
    Recursively removes all empty directories below dir_, including
    directories that only contain other empty directories.

    dir_ itself is only removed if del_top_level is True and dir_ is empty
    once its subdirectories have been cleaned up.
    """
    for entry in os.listdir(dir_):
        path = os.path.join(dir_, entry)
        # Do not follow symbolic links: os.rmdir cannot remove a link, and the
        # directory it points to may be located outside of dir_
        if os.path.isdir(path) and not os.path.islink(path):
            remove_empty_dirs(path, True)

    # List the directory again, since removing empty subdirectories above may
    # have left this directory empty as well
    if del_top_level and not os.listdir(dir_):
        logger.info("Removing empty directory {}".format(dir_))
        os.rmdir(dir_)
# Clean up the target directory tree, but keep the target directory itself
remove_empty_dirs(target_dir, del_top_level=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment