Use S3 multipart uploads to upload big files. And yes, you can resume the upload should it get interrupted, even if your PC crashes. RESUMABLE!!!
'''
20th June 2018, Wednesday
NOTE:
Any changes you make should not compromise
the atomicity of the actions. Review carefully,
and multiple times, before making any changes,
even a mere reordering of the statements.
'''
import os
import math
import shelve
import logging
from boto3 import client
from boto3 import resource
from threading import Lock
from threading import Thread
from filechunkio import FileChunkIO
# useful constants
KB = 1024
MB = KB * KB
PART_LIM = 5 * MB
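# NOTE: S3 multipart uploads require every part except the last one to be at
# least 5 MB, which is why PART_LIM is used as the part size.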
# setup logging
logFormat = '[%(levelname)s %(filename)s %(lineno)d]: %(message)s'
logging.basicConfig(filename='s3multipart.log', level='INFO',
                    filemode='w', format=logFormat)
# string literals for 'shelve'
XPARTS = 'xparts'
UPLOAD_ID = 'upload_id'
XPARTS_FILE = 'xparts'
PART_LIST = 'parts'
class S3Con:
    """
    parameters
        bucket_name: str
            name of the aws s3 bucket (case sensitive) to
            which the files are to be uploaded.
        key_id: str
            amazon access-key of the dev's IAM account.
        secret_key: str
            amazon secret-access-key of the dev's IAM
            account. Not to be shared.
        region: str
            default aws region
        db_path: str
            path to the directory where the 'shelve'
            database holding the upload state is stored.
        _multipart: boolean
            if set to True, files above a certain
            threshold --PART_LIM-- are uploaded by
            partitioning them into multiple parts.
        _xthreads: int
            number of threads to spawn for uploading the
            parts simultaneously.
    """
    def __init__(self, bucket_name, key_id, secret_key, region,
                 db_path, _multipart=True, _xthreads=5):
        self._client = client('s3', aws_access_key_id=key_id,
                              aws_secret_access_key=secret_key)
        self._bucket_name = bucket_name
        self._bucket = resource('s3').Bucket(bucket_name)
        self._xparts = shelve.open(os.path.join(db_path, XPARTS_FILE),
                                   flag='c', protocol=None, writeback=True)
        self._xthreads = _xthreads
        self._multipart = _multipart
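        # NOTE: with writeback=True, in-memory changes to shelve entries are
        # only persisted when sync() or close() is called -- hence the
        # explicit sync() calls throughout this class.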
    def _send(self, src_path, remote_path):
        """
        Transfers a resource (file) to the s3-bucket.
        parameters
            src_path
                local path of the resource to be sent
            remote_path
                mapped path of the src_path on the
                remote storage
        """
        if not os.path.exists(src_path):
            logging.info('The file \'{}\' does not exist.'.format(src_path))
            if self._multipart:
                logging.info('Removing partitioning info.')
                self._abort_parts(remote_path)
            raise FileNotFoundError(src_path)
        elif os.stat(src_path).st_size > PART_LIM and self._multipart:
            self._send_parts(src_path, remote_path)
        else:
            with open(src_path, 'rb') as data:
                self._client.upload_fileobj(Fileobj=data,
                                            Bucket=self._bucket_name,
                                            Key=remote_path)
    def _delete(self, remote_path):
        """
        Deletes the resource specified by the remote path
        in the s3-bucket.
        parameters
            remote_path
                path of the resource to be deleted on
                the remote storage
        """
        # NOTE: If a folder is specified by the path, we will
        # need to delete all other objects that contain that
        # folder's prefix --explained in the LOG.md
        prefix = remote_path
        objects_to_delete = []
        for obj in self._bucket.objects.filter(Prefix=prefix):
            objects_to_delete.append({'Key': obj.key})
        if not objects_to_delete:
            return
        logging.info("To delete {}".format(objects_to_delete))
        self._client.delete_objects(Bucket=self._bucket_name,
                                    Delete={'Objects': objects_to_delete})
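        # NOTE: delete_objects accepts at most 1000 keys per request; for
        # prefixes holding more objects the list would need to be batched.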
    def _move(self, remote_src_path, remote_dest_path):
        """
        Moves a resource in the s3-bucket.
        parameters
            remote_src_path
                remote path of the resource before it was moved
            remote_dest_path
                remote path of the resource after it was moved
        """
        # NOTE: Relatively more complex, we have to copy every
        # object to the bucket under its new key and then
        # delete all the objects with the old prefix.
        # Even more complex when folders are involved.
        old_key = remote_src_path
        new_key = remote_dest_path
        logging.info("To move {}".format(old_key))
        cpy_src = '{}/{}'.format(self._bucket_name, old_key)
        logging.info("The source loc for copy is '{}'".format(cpy_src))
        self._client.copy_object(Bucket=self._bucket_name,
                                 CopySource=cpy_src,
                                 Key=new_key)
        # the object to move is also the object to be deleted,
        # so we just delete the old key over here
        self._client.delete_object(Bucket=self._bucket_name,
                                   Key=old_key)
    def _send_parts(self, src_path, key):
        """
        Uploads the file by partitioning it into
        multiple segments.
        parameters
            src_path: str
                local path of the resource to be sent
            key: str
                the mapped path of the local file on
                the remote storage
        """
        logging.info('Loading partitioning info.')
        _upload_id = self._load_parts(src_path, key)
        # upload x parts simultaneously
        while self._xparts[XPARTS]:
            logging.info('Parts: {}'.format(self._xparts[XPARTS]))
            _num_parts_ = len(self._xparts[XPARTS])
            _threads_ = []
            _thread_count = min(self._xthreads, _num_parts_)
            _lock = Lock()
            logging.info('Using {} \'threads\''.format(_thread_count))
            for x in range(_thread_count):
                _threads_.append(Thread(target=self._upload_part,
                                        args=(src_path, key, _upload_id,
                                              self._xparts[XPARTS][x],
                                              _lock)))
            # start the threads
            for _thread in _threads_:
                _thread.start()
            # wait for all 'threads' to complete
            for _thread in _threads_:
                _thread.join()
        logging.info(self._xparts[PART_LIST])
        # notify s3 to assemble the parts
        self._client.complete_multipart_upload(
            Bucket=self._bucket_name,
            Key=key, UploadId=_upload_id,
            MultipartUpload={"Parts": self._xparts[PART_LIST]})
        # clean up shelve for next upload
        self._xparts.clear()
        self._xparts.sync()
    def _upload_part(self, src_path, key, _upload_id, part_id, _lock):
        """
        Uploads a part of the file to the remote
        storage.
        parameters
            src_path: str
                local path of the resource to be sent
            key: str
                the mapped path of the local file on
                the remote storage
            _upload_id: str
                multipart upload id for the upload
            part_id: int
                the id of the part being uploaded
            _lock: Lock
                grants exclusive write access to the
                thread for writing to the 'shelve'
        """
        size = os.stat(src_path).st_size
        offset = (part_id - 1) * PART_LIM
        # check for overflow, the last part could be less than 5MB
        _bytes = min(PART_LIM, size - offset)
        with FileChunkIO(src_path, 'r', offset=offset,
                         bytes=_bytes) as chunk:
            part = chunk.read()
        result = self._client.upload_part(
            Body=part, Bucket=self._bucket_name, Key=key,
            UploadId=_upload_id, PartNumber=part_id)
        self._mark_part(part_id, _lock, result)
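        # NOTE: FileChunkIO comes from the third-party 'filechunkio' package
        # (pip install filechunkio); a plain open()/seek()/read() over the
        # same byte range would yield the identical part body.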
    def _load_parts(self, src_path, key):
        """
        Loads the partitioning info of the file
        from the disk.
        parameters
            src_path: str
                local path of the resource to be sent
            key: str
                the mapped path of the local file on
                the remote storage
        """
        if UPLOAD_ID not in self._xparts:
            logging.info('No partitioning info found. Partitioning file.')
            self._xparts[UPLOAD_ID] = self._client.create_multipart_upload(
                Bucket=self._bucket_name,
                Key=key)['UploadId']
            self._xparts.sync()
        if XPARTS not in self._xparts:
            size = os.stat(src_path).st_size
            self._xparts[XPARTS] = list(range(1, math.ceil(size/PART_LIM) + 1))
            self._xparts.sync()
        if PART_LIST not in self._xparts:
            self._xparts[PART_LIST] = [{} for x in range(len(
                self._xparts[XPARTS]))]
            self._xparts.sync()
        return self._xparts[UPLOAD_ID]
    def _mark_part(self, part_id, _lock, result):
        """
        Marks the part as uploaded by removing
        it from the list of parts to be uploaded.
        TODO: Send notification to the DB that a
        part has been uploaded. -_-
        parameters
            part_id: int
                the id of the part that was uploaded
            _lock: Lock
                grants exclusive write access to the thread
                for writing to the 'shelve'
            result: dict
                response from aws-s3 for the uploaded part
        """
        # NOTE: avoids race conditions by giving the thread
        # exclusive write access to the 'shelve'
        with _lock:
            # sync only after both changes have been written
            self._xparts[XPARTS].remove(part_id)
            self._xparts[PART_LIST][part_id - 1] = {"PartNumber": part_id,
                                                    "ETag": result["ETag"]}
            self._xparts.sync()
            # uncomment to log each part as it is uploaded
            # logging.info('part_id={} uploaded.'.format(part_id))
            # TODO: schema changes, mark part to DB
            # logs the parts that are still left to upload
            logging.info(self._xparts[XPARTS])
    def _abort_parts(self, key):
        """
        Removes the partitioning info of the corresponding
        file from the disk, and signals aws-s3 to remove
        the partitions of that file from the remote storage.
        parameters
            key: str
                the mapped path of the local file on
                the remote storage
        """
        _upload_id = self._xparts.get(UPLOAD_ID)
        if _upload_id is not None:
            self._client.abort_multipart_upload(Bucket=self._bucket_name,
                                                Key=key, UploadId=_upload_id)
        # clean up shelve for next upload
        self._xparts.clear()
        self._xparts.sync()
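# A minimal usage sketch -- an assumption, not part of the original gist; the
# bucket name, credentials and paths below are placeholders.
def main():
    con = S3Con(bucket_name='my-bucket',             # placeholder bucket
                key_id='<AWS_ACCESS_KEY_ID>',        # placeholder credentials
                secret_key='<AWS_SECRET_ACCESS_KEY>',
                region='us-east-1',                  # placeholder region
                db_path='.')                         # shelve db in the cwd
    # uploads in resumable parts if the file is larger than PART_LIM
    con._send('big_file.bin', 'uploads/big_file.bin')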
if __name__ == '__main__':
    main()