# Details: https://skonik.me/uploading-large-file-to-s3-using-aiobotocore/
import asyncio
import math
import os

import aiobotocore
import aiofiles

AWS_S3_HOST = 'http://localhost:9000'
AWS_SECRET_ACCESS_KEY = 'SECRET_KEY'
AWS_ACCESS_KEY_ID = 'ACCESS_KEY'
AWS_MULTIPART_BYTES_PER_CHUNK = 10000000  # ~10 MB; S3 requires at least 5 MiB per part (except the last one)
AWS_S3_BUCKET_NAME = 'test'
# We have to keep info about uploaded parts.
# https://github.com/boto/boto3/issues/50#issuecomment-72079954
part_info = {
    'Parts': []
}

# The file object is shared across coroutines and the async file library
# uses threads under the hood. This might create data races with
# unpredictable results, so seek/read is guarded with a lock.
file_shared_lock = asyncio.Lock()
async def upload_chunk(client, file,
                       upload_id, chunk_number,
                       bytes_per_chunk, source_size, key):
    offset = chunk_number * bytes_per_chunk
    remaining_bytes = source_size - offset
    bytes_to_read = min(bytes_per_chunk, remaining_bytes)
    part_number = chunk_number + 1

    async with file_shared_lock:
        await file.seek(offset)
        chunk = await file.read(bytes_to_read)

    resp = await client.upload_part(
        Bucket=AWS_S3_BUCKET_NAME,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )

    global part_info
    part_info['Parts'].append(
        {
            'PartNumber': part_number,
            'ETag': resp['ETag']
        }
    )
async def begin_multipart_upload(from_local_path, to_s3_folder_path,
                                 host=AWS_S3_HOST,
                                 aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                 aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 bytes_per_chunk=AWS_MULTIPART_BYTES_PER_CHUNK):
    filename = os.path.basename(from_local_path)
    key = '{}/{}'.format(to_s3_folder_path, filename)

    session = aiobotocore.get_session()
    async with session.create_client(
            's3', endpoint_url=host,
            aws_secret_access_key=aws_secret_access_key,
            aws_access_key_id=aws_access_key_id
    ) as client:
        source_size = os.stat(from_local_path).st_size
        chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
        print('chunks_count: ', chunks_count)

        create_multipart_upload_resp = await client.create_multipart_upload(
            ACL='bucket-owner-full-control',
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
        )
        upload_id = create_multipart_upload_resp['UploadId']

        tasks = []
        async with aiofiles.open(from_local_path, mode='rb') as file:
            for chunk_number in range(chunks_count):
                tasks.append(
                    upload_chunk(
                        client=client,
                        file=file,
                        chunk_number=chunk_number,
                        bytes_per_chunk=bytes_per_chunk,
                        key=key, upload_id=upload_id,
                        source_size=source_size
                    )
                )
            await asyncio.gather(*tasks)
        list_parts_resp = await client.list_parts(
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
            UploadId=upload_id
        )

        # Parts must be sorted in ascending PartNumber order,
        # otherwise the API rejects the request.
        part_list = sorted(part_info['Parts'], key=lambda k: k['PartNumber'])
        part_info['Parts'] = part_list
        print(part_info['Parts'][0])
        print('COMPLETED ', len(part_info['Parts']))

        if len(list_parts_resp['Parts']) == chunks_count:
            print('Done uploading file')
            await client.complete_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id,
                MultipartUpload=part_info
            )
            return True
        else:
            print('Aborted uploading file.')
            await client.abort_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id
            )
            return False


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(begin_multipart_upload('./large.txt', to_s3_folder_path='large'))
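
A hedged note on running this today: recent aiobotocore releases expose the session factory as aiobotocore.session.get_session(), and the module-level aiobotocore.get_session() used above may no longer be available; asyncio.run() (Python 3.7+) also replaces the manual event-loop handling. The S3 calls themselves (create_multipart_upload, upload_part, list_parts, complete_multipart_upload, abort_multipart_upload) are unchanged botocore client methods. A possible entry point under those assumptions:

import asyncio
from aiobotocore.session import get_session  # newer aiobotocore import path

# Inside begin_multipart_upload() the only change would be
#     session = get_session()
# instead of
#     session = aiobotocore.get_session()

if __name__ == '__main__':
    asyncio.run(begin_multipart_upload('./large.txt', to_s3_folder_path='large'))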
@bennyelga
Thank you for pointing this out. Answering your question: here is an example of why I consider using a lock in this case.

Say there are three running coroutines:

Coroutine 1: executes await file.seek(offset) with offset 1, then is suspended.
Coroutine 2: executes await file.seek(offset) with offset 2, then is suspended.
Coroutine 3: executes await file.seek(offset) with offset 3, then is suspended.

Under the hood the file's pointer is now at one of [1, 2, 3]; we can't say which exactly (at least I don't know how asyncio would schedule this internally). Say the pointer ended up at offset 2, and the scheduler wakes up Coroutine 1 instead of Coroutine 2. Coroutine 1 was supposed to read from offset 1, but the pointer has moved to offset 2, so when it executes chunk = await file.read(bytes_to_read) it reads a completely different chunk of data.

So the file is a shared resource, and that is why I consider the part below not really safe; inconsistent reads may appear from time to time, I guess.

await file.seek(offset)
chunk = await file.read(bytes_to_read)

I might be wrong about something. Please share your thoughts if you've noticed something weird; I would appreciate it.
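
To make the interleaving concrete: even though asyncio runs coroutines on a single thread, the await between seek() and read() is a suspension point, so another task can move the shared pointer in between. Here is a minimal, self-contained sketch (plain asyncio + aiofiles, no S3; the file name demo.bin and the 4-byte chunk size are made up for illustration) that drops the lock:

import asyncio
import aiofiles

CHUNK = 4  # hypothetical chunk size, just for the demo

async def read_chunk_unsafe(file, chunk_number):
    # No lock: another task may move the shared file pointer
    # between the seek and the read below.
    await file.seek(chunk_number * CHUNK)
    await asyncio.sleep(0)  # explicit suspension point, like any real await
    data = await file.read(CHUNK)
    print(chunk_number, data)

async def main():
    async with aiofiles.open('demo.bin', mode='wb') as f:
        await f.write(b'AAAABBBBCCCC')
    async with aiofiles.open('demo.bin', mode='rb') as f:
        await asyncio.gather(*(read_chunk_unsafe(f, n) for n in range(3)))

asyncio.run(main())

On most runs at least one task prints another task's bytes (or b''), because the seeks all complete before the reads start; the exact output depends on scheduling.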
I'm not sure why you need this:

file_shared_lock = asyncio.Lock()

since asyncio is still single-threaded.
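
One way to sidestep the question entirely is to not share the file object at all: give each chunk task its own handle, so there is no shared seek position and no lock. A hedged sketch of upload_chunk rewritten that way, reusing the gist's names (the caller would pass from_local_path instead of the shared file object, at the cost of one open file descriptor per in-flight part):

async def upload_chunk(client, from_local_path, upload_id,
                       chunk_number, bytes_per_chunk, source_size, key):
    offset = chunk_number * bytes_per_chunk
    bytes_to_read = min(bytes_per_chunk, source_size - offset)
    part_number = chunk_number + 1

    # Each task opens its own handle, so its seek/read cannot interleave
    # with another task's seek/read on the same file object.
    async with aiofiles.open(from_local_path, mode='rb') as file:
        await file.seek(offset)
        chunk = await file.read(bytes_to_read)

    resp = await client.upload_part(
        Bucket=AWS_S3_BUCKET_NAME,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )
    part_info['Parts'].append({'PartNumber': part_number, 'ETag': resp['ETag']})

Appending to part_info stays safe here because the append runs in coroutine context on the event loop thread, not inside a worker thread.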