Last active
August 24, 2019 01:19
-
-
Save aheadley/6f0e6974c967a96aff97d63699f6e1ca to your computer and use it in GitHub Desktop.
Python photo importer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import datetime | |
import errno | |
import hashlib | |
import io | |
import os | |
import os.path | |
import shutil | |
import struct | |
import tempfile | |
import time | |
import exifread | |
from loguru import logger | |
# Canonical destination extension -> every source spelling (lower-case,
# without the leading dot) that is normalised to it.
FILE_EXTS = {
    'jpg': ['jpg', 'jpeg'],
    'nef': ['nef'],
    'dng': ['dng'],
    'rw2': ['rw2'],
}

# Keys looked up in the tag mapping returned by exifread.
EXIF_TIMESTAMP_TAG = 'EXIF DateTimeOriginal'
EXIF_TIMEZONE_TAG = 'MakerNote WorldTime'

# strptime() layout used to parse the timestamp tag's string form.
EXIF_TIMESTAMP_FMT = '%Y:%m:%d %H:%M:%S'

# Default number of bytes requested per read() when streaming a file.
CHUNK_SIZE = 8 * 1024
def read_chunks(buf, chunk_size=CHUNK_SIZE):
    """Yield successive chunks from *buf* until a read comes back empty.

    Args:
        buf: Object exposing ``read(size)``.
        chunk_size: Size passed to every ``read`` call.

    Yields:
        Each non-empty value returned by ``buf.read(chunk_size)``.
    """
    bytes_seen = 0
    while True:
        chunk = buf.read(chunk_size)
        if not chunk:
            break
        bytes_seen += len(chunk)
        yield chunk
    # Only reached when the stream is exhausted (not if the consumer stops early).
    logger.debug('Read a total of: {:d} bytes', bytes_seen)
def mkdir_p(target_dir):
    """Create *target_dir* and any missing parents, like ``mkdir -p``.

    An already-existing directory is not an error. Unlike a blanket
    ``EEXIST`` check, a path that exists but is *not* a directory is
    reported instead of being silently accepted.

    Args:
        target_dir: Directory path to create.

    Raises:
        OSError: If the directory cannot be created, including
            ``FileExistsError`` when the path exists as a non-directory.
    """
    os.makedirs(target_dir, exist_ok=True)
def scan_src_dir(src_dir):
    """Walk *src_dir* recursively and yield the photos found in it.

    A file counts as a photo when its extension, compared
    case-insensitively, is one of the spellings listed in ``FILE_EXTS``.

    Args:
        src_dir: Root directory to search.

    Yields:
        The absolute path of each matching file.
    """
    wanted_suffixes = []
    for spellings in FILE_EXTS.values():
        for spelling in spellings:
            wanted_suffixes.append('.' + spelling.lower())

    for current_dir, _subdirs, entries in os.walk(src_dir):
        logger.info('Checking path for image files: {}', current_dir)
        for entry in entries:
            suffix = os.path.splitext(entry)[1].lower()
            if suffix not in wanted_suffixes:
                continue
            photo_path = os.path.abspath(os.path.join(current_dir, entry))
            logger.debug('Yielding photo file: {}', photo_path)
            yield photo_path
def normalize_ext(src_photo_fn):
    """Return the canonical extension for *src_photo_fn*.

    Args:
        src_photo_fn: File name or path whose extension is inspected.

    Returns:
        The ``FILE_EXTS`` key whose spelling list contains the file's
        lower-cased, dot-less extension.

    Raises:
        KeyError: If the extension is not listed in ``FILE_EXTS``.
    """
    suffix = os.path.splitext(src_photo_fn)[1].lstrip('.').lower()
    for canonical, spellings in FILE_EXTS.items():
        if suffix in spellings:
            return canonical
    # No spelling list matched.
    raise KeyError(suffix)
def get_photo_timezone_offset(photo_tags):
    """Return the timezone offset stored in the photo's world-time tag.

    The tag payload is unpacked as a signed 16-bit minute offset followed
    by a boolean and an unsigned byte (only the offset is used). Little
    endian is tried first, then big endian; an offset whose magnitude
    exceeds 1440 minutes is treated as implausible.

    Args:
        photo_tags: Mapping of tag name to tag object; the tag object's
            ``values`` attribute is converted with ``bytes()``.

    Returns:
        A ``datetime.timedelta`` for the decoded offset, or a zero
        ``timedelta`` when the tag is missing or cannot be decoded.
    """
    # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Nikon.html
    layout = 'h?B'
    try:
        raw = bytes(photo_tags[EXIF_TIMEZONE_TAG].values)
        for byte_order in ('<', '>'):
            offset_in_minutes = struct.unpack(byte_order + layout, raw)[0]
            if abs(offset_in_minutes) <= 1440:
                break
        else:
            # Neither byte order produced a plausible offset.
            raise ValueError('Could not determine valid TZ offset')
    except (KeyError, AttributeError, TypeError, ValueError, struct.error):
        # struct.error (wrong payload length) is not a ValueError subclass,
        # and bytes() raises TypeError for non-integer payloads; both mean
        # "no usable offset", same as a missing tag.
        return datetime.timedelta()

    td = datetime.timedelta(minutes=offset_in_minutes)
    logger.debug('Using EXIF datetime offset: {}', td)
    return td
def get_photo_timestamp(src_photo):
    """Return the best available capture time for *src_photo*.

    The EXIF original-timestamp tag is preferred. When the tag is missing,
    or present but not parseable with ``EXIF_TIMESTAMP_FMT``, the older of
    the file's ctime and mtime is used instead.

    Args:
        src_photo: Path of the photo file.

    Returns:
        A naive ``datetime.datetime``.

    Raises:
        OSError: If the file cannot be opened or stat'ed.
    """
    with open(src_photo, 'rb') as photo_file:
        photo_tags = exifread.process_file(photo_file)

    try:
        dt = datetime.datetime.strptime(
            str(photo_tags[EXIF_TIMESTAMP_TAG]), EXIF_TIMESTAMP_FMT)
    except (KeyError, ValueError):
        # KeyError: no EXIF tag available. ValueError: the tag is present
        # but malformed. Either way use the older of ctime and mtime.
        dt = datetime.datetime.fromtimestamp(
            min(os.path.getctime(src_photo), os.path.getmtime(src_photo)))
        logger.debug('Using filesystem meta datetime: {}', dt)
    else:
        # TODO: verify whether get_photo_timezone_offset(photo_tags) should
        # be added to dt here; it is deliberately not applied yet.
        logger.debug('Using EXIF datetime: {}', dt)
    return dt
def import_photo(src_photo_fn, dest_dir):
    """Copy one photo into a dated sub-directory of *dest_dir*.

    The source is streamed into a temporary file inside *dest_dir* while
    its SHA-1 is computed, then renamed to
    ``<dest_dir>/<YYYY-MM-DD>/<unix timestamp>-<sha1 hex>.<ext>``. If that
    name already exists the copy is skipped and the temporary file is
    discarded. The temporary file is always cleaned up, including when the
    copy or the rename fails.

    Args:
        src_photo_fn: Path of the photo to import.
        dest_dir: Root directory of the photo library.

    Raises:
        KeyError: If the source extension is not known to ``normalize_ext``.
        OSError: If the source cannot be read or the temporary file cannot
            be written. ``OSError``s raised while creating the dated
            directory or placing the file are logged, not propagated.
    """
    timestamp = get_photo_timestamp(src_photo_fn)
    timestamp_unix = int(time.mktime(timestamp.timetuple()))
    full_dest_dir = os.path.join(dest_dir, timestamp.strftime('%Y-%m-%d'))
    dest_photo_ext = normalize_ext(src_photo_fn)
    photo_hash = hashlib.sha1()

    logger.info('Importing file: {}', src_photo_fn)
    # Path of the temp file while this function still owns it; reset to
    # None once it has been renamed or removed.
    tmp_file_fn = None
    try:
        with tempfile.NamedTemporaryFile(mode='wb', dir=dest_dir, prefix='.import-',
                                         delete=False) as tmp_file:
            tmp_file_fn = tmp_file.name
            logger.debug('Writing to temporary file: {}', tmp_file_fn)
            # open(..., 'rb') is already buffered; no extra wrapper needed.
            with open(src_photo_fn, 'rb') as src_photo_handle:
                for chunk in read_chunks(src_photo_handle):
                    photo_hash.update(chunk)
                    tmp_file.write(chunk)

        dest_photo_fn = '{:d}-{}.{}'.format(
            timestamp_unix, photo_hash.hexdigest(), dest_photo_ext)
        dest_fn = os.path.join(full_dest_dir, dest_photo_fn)

        try:
            if not os.path.exists(full_dest_dir):
                logger.info('Creating destination dir: {}', full_dest_dir)
                mkdir_p(full_dest_dir)

            if os.path.exists(dest_fn):
                logger.info('Destination file exists, skipping copy')
                src_size = os.path.getsize(tmp_file_fn)
                dest_size = os.path.getsize(dest_fn)
                if src_size != dest_size:
                    logger.warning('Destination file exists but filesize does not match: src={:d} dest={:d}',
                                   src_size, dest_size)
                logger.debug('Removing temp file: {}', tmp_file_fn)
                os.remove(tmp_file_fn)
                tmp_file_fn = None
            else:
                logger.info('Copying photo: {} -> {}', src_photo_fn, dest_fn)
                os.rename(tmp_file_fn, dest_fn)
                tmp_file_fn = None  # the data now lives at dest_fn
                shutil.copystat(src_photo_fn, dest_fn)
                logger.debug('Updating atime/mtime to: {:d}', timestamp_unix)
                os.utime(dest_fn, (timestamp_unix, timestamp_unix))
                logger.info('Imported file to: {}', dest_fn)
        except OSError as err:
            logger.error('Failed to import file: {}', src_photo_fn)
            logger.exception(err)
    finally:
        # Best-effort cleanup so failed imports do not leave '.import-*'
        # files behind in dest_dir.
        if tmp_file_fn is not None:
            try:
                os.remove(tmp_file_fn)
            except OSError:
                logger.warning('Could not remove temp file: {}', tmp_file_fn)
def main(args):
    """Import every photo found under ``args.source_dir`` into ``args.dest_dir``.

    Logs an error and does nothing when the destination is not writable
    or the source is not readable and searchable.

    Args:
        args: Object with ``source_dir`` and ``dest_dir`` attributes.
    """
    # Check with effective ids only where os.access supports it.
    use_effective_ids = os.access in os.supports_effective_ids

    def can_access(path, perms):
        return os.access(path, perms, effective_ids=use_effective_ids)

    if not can_access(args.dest_dir, os.W_OK):
        logger.error('Unable to write to destination dir: {}', args.dest_dir)
        return
    if not can_access(args.source_dir, os.R_OK | os.X_OK):
        logger.error('Unable to read from source dir: {}', args.source_dir)
        return

    for src_photo in scan_src_dir(args.source_dir):
        import_photo(src_photo, args.dest_dir)
if __name__ == '__main__':
    # Command-line entry point: two required positionals, source then
    # destination directory.
    from argparse import ArgumentParser

    cli = ArgumentParser()
    for positional in ('source_dir', 'dest_dir'):
        cli.add_argument(positional)
    main(cli.parse_args())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ExifRead==2.1.2
loguru==0.3.2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment