Asyncio S3 Multipart Upload
# Details: https://skonik.me/uploading-large-file-to-s3-using-aiobotocore/
import asyncio
import math
import os

import aiobotocore
import aiofiles

AWS_S3_HOST = 'http://localhost:9000'
AWS_SECRET_ACCESS_KEY = 'SECRET_KEY'
AWS_ACCESS_KEY_ID = 'ACCESS_KEY'
AWS_MULTIPART_BYTES_PER_CHUNK = 10_000_000  # ~10 MB per part
AWS_S3_BUCKET_NAME = 'test'
# We have to keep info about uploaded parts.
# https://github.com/boto/boto3/issues/50#issuecomment-72079954
part_info = {
    'Parts': []
}

# The file object is shared across coroutines, and the async file
# library uses threads under the hood. Unsynchronized seek/read pairs
# could race with each other, so guard them with a lock.
file_shared_lock = asyncio.Lock()
async def upload_chunk(client, file,
                       upload_id, chunk_number,
                       bytes_per_chunk, source_size, key):
    offset = chunk_number * bytes_per_chunk
    remaining_bytes = source_size - offset
    bytes_to_read = min(bytes_per_chunk, remaining_bytes)
    part_number = chunk_number + 1

    # seek() and read() must happen atomically per chunk; otherwise
    # another coroutine could move the shared file pointer between them.
    async with file_shared_lock:
        await file.seek(offset)
        chunk = await file.read(bytes_to_read)

    # The upload itself doesn't touch the file, so it can run outside
    # the lock and overlap with the other parts.
    resp = await client.upload_part(
        Bucket=AWS_S3_BUCKET_NAME,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )
    part_info['Parts'].append(
        {
            'PartNumber': part_number,
            'ETag': resp['ETag']
        }
    )
async def begin_multipart_upload(from_local_path, to_s3_folder_path,
                                 host=AWS_S3_HOST,
                                 aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                 aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 bytes_per_chunk=AWS_MULTIPART_BYTES_PER_CHUNK):
    filename = os.path.basename(from_local_path)
    key = '{}/{}'.format(to_s3_folder_path, filename)

    session = aiobotocore.get_session()
    async with session.create_client(
            's3', endpoint_url=host,
            aws_secret_access_key=aws_secret_access_key,
            aws_access_key_id=aws_access_key_id
    ) as client:
        source_size = os.stat(from_local_path).st_size
        chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
        print('chunks_count: ', chunks_count)

        create_multipart_upload_resp = await client.create_multipart_upload(
            ACL='bucket-owner-full-control',
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
        )
        upload_id = create_multipart_upload_resp['UploadId']

        tasks = []
        async with aiofiles.open(from_local_path, mode='rb') as file:
            for chunk_number in range(chunks_count):
                tasks.append(
                    upload_chunk(
                        client=client,
                        file=file,
                        chunk_number=chunk_number,
                        bytes_per_chunk=bytes_per_chunk,
                        key=key, upload_id=upload_id,
                        source_size=source_size
                    )
                )
            await asyncio.gather(*tasks)

        list_parts_resp = await client.list_parts(
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
            UploadId=upload_id
        )

        # Parts must be sorted by part number in ascending order;
        # otherwise the API rejects the complete request.
        part_info['Parts'] = sorted(part_info['Parts'],
                                    key=lambda part: part['PartNumber'])
        print(part_info['Parts'][0])
        print('COMPLETED ', len(part_info['Parts']))

        if len(list_parts_resp['Parts']) == chunks_count:
            print('Done uploading file')
            await client.complete_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id,
                MultipartUpload=part_info
            )
            return True
        else:
            print('Aborted uploading file.')
            await client.abort_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id
            )
            return False


if __name__ == '__main__':
    asyncio.run(begin_multipart_upload('./large.txt', to_s3_folder_path='large'))
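The localhost:9000 endpoint above is what a local S3-compatible server such as MinIO listens on by default, and the target bucket must exist before create_multipart_upload will succeed. A minimal pre-flight sketch (ensure_bucket is a hypothetical helper, not part of the gist):

async def ensure_bucket(client, bucket=AWS_S3_BUCKET_NAME):
    # Create the bucket if it isn't there yet, so that
    # create_multipart_upload doesn't fail with NoSuchBucket.
    try:
        await client.head_bucket(Bucket=bucket)
    except client.exceptions.ClientError:
        await client.create_bucket(Bucket=bucket)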
@bennyelga
Thank you for pointing this out.
Answering your question.
Here's an example of why I consider using a lock in this case. Let's say there are 3 running coroutines:

Coroutine 1: executes await file.seek(offset) with offset 1, then the coroutine is switched.
Coroutine 2: executes await file.seek(offset) with offset 2, then the coroutine is switched.
Coroutine 3: executes await file.seek(offset) with offset 3, then the coroutine is switched.

Under the hood the file's pointer has been moved to one of [1, 2, 3]; we can't say exactly which, and at least I don't know how this works internally in asyncio. Let's say the pointer ended up at offset 2. The scheduler then wakes up Coroutine 1, which is supposed to read from offset 1. But the pointer now sits at offset 2, so when Coroutine 1 executes chunk = await file.read(bytes_to_read) it reads a completely different chunk of data.

So the file is a shared resource, and this is why I consider the unguarded seek/read pair not really safe. Inconsistent reads may appear from time to time, I guess.

I might be wrong about something. Please share your thoughts if you've noticed anything weird. I would appreciate it.
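For completeness, a minimal sketch of an alternative that sidesteps the race entirely, assuming a POSIX system and Python 3.9+ for asyncio.to_thread: os.pread reads at an explicit offset without moving any shared file position, so both the lock and the shared aiofiles handle can be dropped. upload_chunk_no_lock is a hypothetical variant, not the code from the gist above; AWS_S3_BUCKET_NAME is the constant defined there.

import asyncio
import os

async def upload_chunk_no_lock(client, fd, upload_id, chunk_number,
                               bytes_per_chunk, source_size, key):
    offset = chunk_number * bytes_per_chunk
    bytes_to_read = min(bytes_per_chunk, source_size - offset)
    # os.pread takes an absolute offset and never touches the shared
    # file position, so concurrent coroutines cannot race each other.
    # Run the blocking syscall in a thread to keep the event loop free.
    chunk = await asyncio.to_thread(os.pread, fd, bytes_to_read, offset)
    resp = await client.upload_part(
        Bucket=AWS_S3_BUCKET_NAME,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=chunk_number + 1,
        Key=key
    )
    # Return the part descriptor instead of mutating a global dict;
    # asyncio.gather() collects the results for the caller.
    return {'PartNumber': chunk_number + 1, 'ETag': resp['ETag']}

The caller would open the file once with fd = os.open(path, os.O_RDONLY), gather the coroutines, and pass {'Parts': parts} to complete_multipart_upload; since gather preserves argument order, the parts list comes back already sorted by part number.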