Created
May 5, 2016 11:57
-
-
Save jacobh/3cb97025a1112e0c51362cd559ff5cda to your computer and use it in GitHub Desktop.
Might have another version of this in a gist already...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import multiprocessing | |
from multiprocessing.pool import Pool | |
import exifread | |
import arrow | |
import glob | |
import os | |
import records | |
import shutil | |
from PIL import Image | |
# Lightroom catalog file; opened as a SQLite database by get_all_lr_images().
LR_CATALOG_PATH = '/Users/jacob/Dropbox/Photos/Lightroom/Lightroom Catalog.lrcat'
# Folder that is globbed for recovered image files.
RESTORE_IMAGES_FOLDER_PATH = '/Volumes/Storage/Photos/Other/Recovered Photos'
# Root folder under which matched images are copied.
DESTINATION_PATH = '/Volumes/Storage/Photos/Recovered_LR_Photos'
class ImageSet(object):
    """A materialised collection of image objects with uniqueness filters."""

    def __init__(self, images):
        # Copy into a list so generators can be iterated more than once.
        self._images = list(images)

    def __iter__(self):
        return iter(self._images)

    def __len__(self):
        return len(self._images)

    def _with_unique_attr(self, attr_name, is_callable=False):
        """Return the images whose ``attr_name`` value occurs exactly once.

        When ``is_callable`` is true the attribute is called (with no
        arguments) and its return value is used as the comparison key.
        Keys must be hashable.
        """
        first_seen = {}
        duplicated = set()
        for candidate in self:
            key = getattr(candidate, attr_name)
            if is_callable:
                key = key()
            if key in first_seen:
                # A second sighting disqualifies every image sharing the key.
                duplicated.add(key)
            else:
                first_seen[key] = candidate
        return [
            candidate
            for key, candidate in first_seen.items()
            if key not in duplicated
        ]

    def with_unique_dimensions(self):
        """Return images whose ``get_dimensions()`` result is unique."""
        return self._with_unique_attr('get_dimensions', True)

    def with_unique_capture_time(self):
        """Return images whose ``capture_time`` is unique in this set."""
        return self._with_unique_attr('capture_time')
class BaseImage(object):
    """Common attributes shared by restored and Lightroom images."""

    def __init__(self, capture_time, filename, folder_path, width, height):
        """Store the image's capture time, location and pixel dimensions."""
        self.capture_time = capture_time
        self.filename = filename
        self.folder_path = folder_path
        self.width = width
        self.height = height

    def get_dimensions(self):
        """Return ``(width, height)`` as a tuple."""
        return (self.width, self.height)

    def get_path(self):
        """Return ``folder_path`` joined with ``filename``."""
        # Must read the instance attributes; the bare names are not defined
        # in this scope and would raise NameError.
        return os.path.join(self.folder_path, self.filename)
class RestoreImage(BaseImage):
    """An image whose attributes are read from a file on disk."""

    @classmethod
    def get_metadata(cls, file_path):
        """Return the tags ``exifread.process_file`` extracts from the file."""
        # exifread parses raw bytes, so open the file in binary mode.
        with open(file_path, 'rb') as f:
            return exifread.process_file(f)

    @classmethod
    def get_capture_time(cls, file_path):
        """Return the parsed 'EXIF DateTimeOriginal' value, or None.

        None is returned when the tag is missing or falsy, when it holds
        the all-zero placeholder, or when arrow raises ValueError while
        parsing it.
        """
        metadata = cls.get_metadata(file_path)
        capture_time_str = metadata.get('EXIF DateTimeOriginal', None)
        if not capture_time_str:
            return None
        if str(capture_time_str) in ['0000:00:00 00:00:00']:
            return None
        try:
            return arrow.get(str(capture_time_str), ['YYYY:MM:DD HH:mm:ss', 'YYYY:MM:DD HH:mm:'])
        except ValueError:
            # Unparseable timestamps are treated as "no capture time".
            return None

    @classmethod
    def get_dimensions_from_file_path(cls, file_path):
        """Return ``(width, height)`` for JPEG files, else ``(None, None)``.

        The file extension is compared case-insensitively so that names
        such as ``IMG_1.JPG`` are handled, and a '.jpg' appearing elsewhere
        in the path does not trigger an image open.
        """
        extension = os.path.splitext(file_path)[1].lower()
        if extension in ('.jpg', '.jpeg'):
            with Image.open(file_path) as im:
                return im.size
        return (None, None)

    @classmethod
    def from_file_path(cls, file_path):
        """Build an instance from the file at ``file_path``."""
        folder_path, filename = os.path.split(file_path)
        width, height = cls.get_dimensions_from_file_path(file_path)
        return cls(
            capture_time=cls.get_capture_time(file_path),
            filename=filename,
            folder_path=folder_path,
            width=width,
            height=height,
        )
class LightroomImage(BaseImage):
    """An image described by a row of the Lightroom catalog query."""

    @classmethod
    def from_row(cls, row):
        """Build an instance from a catalog row.

        Reads the 'captureTime', 'idx_filename', 'pathFromRoot',
        'fileWidth' and 'fileHeight' columns of ``row``.
        """
        return cls(
            capture_time=arrow.get(row['captureTime']),
            filename=row['idx_filename'],
            folder_path=row['pathFromRoot'],
            width=row['fileWidth'],
            height=row['fileHeight'],
        )

    def get_destination_path(self):
        """Return DESTINATION_PATH joined with this image's folder and name."""
        # Must read the instance attributes; the bare names are not defined
        # in this scope and would raise NameError.
        return os.path.join(DESTINATION_PATH, self.folder_path, self.filename)
def get_all_lr_images(lr_catalog_path):
    """Return an ImageSet of LightroomImage built from the catalog rows.

    The catalog at ``lr_catalog_path`` is opened through a ``sqlite:///``
    URL and every row of the joined image/file/folder query is converted
    with ``LightroomImage.from_row``.
    """
    catalog_url = 'sqlite:///{}'.format(lr_catalog_path)
    query = '''
SELECT * FROM Adobe_images image
LEFT JOIN agLibraryFile file ON image.rootFile = file.id_local
LEFT JOIN agLibraryFolder folder ON file.folder = folder.id_local
'''
    catalog = records.Database(catalog_url)
    return ImageSet(
        LightroomImage.from_row(record) for record in catalog.query(query)
    )
def get_restore_image(file_path):
    """Return the RestoreImage built from ``file_path``.

    Defined at module level so it can be handed to ``Pool.map``.
    """
    restore_image = RestoreImage.from_file_path(file_path)
    return restore_image
def get_all_restore_images(restore_images_folder_path):
    """Return an ImageSet of RestoreImage for files under the given folder.

    Paths are collected with the glob pattern ``<folder>/**/*`` and loaded
    in parallel, using one worker process per CPU.
    """
    # The glob can also match directories; opening one would raise in a
    # worker and abort the whole map, so keep regular files only.
    file_paths = [
        path
        for path in glob.glob('{}/**/*'.format(restore_images_folder_path))
        if os.path.isfile(path)
    ]
    pool = Pool(multiprocessing.cpu_count())
    try:
        restore_images = pool.map(get_restore_image, file_paths)
    finally:
        # Always shut the workers down so they do not outlive this call.
        pool.close()
        pool.join()
    return ImageSet(restore_images)
# ... | |
restore_images = get_all_restore_images(RESTORE_IMAGES_FOLDER_PATH)
lr_images = get_all_lr_images(LR_CATALOG_PATH)

# match on unique capture time
lr_image_r_image_map = {}
lr_images_with_unique_capture_time = lr_images.with_unique_capture_time()
# Every capture time in this list is unique, so a dict lookup finds the same
# match as scanning the list for each restored image.
lr_image_by_capture_time = dict(
    (lr_image.capture_time, lr_image)
    for lr_image in lr_images_with_unique_capture_time
)
for r_image in restore_images.with_unique_capture_time():
    lr_image = lr_image_by_capture_time.get(r_image.capture_time)
    if lr_image is None:
        continue
    # Single-argument print() call behaves the same on Python 2 and 3.
    print("{} ---> {}".format(r_image.filename, lr_image.filename))
    lr_image_r_image_map[lr_image] = r_image

# print len(lr_image_r_image_map) # 725

# # NOT WORKING CORRECTLY
# # match on unique dimensions
# # lr_image_r_image_map = {}
# lr_images_with_unique_dimensions = lr_images.with_unique_dimensions()
# for r_image in restore_images.with_unique_dimensions():
#     for lr_image in lr_images_with_unique_dimensions:
#         if r_image.get_dimensions() == lr_image.get_dimensions():
#             print "{} ---> {}".format(r_image.filename, lr_image.filename)
#             lr_image_r_image_map[lr_image] = r_image
#             break
# # print len(lr_image_r_image_map) # 672

# Copy each matched restored file to DESTINATION_PATH under the folder and
# filename recorded for its Lightroom counterpart.
for lr_image, r_image in lr_image_r_image_map.items():
    folder_path = os.path.join(DESTINATION_PATH, lr_image.folder_path)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    shutil.copy2(
        os.path.join(r_image.folder_path, r_image.filename),
        os.path.join(folder_path, lr_image.filename)
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment