Skip to content

Instantly share code, notes, and snippets.

@aheadley
Last active August 24, 2019 01:19
Show Gist options
  • Save aheadley/6f0e6974c967a96aff97d63699f6e1ca to your computer and use it in GitHub Desktop.
Save aheadley/6f0e6974c967a96aff97d63699f6e1ca to your computer and use it in GitHub Desktop.
Python photo importer
#!/usr/bin/env python3
import datetime
import errno
import hashlib
import io
import os
import os.path
import shutil
import struct
import tempfile
import time
import exifread
from loguru import logger
# Maps each canonical (destination) extension to the source extensions that
# are accepted for it. Extensions are written lower-case without the dot.
FILE_EXTS = {
    'jpg': ['jpg', 'jpeg'],
    'nef': ['nef'],
    'dng': ['dng'],
    'rw2': ['rw2'],
}

# Tag names used to look up values in the EXIF tag mapping.
EXIF_TIMESTAMP_TAG = 'EXIF DateTimeOriginal'
EXIF_TIMEZONE_TAG = 'MakerNote WorldTime'

# strptime() layout used to parse the EXIF timestamp tag.
EXIF_TIMESTAMP_FMT = '%Y:%m:%d %H:%M:%S'

# Default read size, in bytes, for read_chunks().
CHUNK_SIZE = 8 * 1024
def read_chunks(buf, chunk_size=CHUNK_SIZE):
    """Yield successive chunks read from *buf* until it is exhausted.

    Args:
        buf: Object with a ``read(size)`` method; an empty (falsy) result
            marks the end of the data.
        chunk_size: Size passed to each ``read`` call.

    Yields:
        Each non-empty chunk, in the order it was read.

    Once the data is exhausted the total number of bytes read is logged at
    debug level.
    """
    bytes_seen = 0
    while True:
        chunk = buf.read(chunk_size)
        if not chunk:
            break
        bytes_seen += len(chunk)
        yield chunk
    logger.debug('Read a total of: {:d} bytes', bytes_seen)
def mkdir_p(target_dir):
    """Create *target_dir* and any missing parents, like ``mkdir -p``.

    An already existing directory is not an error.

    Args:
        target_dir: Path of the directory to create.

    Raises:
        OSError: If the directory cannot be created, including when
            *target_dir* already exists but is not a directory.
    """
    # exist_ok=True only tolerates an existing *directory*; an existing
    # regular file at this path still raises instead of passing silently.
    os.makedirs(target_dir, exist_ok=True)
def scan_src_dir(src_dir):
    """Walk *src_dir* recursively and yield the photo files found in it.

    A file counts as a photo when its extension, compared case-insensitively,
    is one of the extensions listed in ``FILE_EXTS``.

    Args:
        src_dir: Root directory to walk.

    Yields:
        The absolute path of each matching file.
    """
    wanted_suffixes = {
        '.' + alias.lower()
        for aliases in FILE_EXTS.values()
        for alias in aliases
    }
    for dir_path, _dir_names, file_names in os.walk(src_dir):
        logger.info('Checking path for image files: {}', dir_path)
        for file_name in file_names:
            suffix = os.path.splitext(file_name)[1].lower()
            if suffix not in wanted_suffixes:
                continue
            photo_path = os.path.abspath(os.path.join(dir_path, file_name))
            logger.debug('Yielding photo file: {}', photo_path)
            yield photo_path
def normalize_ext(src_photo_fn):
    """Return the canonical extension for the file name *src_photo_fn*.

    The file's extension is lower-cased, stripped of its leading dot and
    looked up among the alias lists in ``FILE_EXTS``.

    Args:
        src_photo_fn: File name or path of the photo.

    Returns:
        The ``FILE_EXTS`` key whose alias list contains the extension.

    Raises:
        KeyError: If the extension is not listed in ``FILE_EXTS``.
    """
    suffix = os.path.splitext(src_photo_fn)[1].lstrip('.').lower()
    for canonical, aliases in FILE_EXTS.items():
        if suffix in aliases:
            return canonical
    raise KeyError(suffix)
def get_photo_timezone_offset(photo_tags):
    """Return the timezone offset stored in the photo's maker-note tag.

    The ``EXIF_TIMEZONE_TAG`` payload is decoded as a signed 16-bit integer
    followed by a bool and an unsigned byte (struct format ``h?B``). Only the
    first field is used, interpreted as an offset in minutes. The byte order
    is not known up front, so little endian is tried first and big endian
    second; a magnitude above 1440 minutes (24 hours) is treated as invalid.

    Args:
        photo_tags: Mapping of EXIF tag names to tag objects exposing a
            ``values`` attribute.

    Returns:
        A ``datetime.timedelta`` for the decoded offset, or a zero
        ``timedelta`` when the tag is missing or cannot be decoded.
    """
    # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Nikon.html
    fmt = 'h?B'
    try:
        raw = bytes(photo_tags[EXIF_TIMEZONE_TAG].values)
        # try little endian first, then big endian
        for byte_order in ('<', '>'):
            offset_in_minutes = struct.unpack(byte_order + fmt, raw)[0]
            if abs(offset_in_minutes) <= 1440:
                break
        else:
            # neither byte order gave a plausible value, bail
            raise ValueError('Could not determine valid TZ offset')
        td = datetime.timedelta(minutes=offset_in_minutes)
        logger.debug('Using EXIF datetime offset: {}', td)
    except (KeyError, AttributeError, TypeError, ValueError, struct.error):
        # struct.error (payload is not exactly 4 bytes) is not a ValueError
        # subclass, and bytes() raises TypeError for values it cannot convert;
        # both are treated like a missing tag.
        td = datetime.timedelta()
    return td
def get_photo_timestamp(src_photo):
    """Return the best available capture time for the photo at *src_photo*.

    The ``EXIF_TIMESTAMP_TAG`` value is parsed with ``EXIF_TIMESTAMP_FMT``.
    When the tag is missing or its value does not match that format, the
    older of the file's ctime and mtime is used instead.

    Args:
        src_photo: Path of the photo file.

    Returns:
        A naive ``datetime.datetime``.

    Raises:
        OSError: If the file cannot be opened or its metadata cannot be read.
    """
    with open(src_photo, 'rb') as photo_file:
        photo_tags = exifread.process_file(photo_file)
    try:
        dt = datetime.datetime.strptime(str(photo_tags[EXIF_TIMESTAMP_TAG]), EXIF_TIMESTAMP_FMT)
        # TODO: verify this is correct
        # dt += get_photo_timezone_offset(photo_tags)
        logger.debug('Using EXIF datetime: {}', dt)
    except (KeyError, ValueError):
        # KeyError: no exif tag available. ValueError: the tag is present but
        # strptime() could not parse it (e.g. a blank or malformed value).
        # Either way, use the older of mtime, ctime.
        dt = datetime.datetime.fromtimestamp(
            min(os.path.getctime(src_photo), os.path.getmtime(src_photo)))
        logger.debug('Using filesystem meta datetime: {}', dt)
    return dt
def import_photo(src_photo_fn, dest_dir):
    """Copy one photo into a dated sub-directory of *dest_dir*.

    The source is streamed into a temporary file inside *dest_dir* while its
    SHA-1 is computed. The final name is
    ``<unix timestamp>-<sha1 hex digest>.<normalized extension>`` inside a
    ``YYYY-MM-DD`` directory derived from the photo timestamp. If that file
    already exists the copy is skipped (a size mismatch is logged as a
    warning); otherwise the temporary file is renamed into place, the source
    file's stat information is copied onto it, and its atime/mtime are set to
    the photo timestamp.

    Any ``OSError`` raised while copying, creating the destination directory
    or moving the file into place is logged and not re-raised, so one bad file
    does not abort a batch. The temporary file is removed on every path that
    does not rename it into place.

    Args:
        src_photo_fn: Path of the photo to import.
        dest_dir: Existing, writable root directory of the photo library.

    Raises:
        KeyError: If the source extension is not known to ``normalize_ext``.
        OSError: If the source cannot be read while determining its timestamp.
    """
    timestamp = get_photo_timestamp(src_photo_fn)
    timestamp_unix = int(time.mktime(timestamp.timetuple()))
    dest_date_dir = timestamp.strftime('%Y-%m-%d')
    full_dest_dir = os.path.join(dest_dir, dest_date_dir)
    dest_photo_ext = normalize_ext(src_photo_fn)
    photo_hash = hashlib.sha1()

    logger.info('Importing file: {}', src_photo_fn)
    tmp_file_fn = None
    try:
        with tempfile.NamedTemporaryFile(mode='wb', dir=dest_dir, prefix='.import-', delete=False) as tmp_file:
            tmp_file_fn = tmp_file.name
            logger.debug('Writing to temporary file: {}', tmp_file_fn)
            # open(..., 'rb') is already buffered; no extra wrapper needed.
            with open(src_photo_fn, 'rb') as src_photo_handle:
                for chunk in read_chunks(src_photo_handle):
                    photo_hash.update(chunk)
                    tmp_file.write(chunk)

        dest_photo_fn = '{:d}-{}.{}'.format(timestamp_unix, photo_hash.hexdigest(), dest_photo_ext)
        dest_fn = os.path.join(full_dest_dir, dest_photo_fn)

        if not os.path.exists(full_dest_dir):
            logger.info('Creating destination dir: {}', full_dest_dir)
            mkdir_p(full_dest_dir)

        if os.path.exists(dest_fn):
            logger.info('Destination file exists, skipping copy')
            src_size = os.path.getsize(tmp_file_fn)
            dest_size = os.path.getsize(dest_fn)
            if src_size != dest_size:
                logger.warning('Destination file exists but filesize does not match: src={:d} dest={:d}',
                               src_size, dest_size)
        else:
            logger.info('Copying photo: {} -> {}', src_photo_fn, dest_fn)
            # The temp file lives under dest_dir, so this rename stays within
            # the destination tree.
            os.rename(tmp_file_fn, dest_fn)
            shutil.copystat(src_photo_fn, dest_fn)
            logger.debug('Updating atime/mtime to: {:d}', timestamp_unix)
            os.utime(dest_fn, (timestamp_unix, timestamp_unix))
            logger.info('Imported file to: {}', dest_fn)
    except OSError as err:
        logger.error('Failed to import file: {}', src_photo_fn)
        logger.exception(err)
    finally:
        # The temp file was created with delete=False, so it must be removed
        # here unless the rename above already moved it into place.
        if tmp_file_fn is not None and os.path.exists(tmp_file_fn):
            logger.debug('Removing temp file: {}', tmp_file_fn)
            try:
                os.remove(tmp_file_fn)
            except OSError as err:
                logger.warning('Could not remove temp file {}: {}', tmp_file_fn, err)
def main(args):
    """Import every photo found under ``args.source_dir`` into ``args.dest_dir``.

    Nothing is imported unless the destination is writable and the source is
    readable and searchable; a failed check is reported through the logger.

    Args:
        args: Object with ``source_dir`` and ``dest_dir`` attributes.
    """
    use_effective_ids = os.access in os.supports_effective_ids

    def can_access(path, perms):
        # Check against effective ids where the platform supports it.
        return os.access(path, perms, effective_ids=use_effective_ids)

    if not can_access(args.dest_dir, os.W_OK):
        logger.error('Unable to write to destination dir: {}', args.dest_dir)
        return
    if not can_access(args.source_dir, os.R_OK | os.X_OK):
        logger.error('Unable to read from source dir: {}', args.source_dir)
        return

    for src_photo in scan_src_dir(args.source_dir):
        import_photo(src_photo, args.dest_dir)
# Command-line entry point: takes two positional arguments, the source
# directory followed by the destination directory.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    for positional in ('source_dir', 'dest_dir'):
        parser.add_argument(positional)
    args = parser.parse_args()
    main(args)
ExifRead==2.1.2
loguru==0.3.2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment