# Details https://skonik.me/uploading-large-file-to-s3-using-aiobotocore/
import asyncio
import math
import os

import aiobotocore
import aiofiles

AWS_S3_HOST = 'http://localhost:9000'
AWS_SECRET_ACCESS_KEY = 'SECRET_KEY'
AWS_ACCESS_KEY_ID = 'ACCESS_KEY'
AWS_MULTIPART_BYTES_PER_CHUNK = 10000000  # ~10 MB (each part except the last must be at least 5 MB)
AWS_S3_BUCKET_NAME = 'test'
# We have to keep info about the uploaded parts.
# https://github.com/boto/boto3/issues/50#issuecomment-72079954
part_info = {
    'Parts': []
}

# The file object is shared across coroutines and the async file library
# uses threads under the hood. This might create data races with unpredictable issues.
file_shared_lock = asyncio.Lock()
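
# upload_chunk() reads one chunk of the source file under the shared lock
# and uploads it as a single part, recording the returned ETag in part_info.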
async def upload_chunk(client, file,
                       upload_id, chunk_number,
                       bytes_per_chunk, source_size, key):
    offset = chunk_number * bytes_per_chunk
    remaining_bytes = source_size - offset
    bytes_to_read = min([bytes_per_chunk, remaining_bytes])
    part_number = chunk_number + 1

    async with file_shared_lock:
        await file.seek(offset)
        chunk = await file.read(bytes_to_read)

    resp = await client.upload_part(
        Bucket=AWS_S3_BUCKET_NAME,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )

    global part_info
    part_info['Parts'].append(
        {
            'PartNumber': part_number,
            'ETag': resp['ETag']
        }
    )
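
# begin_multipart_upload() drives the whole flow: create the multipart upload,
# upload every chunk concurrently, then complete (or abort) the upload.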
async def begin_multipart_upload(from_local_path, to_s3_folder_path,
                                 host=AWS_S3_HOST,
                                 aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                 aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 bytes_per_chunk=AWS_MULTIPART_BYTES_PER_CHUNK):
    filename = os.path.basename(from_local_path)
    key = '{}/{}'.format(to_s3_folder_path, filename)

    session = aiobotocore.get_session()
    async with session.create_client(
        's3', endpoint_url=host,
        aws_secret_access_key=aws_secret_access_key,
        aws_access_key_id=aws_access_key_id
    ) as client:
        source_size = os.stat(from_local_path).st_size
        chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
        print('chunks_count: ', chunks_count)

        create_multipart_upload_resp = await client.create_multipart_upload(
            ACL='bucket-owner-full-control',
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
        )
        upload_id = create_multipart_upload_resp['UploadId']

        tasks = []
        async with aiofiles.open(from_local_path, mode='rb') as file:
            for chunk_number in range(chunks_count):
                tasks.append(
                    upload_chunk(
                        client=client,
                        file=file,
                        chunk_number=chunk_number,
                        bytes_per_chunk=bytes_per_chunk,
                        key=key, upload_id=upload_id,
                        source_size=source_size
                    )
                )
            await asyncio.gather(*tasks)

        list_parts_resp = await client.list_parts(
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
            UploadId=upload_id
        )

        # Parts have to be sorted in ascending order, otherwise the API rejects the request.
        part_list = sorted(part_info['Parts'], key=lambda k: k['PartNumber'])
        part_info['Parts'] = part_list

        print(part_info['Parts'][0])
        print('COMPLETED ', len(part_info['Parts']))

        if len(list_parts_resp['Parts']) == chunks_count:
            print('Done uploading file')
            await client.complete_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id,
                MultipartUpload=part_info
            )
            return True
        else:
            print('Aborted uploading file.')
            await client.abort_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id
            )
            return False


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        begin_multipart_upload('./large.txt', to_s3_folder_path='large')
    )
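
A note on the entry point: asyncio.get_event_loop() plus run_until_complete() works here, but on Python 3.10 and newer the same thing is usually written with asyncio.run(). A minimal sketch of that variant, reusing the placeholder path and folder from above (on older Python versions the module-level asyncio.Lock() is, as far as I know, bound to the loop returned by get_event_loop(), so the original form is the safer choice there):

    if __name__ == '__main__':
        # asyncio.run() creates the event loop, runs the coroutine and closes the loop.
        asyncio.run(
            begin_multipart_upload('./large.txt', to_s3_folder_path='large')
        )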
@bennyelga:
I'm not sure why you need this:

    file_shared_lock = asyncio.Lock()

This is because asyncio is still single-threaded.

Reply from the author:
@bennyelga
Thank you for pointing this out. Answering your question, here's an example of why I consider using a lock in this case.

Let's say there are 3 running coroutines:

Coroutine 1: executes await file.seek(offset), then the coroutine is switched out. Offset here is 1.
Coroutine 2: executes await file.seek(offset), then the coroutine is switched out. Offset here is 2.
Coroutine 3: executes await file.seek(offset), then the coroutine is switched out. Offset here is 3.

Under the hood the file's pointer is moved to one of [1, 2, 3]; we can't say exactly which. At least I don't know how this would work internally in asyncio.

Let's say the pointer finally ends up at offset 2. The scheduler then wakes up Coroutine 1 instead of Coroutine 2, even though Coroutine 1 is supposed to read from offset 1. The pointer is now at offset 2 (for the first coroutine it should equal 1), and Coroutine 1 executes chunk = await file.read(bytes_to_read), reading a completely different chunk of data.

So the file is a shared resource, and this is why I consider the part below to be not really safe. Inconsistent reads may appear from time to time, I guess.

    await file.seek(offset)
    chunk = await file.read(bytes_to_read)

I might be wrong about something. Please share your thoughts if you've noticed something weird. I would appreciate it.
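
To make the scenario above concrete, here is a small self-contained sketch (not from the gist; the file name, chunk size and chunk count are arbitrary) that shares one aiofiles handle between many coroutines and counts reads that land at the wrong offset. Without the lock, a coroutine can be suspended between its seek() and its read(), so another coroutine may move the shared file pointer in between; with the lock, the two calls run atomically:

    import asyncio

    import aiofiles

    CHUNK_SIZE = 4096
    CHUNKS = 64
    DEMO_PATH = 'seek_read_demo.bin'  # hypothetical scratch file, created below


    async def read_chunk(file, lock, chunk_number, use_lock):
        # Same pattern as upload_chunk(): seek to the chunk offset, then read the chunk.
        offset = chunk_number * CHUNK_SIZE
        if use_lock:
            async with lock:
                await file.seek(offset)
                data = await file.read(CHUNK_SIZE)
        else:
            await file.seek(offset)              # another coroutine may run here...
            data = await file.read(CHUNK_SIZE)   # ...after the shared pointer has moved
        expected = bytes([chunk_number]) * CHUNK_SIZE
        return data == expected


    async def main(use_lock):
        # The lock is created inside the running loop to avoid loop-binding issues.
        lock = asyncio.Lock()

        # Fill the file with recognizable chunks so a misplaced read is easy to detect.
        async with aiofiles.open(DEMO_PATH, 'wb') as f:
            await f.write(b''.join(bytes([i]) * CHUNK_SIZE for i in range(CHUNKS)))

        async with aiofiles.open(DEMO_PATH, 'rb') as f:
            results = await asyncio.gather(
                *(read_chunk(f, lock, i, use_lock) for i in range(CHUNKS))
            )

        label = 'with lock' if use_lock else 'without lock'
        print(label, '->', results.count(False), 'misplaced reads out of', CHUNKS)


    if __name__ == '__main__':
        asyncio.run(main(use_lock=False))  # may report misplaced reads
        asyncio.run(main(use_lock=True))   # should always report 0

Whether the unlocked run actually produces misplaced reads depends on how the coroutines happen to be scheduled, which is exactly the point: the result is not guaranteed, so the seek+read pair on a shared handle needs the lock (or each coroutine needs its own file handle).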