This is a working example of how to asynchronously upload a file in chunks to an AWS S3 bucket using Python, via a multipart upload with presigned URLs. The code can be modified or optimized to suit your needs; for example, a generator could yield chunks of the file instead of loading the entire file into memory (a sketch of that variant follows the full script).
# .env (loaded via load_dotenv; boto3 reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment)
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_S3_REGION_NAME=eu-central-1
AWS_S3_SIGNATURE_VERSION=v4
import asyncio
from asyncio import TaskGroup
from pathlib import Path
import boto3
import httpx
from botocore.config import Config
from dotenv import load_dotenv
from httpx import Response
load_dotenv()
BUCKET_NAME = "MY-BUCKET"
LOCAL_FILE_PATH = "./assets/video_large.mp4"
S3_OBJECT_KEY = "video_large.mp4"  # Name of the object in S3
DEFAULT_CHUNK_SIZE = 1024 * 1024 * 10 # 10MB
# Init the S3 client; eu-central-1 requires Signature Version 4
s3_client = boto3.client(
    "s3", config=Config(signature_version="s3v4"), region_name="eu-central-1"
)
def get_file_path() -> Path:
    return Path(LOCAL_FILE_PATH)


def get_file_size(file_path: Path) -> int:
    return file_path.stat().st_size
def chunkify(file_path: Path, chunk_size: int = DEFAULT_CHUNK_SIZE) -> list[bytes]:
    chunks = []
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            chunks.append(chunk)
    return chunks
def get_presigned_url(s3_upload_id: str, part_number: int) -> str:
    return s3_client.generate_presigned_url(
        ClientMethod="upload_part",
        Params={
            "Bucket": BUCKET_NAME,
            "Key": S3_OBJECT_KEY,
            "UploadId": s3_upload_id,
            "PartNumber": part_number,
        },
    )
async def upload_chunk(presigned_url: str, chunk: bytes, part_number: int) -> Response:
    print(f"Part number {part_number} - uploading")
    async with httpx.AsyncClient() as client:
        response = await client.put(url=presigned_url, content=chunk, timeout=None)
    print(f"Part number {part_number} - done")
    return response
async def main():
    file_path = get_file_path()
    print("File path:", file_path)
    file_size = get_file_size(file_path)
    print("File size:", file_size, "bytes")

    # Prepare the multipart upload to obtain an UploadId from S3
    response = s3_client.create_multipart_upload(Bucket=BUCKET_NAME, Key=S3_OBJECT_KEY)
    print("Prepare multipart upload response:", response)

    # Get the UploadId
    upload_id = response["UploadId"]
    print("S3 UploadId:", upload_id)

    # Create chunks
    chunks: list[bytes] = chunkify(file_path, chunk_size=DEFAULT_CHUNK_SIZE)
    print("Chunks total:", len(chunks))

    # Asynchronously upload chunks using presigned URLs
    async with TaskGroup() as tg:
        tasks = []
        for index, chunk in enumerate(chunks, start=1):
            # Generate an S3 presigned URL for every chunk (part numbers start at 1)
            presigned_url = get_presigned_url(upload_id, part_number=index)
            print(f"Chunk {index} presigned URL generated:", presigned_url)
            # Schedule the upload; TaskGroup tasks start running right away
            task = tg.create_task(
                upload_chunk(
                    presigned_url=presigned_url, chunk=chunk, part_number=index
                )
            )
            tasks.append(task)
        # On exiting the TaskGroup all tasks have finished; if one fails, the remaining tasks are cancelled

    # Collect the responses of the completed tasks
    upload_results: list[Response] = [await task for task in tasks]

    # Gather the ETag of every uploaded chunk from its httpx.Response headers
    etags = []
    for index, httpx_upload_response in enumerate(upload_results, start=1):
        tag_from_header = httpx_upload_response.headers.get("ETag")
        etags.append({"ETag": tag_from_header, "PartNumber": index})
        print(f"Chunk {index} ETag: {tag_from_header}")

    # Tell S3 that all chunks are uploaded and send the ETags (one hash per part)
    response = s3_client.complete_multipart_upload(
        Bucket=BUCKET_NAME,
        Key=S3_OBJECT_KEY,
        MultipartUpload={"Parts": etags},
        UploadId=upload_id,
    )
    print("Complete multipart upload response:", response)


if __name__ == "__main__":
    asyncio.run(main())
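As noted at the top, a generator can yield chunks instead of building the whole list in memory. A minimal sketch of that variant (the name chunkify_iter is only illustrative and not part of the script above; it reuses Path and DEFAULT_CHUNK_SIZE from the script):

from collections.abc import Iterator

def chunkify_iter(file_path: Path, chunk_size: int = DEFAULT_CHUNK_SIZE) -> Iterator[bytes]:
    # Yield one chunk at a time, so only a single chunk is held in memory
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

In main(), the upload loop could then iterate over chunkify_iter(file_path) instead of the chunks list; enumerate(..., start=1) works on a generator as well, so only the print of the total chunk count (which relies on len()) would need to change.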