Created
May 2, 2020 15:36
-
-
Save terrettaz/53528e26cf9f7b19125af286e780dc9a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
import sys | |
import os | |
import popen2 | |
import re | |
import shutil | |
import pytz | |
import imghdr | |
from datetime import datetime | |
from dateutil import parser | |
import json | |
import pprint | |
# Root directory that load_new_paths() walks looking for pictures.
ALBUMS_DIR = '/Users/pik/Pictures/Photos/AlbumsJournaliers'
# Root directory under which copy_files() builds <year>/<month>/<yy.mm.dd>.
RESULT_DIR = '/Users/pik/Pictures/Photos/DailyPictures'
# JSON file used by load_data()/save_data() to persist per-file information.
DATA = '/Users/pik/.daily_pictures.json'
# Time zone that timezone-aware dates are converted to in parse_tz_datetime().
LOCAL_TZ = pytz.timezone ("Europe/Zurich")
def test_image(path):
    """Return the image type ``imghdr`` detects for ``path``, or None.

    Any exception raised while probing the file is reported on stdout as
    ``skipping <path>`` and turned into a None result.
    """
    try:
        kind = imghdr.what(path)
    except Exception:
        print('skipping ' + path)
        kind = None
    return kind
def parse_tz_datetime(text, message, path):
    """Parse ``text`` into a datetime, or return None if that is not possible.

    The placeholder values ``'0000:00:00 00:00:00'`` and ``'-'`` give None
    immediately. Otherwise ``text`` goes through ``parser.parse``: a
    timezone-aware result is converted to ``LOCAL_TZ``, a naive result is
    returned unchanged. If anything raises, a diagnostic naming ``text``,
    ``path`` and ``message`` is printed and None is returned.
    """
    placeholders = ('0000:00:00 00:00:00', '-')
    if text in placeholders:
        return None
    try:
        parsed = parser.parse(text)
        if not parsed.tzinfo:
            return parsed
        return parsed.astimezone(LOCAL_TZ)
    except Exception:
        print('Cannot parse "%s" date from path %s: %s' % (text, path, message))
    return None
def get_dates(path):
    """Ask exiftool for the file-modification and original-capture dates.

    exiftool is run with ``-f`` (print ``-`` for a missing tag), ``-s -s -s``
    (values only) and ``-d %Y%m%d%H%M%S%z`` for the tags ``FileModifyDate``
    and ``DateTimeOriginal``; each output line is handed to
    ``parse_tz_datetime``.

    Returns:
        ``[('file_date', <datetime or None>), ('exif_date', <datetime or None>)]``

    Raises:
        OSError: exiftool could not be started.
        RuntimeError: exiftool printed fewer than the two expected lines.
    """
    # Function-scope import, like multiprocessing in load_file_infos.
    import subprocess

    # The path is passed as a real argv entry. The previous xargs pipeline
    # only escaped spaces, so quotes or backslashes in a name broke the call.
    argument = path if isinstance(path, bytes) else path.encode('utf-8')
    command = [
        'exiftool', '-f', '-s', '-s', '-s',
        '-d', '%Y%m%d%H%M%S%z',
        '-FileModifyDate', '-DateTimeOriginal',
        argument,
    ]
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains and closes both pipes, so nothing leaks.
        out, err = process.communicate()
    except Exception as e:
        # %s formatting: concatenating the exception to a str would raise
        # TypeError and hide the real failure.
        print('Error while executing exiftool %s' % e)
        raise

    lines = out.decode('utf-8', 'replace').splitlines()
    if len(lines) < 2:
        raise RuntimeError(
            'exiftool returned %d line(s) for %r: %s'
            % (len(lines), path, err.decode('utf-8', 'replace').strip()))
    return [
        ('file_date', parse_tz_datetime(lines[0], 'file date', path)),
        ('exif_date', parse_tz_datetime(lines[1], 'exif date', path)),
    ]
def guess_date(path):
    """Guess a date from a ``dd.mm.yy`` fragment in the directory of ``path``.

    Returns a naive ``datetime`` at midnight, or None when ``path`` has no
    directory part or the directory does not contain such a fragment.

    Raises:
        ValueError: the fragment matched but is not a real calendar date
            (for example ``31.02.10``); a warning is printed first.
    """
    dirname = os.path.dirname(path)
    if dirname == '':
        return None
    m = re.match(
        r'^.*(?P<day>[0-3][0-9])\.(?P<month>[0-1][0-9])\.(?P<year>[0-1][0-9]).*$',
        dirname)
    if m is None:
        return None
    # NOTE(review): the year group only accepts 00-19, so the 1900 branch
    # below cannot trigger and directories for year 20+ are not recognised.
    # Confirm whether [0-9][0-9] was intended before widening it.
    year = int(m.group('year'))
    if year < 80:
        year = 2000 + year
    else:
        year = 1900 + year
    try:
        return datetime(year, int(m.group('month')), int(m.group('day')))
    except Exception as e:
        print('Warning: cannot create date with %s-%s-%s'
              % (m.group('year'), m.group('month'), m.group('day')))
        # %s, not %u: %u needs a number and raised TypeError here, which
        # replaced the original error.
        print(' for file %(path)s %(e)s ' % {'path': path, 'e': e})
        raise
def get_file_info(path):
    """Collect the known facts about one picture into a dict.

    The result always holds ``path`` and ``filename``. The entries
    ``guess_date`` (from ``guess_date``) and ``file_date`` / ``exif_date``
    (from ``get_dates``) are added only when their value is truthy. Any
    exception is reported on stdout and then re-raised.
    """
    info = {'path': path, 'filename': os.path.basename(path)}
    try:
        candidates = [('guess_date', guess_date(path))]
        candidates.extend(get_dates(path))
        merged = dict(info)
        merged.update((name, value) for name, value in candidates if value)
        return merged
    except Exception as e:
        print('Error while processing "%(path)s" %(info)s %(e)s'
              % {'path': path, 'info': info, 'e': e})
        raise e
def select_best_date(info):
    """Return ``info`` extended with a ``selected_date`` entry when possible.

    Rules, in order:
      * ``info`` already has ``selected_date``: returned unchanged.
      * EXIF and guessed date both exist and fall on the same calendar day:
        the EXIF date is selected.
      * Both exist on different days: the user picks exif / guess / file,
        skips (``s``) or quits (``q`` exits with status 1).
      * Neither exists: the user types a ``yyyy-mm-dd`` date or ``s``.
      * Only one exists: that one is selected.

    Skipping, or reaching end of input on stdin, returns ``info`` without a
    ``selected_date``. The input dict is never modified; a copy is returned.
    """
    if 'selected_date' in info:
        return info

    path = info['path']
    exif = info.get('exif_date')
    file_date = info.get('file_date')
    guess = info.get('guess_date')

    def with_selected(value):
        result = dict(info)
        result['selected_date'] = value
        return result

    if exif and guess:
        # Comparing calendar days means a mismatch in ANY of year, month or
        # day counts. The old test chained the three comparisons with "and",
        # so it only fired when all three differed.
        if exif.date() == guess.date():
            return with_selected(exif)
        candidates = (exif, guess, file_date)
        while True:
            print('%(path)s as two diffrent dates, choose:' % {'path': path})
            print(' [1] exif %(exif)s' % {'exif': exif})
            print(' [2] guess %(guess)s' % {'guess': guess})
            print(' [3] file %(file)s' % {'file': file_date})
            print(' [s] Skip')
            print(' [q] Quit')
            line = sys.stdin.readline()
            if not line:
                # End of input: nobody can answer, so skip, don't loop.
                return info
            choice = line.strip().lower()
            if choice == 'q':
                sys.exit(1)
            if choice == 's':
                return info
            try:
                index = int(choice)
            except ValueError:
                # Blank or non-numeric answer: ask again, don't crash.
                continue
            if index < 1 or index > 3:
                continue
            return with_selected(candidates[index - 1])

    if exif is None and guess is None:
        while True:
            print('%(path)s cannot find a valid date, choose one: yyyy-mm-dd or "s" to skip'
                  % {'path': path})
            line = sys.stdin.readline()
            if not line:
                return info
            answer = line.strip().lower()
            if answer == 's':
                return info
            try:
                chosen = datetime.strptime(answer, '%Y-%m-%d')
            except ValueError:
                # Catch only the parse failure so Ctrl-C and sys.exit still
                # get through (a bare except swallowed them).
                continue
            return with_selected(chosen)

    if exif is None:
        return with_selected(guess)
    return with_selected(exif)
def load_data():
    """Load the persisted file information from ``DATA``.

    Returns a list with one dict per stored record, each passed through
    ``parse_dates``. A missing file gives an empty list. If reading, JSON
    decoding or date parsing fails, the error and a notice are printed and
    an empty list is returned.
    """
    if not os.path.exists(DATA):
        return []
    with open(DATA) as handle:
        try:
            records = json.loads(handle.read())
            return [parse_dates(record) for record in records]
        except Exception as e:
            print(e)
            print('Cannot load data from %s' % DATA)
            return []
def format_dates(dict_from):
    """Return a copy of ``dict_from`` with its date entries as strings.

    The keys ``exif_date``, ``guess_date``, ``file_date`` and
    ``selected_date`` are converted with ``strftime('%Y%m%d%H%M%S')`` when
    present. A key that is absent stays absent. A key whose value is None is
    left as None: select_best_date can store None when the user picks a
    missing file date, and calling ``strftime`` on it raised AttributeError.
    The input dict is not modified.
    """
    formatted = dict(dict_from)
    for key in ('exif_date', 'guess_date', 'file_date', 'selected_date'):
        value = formatted.get(key)
        if value is not None:
            formatted[key] = value.strftime('%Y%m%d%H%M%S')
    return formatted
def parse_dates(dict_from):
    """Return a copy of ``dict_from`` with its date strings as datetimes.

    The keys ``exif_date``, ``guess_date``, ``file_date`` and
    ``selected_date`` are parsed with ``strptime(..., '%Y%m%d%H%M%S')`` when
    present. A key that is absent stays absent. A key whose value is None (a
    JSON null) is left as None: ``strptime`` raised TypeError on it, and
    load_data reacts to any error by dropping every stored record. The input
    dict is not modified.

    Raises:
        ValueError: a stored string does not match the expected format.
    """
    parsed = dict(dict_from)
    for key in ('exif_date', 'guess_date', 'file_date', 'selected_date'):
        value = parsed.get(key)
        if value is not None:
            parsed[key] = datetime.strptime(value, '%Y%m%d%H%M%S')
    return parsed
def save_data(data):
    """Write ``data`` (a list of file-info dicts) to ``DATA`` as JSON.

    Each record goes through ``format_dates`` first. Progress is reported as
    ``saving data .. ok`` on stdout.
    """
    sys.stdout.write('saving data .. ')
    sys.stdout.flush()
    # Serialise before opening the file: opening with 'w' truncates it, so a
    # formatting error must not be able to wipe the previous contents.
    payload = json.dumps([format_dates(record) for record in data])
    # The context manager closes the file; the old "f.close" without
    # parentheses never did.
    with open(DATA, 'w') as f:
        f.write(payload)
    # This was a bare 'ok' string expression that printed nothing.
    print('ok')
def find_by_path(path, file_infos):
    """Return the first entry of ``file_infos`` whose ``'path'`` is ``path``.

    Returns None when no entry matches.
    """
    matches = [entry for entry in file_infos if entry['path'] == path]
    return matches[0] if matches else None
def load_new_paths(file_infos):
    """Walk ``ALBUMS_DIR`` and return the picture paths not yet known.

    A path is returned when no entry of ``file_infos`` has it as ``'path'``
    and ``test_image`` recognises the file as an image. Progress is reported
    as ``loading files .. ok`` on stdout.
    """
    sys.stdout.write('loading files .. ')
    sys.stdout.flush()
    # Build the lookup once. Scanning file_infos for every file on disk made
    # the walk O(files * known entries).
    known_paths = {info['path'] for info in file_infos}
    paths = []
    for root, _dirs, files in os.walk(ALBUMS_DIR):
        for name in files:
            path = os.path.join(root, name)
            if isinstance(path, bytes):
                path = path.decode('utf-8')
            # Every known entry is skipped, with or without an 'error' flag;
            # the old extra "'error' in info" test never changed the outcome.
            if path in known_paths:
                continue
            if not test_image(path):
                continue
            paths.append(path)
    print('ok')
    return paths
def load_file_infos(paths, processes=16):
    """Run ``get_file_info`` over ``paths`` in a process pool.

    Args:
        paths: picture paths to inspect.
        processes: number of worker processes (default 16, the previously
            hard-coded value).

    Returns:
        The truthy results, in the order of ``paths``. An exception raised
        by a worker propagates to the caller after the pool is shut down.
    """
    from multiprocessing import Pool
    sys.stdout.write('loading infos .. ')
    # Flush so the progress text is on screen before the work starts.
    sys.stdout.flush()
    files_info = []
    if paths:  # no need to spawn worker processes for nothing
        pool = Pool(processes)
        try:
            for value in pool.imap(get_file_info, paths):
                if value:  # Can be None
                    files_info.append(value)
        finally:
            pool.terminate()
            pool.join()
    print('ok')
    return files_info
def select_dates(files_info):
    """Apply ``select_best_date`` to every entry and return the new list."""
    print('Selecting dates ..')
    return [select_best_date(info) for info in files_info]
def copy_files(files_info, dryRun):
    """Copy every dated picture into ``RESULT_DIR/<yyyy>/<mm>/<yy.mm.dd>/``.

    Entries already flagged with ``'error'`` are ignored. Entries with no
    selected date (key missing or None) are reported as skipped and flagged
    with ``info['error'] = True``. A file is copied only when the target
    directory does not already contain a file of the same name.

    Args:
        files_info: list of file-info dicts (flagged in place as described).
        dryRun: when true, only print what would be done; nothing is created
            or copied.

    Returns:
        ``(count_dir, count_files)``: directories created and files copied
        (or that would be, in a dry run).
    """
    count_dir = 0
    count_files = 0
    # NOTE(review): the source lost its indentation; counting is assumed to
    # happen in dry runs too. A dry run creates nothing, so remember the
    # directories already announced, else each one is reported and counted
    # once per file.
    announced_dirs = set()
    for info in files_info:
        if 'error' in info:
            continue
        # .get(): files skipped in select_best_date have no 'selected_date'
        # key at all, which used to raise KeyError here.
        date = info.get('selected_date')
        if date is None:
            print('Skipped %s' % info['filename'])
            info['error'] = True
            continue
        new_dir = os.path.join(
            RESULT_DIR,
            str(date.year),
            date.strftime('%m'),
            date.strftime('%y.%m.%d'))
        if new_dir not in announced_dirs and not os.path.exists(new_dir):
            print('creating dir %s' % new_dir)
            if not dryRun:
                os.makedirs(new_dir)
            announced_dirs.add(new_dir)
            count_dir += 1
        target = os.path.join(new_dir, os.path.basename(info['path']))
        if not os.path.exists(target):
            print('copying %s' % info['path'])
            if not dryRun:
                shutil.copy2(info['path'], new_dir)
            count_files += 1
    return (count_dir, count_files)
def main(argv):
    """Run the whole pipeline: load, scan, pick dates, copy, save.

    ``--dryRun`` anywhere in ``argv`` makes the copy step print only. The
    collected information is saved after scanning, after date selection and
    once more on the way out, whatever happened. Ctrl-C is absorbed so that
    the final save still runs.
    """
    files_info = load_data()
    try:
        new_paths = load_new_paths(files_info)
        files_info.extend(load_file_infos(new_paths))
        save_data(files_info)
        files_info = select_dates(files_info)
        save_data(files_info)
        dry_run = '--dryRun' in argv
        count_dir, count_files = copy_files(files_info, dry_run)
        summary = {'count_dir': count_dir, 'count_files': count_files}
        print('finished, %(count_dir)d directories created, %(count_files)d files copied'
              % summary)
    except KeyboardInterrupt:
        pass
    finally:
        save_data(files_info)
# Script entry point: hand the full argument list to main() when run directly.
if __name__ == '__main__':
    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment