Last active
April 27, 2019 17:08
-
-
Save astoeckel/d655bb4c0d922356b03f86a5e5a6a133 to your computer and use it in GitHub Desktop.
Mirrors a source directory containing images to a target directory and shrinks the images in the process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
################################################################################ | |
# imgshrink.py # | |
# Mirrors a directory containing photos in smaller resolution # | |
# (c) 2019 Andreas Stöckel, licensed under the GPLv3 # | |
################################################################################ | |
import math | |
import multiprocessing | |
import os | |
import random | |
import sys | |
import subprocess | |
import tempfile | |
import base64 | |
# Initialize the logger | |
import logging | |
# Log to the root handler with level, timestamp and message; INFO and above
logging.basicConfig(
    format='[%(levelname)s] %(asctime)s: %(message)s',
    level=logging.INFO,
)

# Module-wide logger used by all code below
logger = logging.getLogger(__name__)
################################################################################ | |
# DEFAULT PARAMETERS # | |
################################################################################ | |
# File extensions (compared in lower case, including the leading dot) of the
# source files that are picked up
FILE_EXTS = [".png", ".jpg"]

# Defaults for the JPEG encoder
JPEG_QUALITY = 85           # JPEG Quality
JPEG_SUBSAMPLING = "4:4:4"  # JPEG Subsampling

# Default target resolution (Full HD)
RES_W = 1920
RES_H = 1080

# Alternative target resolutions:
#RES_W, RES_H = 2560, 1440          # Quad HD
#RES_W, RES_H = 1920 * 2, 1080 * 2  # 4K
# Raw bytes of a small placeholder JPEG. It is written in place of images
# that could not be converted, and compared against existing target files to
# detect such placeholders on later runs. The payload is split into adjacent
# string literals; line breaks carry no meaning in base64.
ERROR_IMAGE = base64.b64decode(
    "/9j/4AAQSkZJRgABAQEASABIAAD/2wBDAAMCAgMCAgMDAwMEAwMEBQgFBQQEBQoHBwYIDAoMDAsK"
    "CwsNDhIQDQ4RDgsLEBYQERMUFRUVDA8XGBYUGBIUFRT/wAALCABAAEABAREA/8QAGQABAAMBAQAA"
    "AAAAAAAAAAAAAAcICQYE/8QALhAAAQIFAgMHBAMAAAAAAAAAAQIDAAQFBhEHEggJIRMUIjFBUWEj"
    "MkKRcXKi/9oACAEBAAA/AMqoQjqrU0ova+0Fdt2hXbgQOm6l016YH+EmPRdmjGoFhSvebmse4rel"
    "847aqUp+WR+1pAjjYQhCLn8sam6Tt6gXlcerjNDVRaDSkTUs9X9qmG3i6ACltXRxZHknCjnyGYt1"
    "qFzmdNLLmjTLBsao3RJS300TDrqKZLqSPLs07Fr2/wBkJPxEi8MHM3084obpaset269aVfqSVNys"
    "pPOom5Sc6EloObU+IgHwqQAfIEkgGo3Na4Lbe0YfpWpljU9ukUGsTZkqlSpdO1mWmilS0ONJHRCF"
    "hCwUjABSMfdgZ1QhCGTCLHcv3Ru49XeKGyFUSXeEjb9Tlq1Up9KT2csww6lzClehWUhCR6lXsDjR"
    "XnM6i0mi8PlDtBx5tyt1qrtzDMvnxJYZSordx6DcpCfncfYxi5CEIRIWgeiNwcQ+qdFsa2209+qD"
    "n1JhwHs5VlPVx5ePxSnJ+TgDqRG1NXqmk3K14cmmJZkTVSfBDLOUiers6E9VrP4oGRk/ahJAAJIC"
    "sV9eddrq4itR6jeV3TneahMnYyw3kMyjIJ2MtJz4UJyfkkkkkkkx5CEIRpHySaPLP6uahVNbaVTU"
    "tRWpdtZHVKXHgVAfyW0/qI75v9yz9X4tnqbMTC1ydKpEozLMlXhbCwXFED3JX1PwPYRSCEIQhE6c"
    "K9/68af1WvTWhknXJqefZabqZotCTVClvcot7wppzZk7sHAzg+0cxxFXRqheWpczVdX5apyt5vS7"
    "QdRVqYKe8Wkp2tnsQ2gAYHQ7euPWIyhCEIRozydNU7M0xr+pzl4XbQ7VbnJaQTLLrVRZlA8Uqe3B"
    "BcUN2MjOPLI94iTmm3xbmoXFRM1a169TLjpZpEo0J2kzbcyyVgKynegkZGeozFQYQhCEIQhCP//Z"
)
################################################################################ | |
# COMMAND LINE ARGUMENT PARSING # | |
################################################################################ | |
def check_subsampling(s):
    """
    Argparse "type" callback for the chroma subsampling option. Returns s
    unchanged if it is one of the supported subsampling specifiers and raises
    an argparse.ArgumentTypeError otherwise.
    """
    allowed = ("4:4:4", "4:2:2", "4:1:1", "4:2:0")
    if s in allowed:
        return s
    raise argparse.ArgumentTypeError(
        "subsampling must be one of {" + ", ".join(allowed) + "}")
def pos_int(i):
    """
    Argparse "type" callback that converts i to an int and only accepts
    strictly positive values. Raises an argparse.ArgumentTypeError for values
    smaller than or equal to zero; conversion errors of int() propagate.
    """
    value = int(i)
    if value > 0:
        return value
    raise argparse.ArgumentTypeError("all integers must be strictly positive (> 0)")
import argparse

# Describe the command line interface; the defaults come from the constants
# in the "DEFAULT PARAMETERS" section
parser = argparse.ArgumentParser()
parser.add_argument('source_dir', help='source directory')
parser.add_argument('target_dir', help='target directory')
parser.add_argument('--width', help='target image width in pixels', type=pos_int, default=RES_W)
parser.add_argument('--height', help='target image height in pixels', type=pos_int, default=RES_H)
parser.add_argument('--quality', help='JPEG quality setting', type=pos_int, default=JPEG_QUALITY)
parser.add_argument('--subsampling', help='JPEG chroma subsampling (e.g., 4:4:4, 4:2:2, 4:1:1)', type=check_subsampling, default=JPEG_SUBSAMPLING)
parser.add_argument('--suffix', help="Suffix that is going to be attached to files", type=str, default="small")
args = parser.parse_args()

# Make sure that the source and target directories are directories
source_dir, target_dir = args.source_dir, args.target_dir
if not os.path.isdir(source_dir):
    logger.error("Given source directory \"{}\" does not exist or is not a directory!".format(source_dir))
    sys.exit(1)
if os.path.exists(target_dir) and (not os.path.isdir(target_dir)):
    # This branch is only reached if the path exists, so say so
    logger.error("Given target directory \"{}\" exists but is not a directory!".format(target_dir))
    sys.exit(1)

# Canonicalise the paths
source_dir = os.path.realpath(source_dir)
target_dir = os.path.realpath(target_dir)

# Make sure target_dir is not inside (or equal to) source_dir. Joining with an
# empty component appends the platform's path separator only if it is missing,
# so this works independently of the separator and for a root source directory.
if os.path.join(target_dir, '').startswith(os.path.join(source_dir, '')):
    logger.error("Target directory is part of the source directory!")
    sys.exit(1)

# Only create the target directory once it has passed all checks, so that a
# rejected target is never created inside the source tree
os.makedirs(target_dir, exist_ok=True)
################################################################################ | |
# SOURCE DIRECTORY SCANNING # | |
################################################################################ | |
# Compute the target filename suffix. All target files are JPEGs; a non-empty
# user suffix is inserted in front of the extension, separated by "_".
target_suffix = ""
if args.suffix:
    target_suffix = "_" + args.suffix
target_suffix += ".jpg"

# Walk the root directory
logger.info("Scanning for new files.")
existing_source_files = []  # Extension-less paths of all source images
files_to_be_processed = []  # (source_file, target_file) pairs to convert
for root, dirs, files in os.walk(source_dir):
    for file in sorted(files):
        # Compute the source filename and skip unsupported file types
        source_file = os.path.join(root, file)
        source_file_base, source_file_ext = os.path.splitext(source_file)
        if source_file_ext.lower() not in FILE_EXTS:
            continue
        existing_source_files.append(source_file_base)

        # Compute the target filename
        target_file = os.path.join(target_dir, os.path.relpath(source_file, source_dir))
        target_file = os.path.splitext(target_file)[0] + target_suffix

        # A missing target file always has to be generated
        if not os.path.exists(target_file):
            files_to_be_processed.append((source_file, target_file))
            continue

        # Leave anything that is not a regular file alone
        if not os.path.isfile(target_file):
            continue

        # The target has to be regenerated if the source file is newer than
        # the target, or if the target is the "error" placeholder image
        # written by a previous, failed conversion. Both conditions are
        # folded into a single flag so that a file is enqueued at most once.
        source_stat = os.stat(source_file)
        target_stat = os.stat(target_file)
        needs_update = source_stat.st_mtime > target_stat.st_mtime
        if (not needs_update) and target_stat.st_size == len(ERROR_IMAGE):
            # Only read the file if its size matches the placeholder
            with open(target_file, 'rb') as f:
                needs_update = (ERROR_IMAGE == f.read())
        if needs_update:
            files_to_be_processed.append((source_file, target_file))
################################################################################ | |
# TARGET DIRECTORY SCANNING # | |
################################################################################ | |
# Convert the list of source files to a set for fast membership tests
existing_source_files = set(existing_source_files)

for root, dirs, files in os.walk(target_dir):
    for file in sorted(files):
        target_file = os.path.join(root, file)

        # Only files carrying the target suffix are candidates for removal
        if target_file.endswith(target_suffix):
            # Map the target file back to the extension-less source filename
            # by swapping the directory prefix and cutting off the suffix
            relative_file = os.path.relpath(target_file, target_dir)
            source_file = os.path.join(source_dir, relative_file)
            source_file = source_file[:-len(target_suffix)]

            # Keep the target file as long as its source still exists
            if source_file in existing_source_files:
                continue

            # The source file is gone, remove the orphaned target file
            logger.info("Removing file {}".format(target_file))
            os.remove(target_file)
################################################################################ | |
# IMAGE PROCESSING # | |
################################################################################ | |
def random_string(strlen=6):
    """
    Returns a string of strlen characters drawn at random from the digits and
    lower-case ASCII letters. Used as part of the temporary filename the image
    is written to.
    """
    alphabet = "0123456789abcdefghijklmnopqrstuvwxyz"
    chars = [random.choice(alphabet) for _ in range(strlen)]
    return ''.join(chars)
def compute_resize_to_area(w, h, A):
    """
    Returns the (width, height) of an image that covers an area of A pixels
    while keeping the aspect ratio of an image of size w x h. Both values are
    truncated to integers.
    """
    aspect = w / h
    new_w = math.sqrt(A * aspect)
    new_h = math.sqrt(A / aspect)
    return int(new_w), int(new_h)
def compute_resize_fit(w, h, rw, rh):
    """
    Returns the (width, height) of an image of size w x h after scaling it,
    with its aspect ratio preserved, such that it fits into a rectangle of
    size rw x rh. The result is rounded to integers.
    """
    # First pass: map the longer side of the image onto the corresponding
    # side of the rectangle
    if w >= h:
        fit_w = rw
        fit_h = rw * (h / w)
    else:
        fit_w = rh * (w / h)
        fit_h = rh

    # Second pass: shrink uniformly if the width still exceeds the rectangle...
    if fit_w > rw:
        shrink = rw / fit_w
        fit_w, fit_h = fit_w * shrink, fit_h * shrink

    # ...and once more if the height does
    if fit_h > rh:
        shrink = rh / fit_h
        fit_w, fit_h = fit_w * shrink, fit_h * shrink

    return int(round(fit_w)), int(round(fit_h))
def exec_process(args):
    """
    Executes a child process, waits for it to finish and returns its standard
    output decoded as ASCII.

    args is the argument list handed to subprocess.Popen. Both stdout and
    stderr of the child are captured.

    Raises a RuntimeError carrying the child's stderr output if the child
    exits with a non-zero return code.
    """
    # Open the child process with the given arguments
    child = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Fetch stderr and stdout (both are bytes objects)
    stdout, stderr = child.communicate()

    # Raise an exception in case the return code is not zero. stderr has to be
    # decoded first: concatenating str and bytes would raise a TypeError that
    # hides the actual error message. Undecodable bytes are replaced instead
    # of triggering yet another error.
    if child.returncode != 0:
        raise RuntimeError("Error while executing subprocess\n" +
                           stderr.decode("utf-8", errors="replace"))
    return stdout.decode("ascii")
def read_image_size(image_file):
    """
    Returns the size of the given image file as a list [width, height]. The
    size is obtained by running the external "identify" command with a
    "%w:%h" format string and parsing its output.
    """
    command = ["identify", "-ping", "-format", "%w:%h", image_file]
    fields = exec_process(command).split(':')
    return [int(field) for field in fields]
def resize_image(source_file, target_file, rw, rh, quality, subsampling):
    """
    Converts source_file to target_file using the external "convert" command,
    scaling the image such that it fits into the rectangle rw x rh. quality
    and subsampling are passed on as the "-quality" and "-sampling-factor"
    options.
    """
    src_w, src_h = read_image_size(source_file)

    # Images that are strictly smaller than the rectangle in both dimensions
    # keep their size (they are still re-encoded by "convert"); everything
    # else is fitted into the rectangle.
    # (Alternative: compute_resize_to_area(src_w, src_h, rw * rh))
    fits_already = src_w < rw and src_h < rh
    if fits_already:
        out_w, out_h = src_w, src_h
    else:
        out_w, out_h = compute_resize_fit(src_w, src_h, rw, rh)

    # Assemble and run the conversion command. Each "convert" call is limited
    # to a single thread -- presumably because the parallelisation happens on
    # the level of whole files.
    geometry = "{}x{}".format(out_w, out_h)
    command = [
        "convert",
        "-limit", "thread", "1",
        source_file,
        "-filter", "Lanczos2Sharp",
        "-resize", geometry,
        "-quality", str(quality),
        "-sampling-factor", str(subsampling),
        target_file,
    ]
    exec_process(command)
def process_file(pair):
    """
    Processes a single file.

    pair is a tuple (source_file, target_file). The source image is resized
    into a temporary file located next to the target file, which is then
    moved over the target file. If resizing fails, the error is logged and the
    "error" placeholder image is stored as the target instead. Always returns
    None.
    """
    # Fetch the source and target file
    source_file, target_file = pair

    # Create the target directory
    target_file_dir = os.path.dirname(target_file)
    os.makedirs(target_file_dir, exist_ok=True)

    # Assemble a temporary target filename in the same directory, so that the
    # final move does not have to cross filesystem boundaries
    target_tmp_file = os.path.join(target_file_dir, '.tmp_' + random_string() + '_' + os.path.basename(target_file))

    # Try to load the source and resize it
    try:
        try:
            resize_image(source_file, target_tmp_file,
                         rw=args.width,
                         rh=args.height,
                         quality=args.quality,
                         subsampling=args.subsampling)
        except Exception as e:
            logger.error(str(e))
            # Print a default "error" image
            with open(target_tmp_file, 'wb') as f:
                f.write(ERROR_IMAGE)

        # Move the temporary file to the target file. os.replace overwrites
        # an existing target on every platform, whereas os.rename fails on
        # Windows if the target exists -- which is the case whenever an
        # updated source file is processed again.
        os.replace(target_tmp_file, target_file)
    finally:
        # Make sure the temporary file is deleted. After a successful move it
        # no longer exists, hence the OSError is expected and ignored.
        try:
            os.remove(target_tmp_file)
        except OSError:
            pass
    return None
# Print the total number of files
n_total = len(files_to_be_processed)
if n_total == 0:
    logger.info("No new files found, done.")
else:
    logger.info("Processing {} new files.".format(n_total))

    # Distribute the processing to all CPU cores. The pool is used as a
    # context manager so that the worker processes are shut down once all
    # results have been consumed, instead of lingering until interpreter exit.
    with multiprocessing.Pool() as pool:
        results = pool.imap_unordered(process_file, files_to_be_processed)
        for n_done, _ in enumerate(results, start=1):
            # Report the progress every 100 files
            if n_done % 100 == 0:
                logger.info("Processed {} of {} files.".format(n_done, n_total))
    logger.info("Done.")
# Delete empty directories in the target folder
def remove_empty_dirs(dir_, del_top_level=False):
    """
    Recursively removes all empty directories below dir_.

    dir_ itself is only removed if del_top_level is True. Directories are
    processed bottom-up, so a directory that only contains empty directories
    is removed in the same pass.
    """
    for name in os.listdir(dir_):
        path = os.path.join(dir_, name)
        # Do not follow symbolic links: they may point outside of dir_, and
        # os.rmdir cannot remove the link itself
        if os.path.isdir(path) and not os.path.islink(path):
            remove_empty_dirs(path, True)

    # List the directory again, since removing the subdirectories above may
    # have left this directory empty
    if del_top_level and not os.listdir(dir_):
        logger.info("Removing empty directory {}".format(dir_))
        os.rmdir(dir_)


# Clean up the target directory; the target directory itself is kept
remove_empty_dirs(target_dir)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment