Created
December 23, 2019 18:02
-
-
Save cnk/65d4110b9b71839a000891ca3d241301 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import traceback
from collections import OrderedDict
from io import BytesIO

import requests
from django.core.files.images import ImageFile
from wagtail.images import get_image_model

from core.logging import logger
from djunk.utils import get_or_generate

from .utils import convert_epsacrops_to_focal_point
class ImageMigrator(object):
    """Create/update image records from YAML-derived dicts and export them back.

    Each image description is a mapping that names its source either as
    ``file`` (a filename under ``<cwd>/images``) or as ``url`` (fetched with
    ``requests``).
    """

    def __get_title(self, yml):
        """Return the title for an image description.

        Preference order: explicit ``title_text``, then the local ``file``
        name, then the last path segment of ``url``.

        Raises:
            RuntimeError: if none of the three keys is present, so callers
                get the same error the other source helpers raise instead of
                an ``AttributeError`` on ``None``.
        """
        if yml.get('title_text'):
            return yml['title_text']
        if yml.get('file'):
            # This import is dealing with files in the local file system.
            return yml['file']
        if yml.get('url'):
            # This import is getting urls from which we can upload the images.
            return yml['url'].split('/')[-1]
        raise RuntimeError('We need either a local file or a url from which we can retrieve the file.')

    def __get_image_data(self, yml):
        """Return ``(file_like, filename)`` holding the image bytes.

        Both sources are returned as an in-memory ``BytesIO``; the local file
        is read inside a ``with`` block so no file handle is left open.

        Raises:
            RuntimeError: if the description has neither ``file`` nor ``url``.
        """
        if yml.get('file'):
            # This import is dealing with files in the local file system.
            local_path = os.path.join(os.getcwd(), 'images', yml['file'])
            with open(local_path, 'rb') as source:
                return (BytesIO(source.read()), yml['file'])
        if yml.get('url'):
            # This import is getting urls from which we can upload the images,
            # retrieve the actual image content.
            response = requests.get(yml['url'])
            return (BytesIO(response.content), yml['url'].split('/')[-1])
        raise RuntimeError('We need either a local file or a url from which we can retrieve the file.')

    def __file_needs_update(self, image, yml):
        """Return True when ``image`` must be given a new file object.

        An image with no file at all always needs one. Otherwise the stored
        filename (last path segment) and size are compared with the source:
        the on-disk size for ``file`` sources, the declared ``filesize`` for
        ``url`` sources. When a ``url`` source declares no ``filesize`` only
        the filename is compared.

        Raises:
            RuntimeError: if the description has neither ``file`` nor ``url``.
        """
        if yml.get('file'):
            # This import is dealing with files in the local file system.
            new_file_size = os.path.getsize(os.path.join(os.getcwd(), 'images', yml['file']))
            if not image.file:
                return True
            stored_name = image.file.name.split('/')[-1]
            return yml['file'] != stored_name or new_file_size != image.file.size
        if yml.get('url'):
            # This import is getting urls from which we can upload the images.
            if not image.file:
                return True
            new_filename = yml['url'].split('/')[-1]
            if new_filename != image.file.name.split('/')[-1]:
                return True
            declared_size = yml.get('filesize')
            if declared_size in (None, ''):
                # No size to compare against; the matching name is all we know.
                return False
            return image.file_size != int(declared_size)
        raise RuntimeError('We need either a local file or a url from which we can retrieve the file.')

    def create(self, site_helper, yml, dry_run=False):
        """Create or update one image from its description.

        Expected input (one entry of)::

            images:
              - id: 1
                title_text: foobar
                photo_credit: credit
                caption: caption
                alt_text: alt
                file: foobar.jpg
                tags:
                  - tag1
                  - tag2

        Optional keys read here: ``url``, ``filesize``, ``focal_point_x``,
        ``focal_point_y``, ``focal_point_width``, ``focal_point_height`` and
        ``epsacrops``.

        Args:
            site_helper: provides ``import_id(id)`` and ``collection``.
            yml: mapping describing the image.
            dry_run: when true, only log and touch nothing.

        Returns:
            The image instance, or ``None`` on a dry run. A failed ``save()``
            is logged as a warning and does not raise.
        """
        source_label = yml.get('file', yml.get('url', 'NO FILE?!'))

        if dry_run:
            logger.info('importer.image.create.dry-run', file=source_label)
            return None

        image, created = get_or_generate(get_image_model(), import_id=site_helper.import_id(yml['id']))
        image.title = self.__get_title(yml).replace('%20', ' ')
        image.photo_credit = yml.get('photo_credit', '')
        image.caption = yml.get('caption', '')
        image.alt = yml.get('alt_text', yml.get('title_text', image.title))
        image.collection = site_helper.collection

        if yml.get('focal_point_x'):
            # This image data has Wagtail focal point info, which we can just import directly.
            image.focal_point_x = yml.get('focal_point_x')
            image.focal_point_y = yml.get('focal_point_y')
            image.focal_point_width = yml.get('focal_point_width')
            image.focal_point_height = yml.get('focal_point_height')
        elif yml.get('epsacrops'):
            # This image data only has epsacrop info from Drupal, so we need to
            # interpret it into focal point data.
            convert_epsacrops_to_focal_point(image, yml['epsacrops'])

        if created or self.__file_needs_update(image, yml):
            image_data, filename = self.__get_image_data(yml)
            image.file = ImageFile(image_data, name=filename.replace('%20', ' '))
            image.file_size = yml.get('filesize') or len(image.file)

        try:
            image.save()
            logger.info(
                'importer.image.{}'.format('create' if created else 'update'),
                file=source_label,
            )
        except Exception:
            # If the image file isn't valid requests still gets a 200 status
            # code so we can't test before saving. Log the full formatted
            # traceback (print_tb() would only print it and return None).
            logger.warning(
                'importer.image.invalid_image',
                file=source_label,
                error=traceback.format_exc(),
            )

        # NOTE(review): tags are set even when save() failed above; confirm
        # that is intended for instances that were never persisted.
        if yml.get('tags'):
            image.tags.set(*yml['tags'])
        return image

    def export(self, img):
        """Return an ``OrderedDict`` describing ``img`` for export.

        ``id``, ``file`` and ``title_text`` are always present. ``alt_text``
        is added unless it is empty or the placeholder 'Replace this default'.
        The remaining optional fields and ``tags`` are included only when
        truthy / non-empty.
        """
        img_data = OrderedDict([
            ('id', img.id),
            ('file', img.filename),
            ('title_text', img.title),
        ])
        if getattr(img, 'alt', None) and img.alt != 'Replace this default':
            img_data['alt_text'] = img.alt

        optional_fields = (
            'caption', 'photo_credit', 'focal_point_height', 'focal_point_width',
            'focal_point_x', 'focal_point_y', 'height', 'width', 'filesize',
        )
        for field in optional_fields:
            value = getattr(img, field, None)
            if value:
                img_data[field] = value

        if img.tags.exists():
            img_data['tags'] = [tag.name for tag in img.tags.all()]
        return img_data
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment