Python script for AWS S3 with the boto3 API
# -*- coding: utf-8 -*-
import math
from io import BytesIO

import boto3

# project-specific helper that hashes the original filename into an S3 key name
from generator import generate_file_hash
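
# NOTE: the `generator` module above is project-specific and not included in
# this gist; a hypothetical stand-in that hashes the filename while keeping
# its extension could look like:
#
#     import hashlib, os, uuid
#
#     def generate_file_hash(filename):
#         _, ext = os.path.splitext(filename)
#         digest = hashlib.sha1((uuid.uuid4().hex + filename).encode('utf-8')).hexdigest()
#         return digest + ext
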
class S3Manager:
    """Manages uploads, updates and deletions of objects in an S3 bucket."""

    # placeholders: replace with your bucket name and region
    BUCKET_NAME = 'S3_BUCKET_NAME'

    def __init__(self):
        self.client = boto3.client('s3', region_name='S3_REGION')
    def upload_file(self, file, dest, headers=None):
        """Upload a small file in a single PutObject call and return its hashed key name."""
        hashed_filename = generate_file_hash(file.filename)
        self.client.put_object(Bucket=self.BUCKET_NAME,
                               Key=dest + hashed_filename,
                               Body=file,
                               Metadata=headers or {})
        return hashed_filename
    def upload_large_file(self, file, dest, headers=None):
        """Upload a large file with the S3 multipart upload API and return its hashed key name."""
        hashed_filename = generate_file_hash(file.filename)
        mpu = self.client.create_multipart_upload(Bucket=self.BUCKET_NAME,
                                                  Key=dest + hashed_filename,
                                                  Metadata=headers or {})
        part_list = []

        # determine the total size by seeking to the end of the underlying stream
        file_obj = file.stream
        file_obj.seek(0, 2)
        file_size = file_obj.tell()
        file_obj.seek(0)

        # upload in 100 MiB chunks (S3 requires every part except the last to be at least 5 MiB)
        chunk_size = 100 * 1024 * 1024
        chunk_count = int(math.ceil(file_size / float(chunk_size)))
        for i in range(chunk_count):
            offset = chunk_size * i
            part_size = min(chunk_size, file_size - offset)
            part_body = BytesIO(file_obj.read(part_size))
            part = self.client.upload_part(Bucket=self.BUCKET_NAME,
                                           Key=mpu['Key'],
                                           UploadId=mpu['UploadId'],
                                           PartNumber=i + 1,
                                           Body=part_body)
            # remember each part's ETag; CompleteMultipartUpload needs the full list
            part_list.append({
                'ETag': part['ETag'],
                'PartNumber': i + 1
            })
        self.client.complete_multipart_upload(Bucket=self.BUCKET_NAME,
                                              Key=mpu['Key'],
                                              UploadId=mpu['UploadId'],
                                              MultipartUpload={
                                                  'Parts': part_list
                                              })
        return hashed_filename
    def delete_file(self, file_key, source):
        if file_key:
            self.client.delete_object(Bucket=self.BUCKET_NAME,
                                      Key=source + file_key)

    def delete_directory(self, prefix):
        # list_objects returns at most 1000 keys per call and omits 'Contents'
        # when nothing matches the prefix
        object_list = self.client.list_objects(Bucket=self.BUCKET_NAME,
                                               Prefix=prefix).get('Contents', [])
        for obj in object_list:
            self.client.delete_object(Bucket=self.BUCKET_NAME,
                                      Key=obj['Key'])
    def update_file(self, file_key_to_del, file_to_update, update_path, headers=None):
        self.delete_file(file_key_to_del, update_path)
        return self.upload_file(file_to_update, update_path, headers=headers)

    def update_large_file(self, file_key_to_del, file_to_update, update_path, headers=None):
        self.delete_file(file_key_to_del, update_path)
        return self.upload_large_file(file_to_update, update_path, headers=headers)
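
A minimal usage sketch, assuming the file objects passed to S3Manager are werkzeug FileStorage uploads handled in a Flask view (the route, form field name, size threshold, and the 'uploads/' prefix below are hypothetical, not part of the gist):

from flask import Flask, request

app = Flask(__name__)
s3_manager = S3Manager()

@app.route('/upload', methods=['POST'])
def upload():
    # request.files['file'] is a werkzeug FileStorage, which exposes the
    # `.filename` and `.stream` attributes that S3Manager relies on
    uploaded = request.files['file']

    # measure the upload size to decide between single-shot and multipart upload
    uploaded.stream.seek(0, 2)
    size = uploaded.stream.tell()
    uploaded.stream.seek(0)

    if size > 100 * 1024 * 1024:
        key = s3_manager.upload_large_file(uploaded, 'uploads/')
    else:
        key = s3_manager.upload_file(uploaded, 'uploads/')
    return key, 201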