@daaniam
Created October 21, 2024 00:08
Python boto3 AWS S3 chunked async upload

This is a working example of asynchronously uploading a file to an AWS S3 bucket in chunks with Python: boto3 starts a multipart upload and generates presigned URLs, and httpx uploads the parts concurrently. Adapt the code to your needs; for example, you can use a generator to yield chunks of the file instead of loading the entire file into memory (see the sketch after the example).

.env

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_S3_REGION_NAME=eu-central-1
AWS_S3_SIGNATURE_VERSION=v4
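
Once load_dotenv() runs, boto3 picks up AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment automatically; the region and signature version are hardcoded in the example below. A minimal sketch, assuming the variable names above, if you prefer to pull the region from .env as well (the example passes "s3v4" to botocore rather than the literal "v4" from the .env):

import os

import boto3
from botocore.config import Config
from dotenv import load_dotenv

load_dotenv()

s3_client = boto3.client(
    "s3",
    config=Config(signature_version="s3v4"),  # SigV4 signing for presigned URLs
    region_name=os.getenv("AWS_S3_REGION_NAME", "eu-central-1"),  # falls back to the hardcoded region
)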

Example

import asyncio
from asyncio import TaskGroup
from pathlib import Path

import boto3
import httpx
from botocore.config import Config
from dotenv import load_dotenv
from httpx import Response

load_dotenv()


BUCKET_NAME = "MY-BUCKET"
LOCAL_FILE_PATH = "./assets/video_large.mp4"
S3_OBJECT_KEY = "video_large.mp4"  # Object key (file name) in S3
DEFAULT_CHUNK_SIZE = 1024 * 1024 * 10  # 10 MB per part (S3 requires each part except the last to be at least 5 MB)

# Init s3 client
s3_client = boto3.client(
    "s3", config=Config(signature_version="s3v4"), region_name="eu-central-1"
)


def get_file_path() -> Path:
    return Path(LOCAL_FILE_PATH)


def get_file_size(file_path: Path) -> int:
    return file_path.stat().st_size


def chunkify(file_path: Path, chunk_size=DEFAULT_CHUNK_SIZE) -> list[bytes]:
    chunks = []
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            chunks.append(chunk)
    return chunks


def get_presigned_url(s3_upload_id: str, part_number: int) -> str:
    return s3_client.generate_presigned_url(
        ClientMethod="upload_part",
        Params={
            "Bucket": BUCKET_NAME,
            "Key": S3_OBJECT_KEY,
            "UploadId": s3_upload_id,
            "PartNumber": part_number,
        },
    )


async def upload_chunk(presigned_url: str, chunk: bytes, part_number: int) -> Response:
    print(f"Part number {part_number} - uploading")

    async with httpx.AsyncClient() as client:
        response = await client.put(url=presigned_url, content=chunk, timeout=None)

    print(f"Part number {part_number} - done")
    return response


async def main():
    file_path = get_file_path()
    print("File path:", file_path)

    file_size = get_file_size(file_path)
    print("File size:", file_size, "bytes")

    # Prepare multipart upload to obtain UploadId from s3
    response = s3_client.create_multipart_upload(Bucket=BUCKET_NAME, Key=S3_OBJECT_KEY)
    print("Prepare multipart upload response:", response)

    # Get uploadId
    upload_id = response["UploadId"]
    print("S3 UploadId:", upload_id)

    # Create chunks
    chunks: list[bytes] = chunkify(file_path, chunk_size=DEFAULT_CHUNK_SIZE)
    print("Chunks total:", len(chunks))

    # Asynchronously upload chunks using presigned URLs
    async with TaskGroup() as tg:
        tasks = []
        for index, chunk in enumerate(chunks, start=1):
            # Generate s3 presigned URL for every chunk
            presigned_url = get_presigned_url(upload_id, part_number=index)
            print(f"Chunk {index} presigned URL generated:", presigned_url)

            # Prepare tasks
            task = tg.create_task(
                upload_chunk(
                    presigned_url=presigned_url, chunk=chunk, part_number=index
                )
            )
            tasks.append(task)

        # Await the results; if any task raises, the TaskGroup cancels the remaining tasks
        upload_results: list[Response] = [await task for task in tasks]

    # Gather ETags for every uploaded chunk from the httpx.Response headers
    etags = []
    for index, httpx_upload_response in enumerate(upload_results, start=1):
        tag_from_header = httpx_upload_response.headers.get("ETag")
        etags.append({"ETag": tag_from_header, "PartNumber": index})
        print(f"Chunk {index} ETag: {tag_from_header}")

    # Let S3 know that we have uploaded all chunks and send ETags (hash for every chunk)
    response = s3_client.complete_multipart_upload(
        Bucket=BUCKET_NAME,
        Key=S3_OBJECT_KEY,
        MultipartUpload={"Parts": etags},
        UploadId=upload_id,
    )

    print("Complete multipart upload response:", response)


if __name__ == "__main__":
    asyncio.run(main())
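
As noted above, chunkify() reads the whole file into memory. A minimal sketch of a generator-based variant, reusing the helpers from the example (read_chunks() and upload_streamed() are hypothetical names, not part of the original gist):

from typing import Iterator


def read_chunks(file_path: Path, chunk_size: int = DEFAULT_CHUNK_SIZE) -> Iterator[bytes]:
    # Yield the file one chunk at a time instead of building a list of all chunks
    with open(file_path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


async def upload_streamed(upload_id: str, file_path: Path) -> list[Response]:
    # Same upload flow as main(), but chunks are produced lazily by the generator
    tasks = []
    async with TaskGroup() as tg:
        for index, chunk in enumerate(read_chunks(file_path), start=1):
            presigned_url = get_presigned_url(upload_id, part_number=index)
            tasks.append(
                tg.create_task(
                    upload_chunk(
                        presigned_url=presigned_url, chunk=chunk, part_number=index
                    )
                )
            )
    return [task.result() for task in tasks]

Note that because every part is turned into a task up front, the chunks still sit in memory while their uploads are pending; to actually bound peak memory you would also need to limit concurrency, for example with an asyncio.Semaphore.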