Created
November 21, 2018 02:18
-
-
Save kjenney/068531ffe01e14bb7a2351dc55592551 to your computer and use it in GitHub Desktop.
Boto3 Upload Failure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import io | |
import json | |
import boto3 | |
from boto3.exceptions import ResourceNotExistsError | |
from botocore.exceptions import ClientError | |
import pprint | |
import logging | |
import tempfile | |
from wand.image import Image | |
import magic | |
s3 = boto3.resource('s3') | |
client = boto3.client('s3') | |
bucket = "super-secret-bucket-name" | |
my_bucket = s3.Bucket(bucket) | |
logger = logging.getLogger(__name__) | |
ops = { | |
"compress": 90 | |
} | |
LOSSY_IMAGE_FMTS = ('jpg', 'jpeg', 'webp', 'png') | |
def is_lossy(ext, ops): | |
return ('fm' in ops and ops['fm'] in LOSSY_IMAGE_FMTS) or ext in LOSSY_IMAGE_FMTS | |
def is_image(file): | |
basename = os.path.basename(file) | |
name, ext = os.path.splitext(basename) | |
ext = ext.strip('.') | |
#print(ext) | |
if ext in LOSSY_IMAGE_FMTS: | |
#print('Awesome') | |
return True | |
else: | |
#print('Bummer') | |
return False | |
def get_mime_type(file_path): | |
mime = magic.Magic(mime=True) | |
return mime.from_file(file_path) | |
def get_file_size(file_path): | |
return os.path.getsize(file_path) | |
def get_folder(file): | |
if '/' in file: | |
return os.path.dirname(file) | |
else: | |
return '/' | |
def download(bucket, key): | |
""" | |
Download the file from s3 given `bucket` and `key`. | |
Returns | |
------- | |
str | |
the filename of the downloaded file or None if the file was not found | |
""" | |
basename = os.path.basename(key) | |
name, ext = os.path.splitext(basename) | |
print('Getting ready to download ' + basename) | |
logger.info('Downloading file from s3 at {}'.format(basename)) | |
code, tmp_file = tempfile.mkstemp() | |
tmp_file = tmp_file + ext | |
print("This is what we're calling our temp file: " + tmp_file) | |
try: | |
my_bucket.download_file(key, tmp_file) | |
except ClientError: | |
logger.exception('File not found: {}'.format(basename)) | |
#print('File not found: {}'.format(basename)) | |
else: | |
return tmp_file | |
def upload(bucket, folder, filename): | |
print('Getting ready to upload ' + filename) | |
logger.info('Uploading file to s3 at {}'.format(filename)) | |
basename = os.path.basename(filename) | |
mime_type = get_mime_type(filename) | |
print('File to Upload, filename: ' + filename) | |
print("Mime Type: " + mime_type) | |
print('Name in Bucket, basename: ' + basename) | |
print("client.upload_file(filename,my_bucket,basename)") | |
client.upload_file(filename,my_bucket,basename) | |
def image_compress(filename, ops): | |
""" | |
Compress the image specified by `filename` using the transformations specified by `ops` (operations) | |
Returns | |
------- | |
str | |
the filename of the transformed image | |
""" | |
print("Let's compress " + filename) | |
basename = os.path.basename(filename) | |
name, ext = os.path.splitext(basename) | |
ext = ext.strip('.') | |
code, path = tempfile.mkstemp() | |
output = path + '.' + ext | |
with Image(filename=filename) as src: | |
with src.clone() as img: | |
if is_lossy(ext, ops): | |
if 'compress' in ops: | |
q = int(ops['compress']) | |
else: | |
q = DEFAULT_QUALITY_RATE | |
img.compression_quality = q | |
img.save(filename=output) | |
return output | |
def optimize(file, folder): | |
img_filename = download(my_bucket, file) | |
if img_filename: | |
output_img = image_compress(img_filename, ops) | |
print('Compressed ' + img_filename + ' to ' + output_img) | |
upload(my_bucket, folder, output_img) | |
else: | |
return('Error') | |
def process(event, context): | |
for file in my_bucket.objects.all(): | |
if is_image(file.key): | |
folder = get_folder(file.key) | |
optimize(file.key, folder) | |
return('Done') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment