AWS Multipart Upload with ETag checks
import hashlib
import math
from pathlib import Path
from typing import Optional

import boto3

# ETag calculation functions taken from https://stackoverflow.com/a/77362525
def get_local_etag(path: Path, chunk_size_bytes: Optional[int] = None) -> str:
    """Calculates the expected AWS S3 upload ETag for a local on-disk file.

    Takes multipart uploads into account, but does NOT account for additional
    encryption (like KMS keys).
    """
    if chunk_size_bytes is None:
        # 8 MB is the default chunk size used by `aws s3 cp`
        file_size = path.stat().st_size
        chunk_size_bytes = 8 * 1024 * 1024  # 8 MB
        # S3 allows at most 10,000 parts per upload:
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
        while math.ceil(file_size / chunk_size_bytes) > 10000:
            chunk_size_bytes *= 2

    md5s = []
    with open(path, "rb") as fp:
        while True:
            data = fp.read(chunk_size_bytes)
            if not data:
                break
            md5s.append(hashlib.md5(data))

    if len(md5s) > 1:  # We are dealing with a multipart upload
        digests = b"".join(m.digest() for m in md5s)
        multipart_md5 = hashlib.md5(digests)
        expected_etag = f'"{multipart_md5.hexdigest()}-{len(md5s)}"'
    elif len(md5s) == 1:  # File smaller than chunk size
        expected_etag = f'"{md5s[0].hexdigest()}"'
    else:  # Empty file
        expected_etag = f'"{hashlib.md5().hexdigest()}"'
    return expected_etag
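
# Illustrative usage (assumption, not part of the original gist; the path is
# hypothetical): a file below the chunk size yields a plain quoted MD5 such as
# '"<hex md5>"', whereas a 100 MB file split into the default 8 MB parts yields
# '"<md5 of concatenated part MD5s>-13"'.
#     get_local_etag(Path("data.gz"))                    # default `aws s3 cp` chunking
#     get_local_etag(Path("data.gz"), 16 * 1024 * 1024)  # explicit 16 MB parts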

def determine_multipart_chunk_size(bucket: str, key: str) -> Optional[int]:
    """Determines the multipart chunk size for a given S3 object, if it was a multipart upload."""
    s3_client = boto3.client("s3")
    # Requesting PartNumber=1 returns metadata for the first part only; its
    # ContentLength is the chunk size used for the upload (all parts except
    # possibly the last share it for standard CLI/boto3 transfers).
    head_object = s3_client.head_object(Bucket=bucket, Key=key, PartNumber=1)
    if head_object.get("PartsCount", 1) > 1 and "ContentLength" in head_object:
        return head_object["ContentLength"]
    return None
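
# Illustrative call (assumption, not from the original gist; "my-bucket" and
# "data.gz" are hypothetical): an object uploaded with the default 8 MB parts
# returns 8388608, while a small single-part object returns None.
#     chunk = determine_multipart_chunk_size("my-bucket", "data.gz")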

def check_object_integrity(bucket: str, key: str, local_path: Path) -> bool:
    """Checks whether the transfer from source to destination was done properly."""
    s3 = boto3.resource("s3")
    s3_object = s3.Object(bucket, key)

    # Cheap check first: compare object sizes
    local_stats = local_path.stat()
    s3_size_bytes = s3_object.content_length
    local_size_bytes = local_stats.st_size
    if s3_size_bytes != local_size_bytes:
        return False

    # Then compare the S3 ETag against the locally computed one
    s3_etag = s3_object.e_tag
    local_hash = get_local_etag(local_path, determine_multipart_chunk_size(bucket, key))
    if s3_etag != local_hash:
        return False
    return True

def upload_file_to_s3(bucket_name, file_name, config):
    """Uploads a local file to S3 under its basename; returns "Success" or the raised exception."""
    s3 = boto3.client('s3')
    try:
        s3.upload_file(file_name, bucket_name, file_name.name, Config=config)
        return "Success"
    except Exception as e:
        return e
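
# Minimal smoke test (illustrative sketch, not part of the original gist;
# "my-bucket" and "data.gz" are hypothetical placeholders).
if __name__ == "__main__":
    sample = Path("data.gz")
    print("local ETag:", get_local_etag(sample))
    print("matches S3:", check_object_integrity("my-bucket", sample.name, sample))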

# ---------------------------------------------------------------------------
# Upload script (expects the helpers above to be saved as aws_helper_funcs.py)
# ---------------------------------------------------------------------------
import boto3
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
from pathlib import Path
from tqdm.notebook import tqdm

from aws_helper_funcs import check_object_integrity, upload_file_to_s3
# Set the desired multipart threshold value (files larger than 1 GB are uploaded as multipart)
GB = 1024 ** 3
config = TransferConfig(multipart_threshold=1 * GB)
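
# Note (beyond the original gist): TransferConfig's default multipart_chunksize
# is 8 MB, matching the default assumed in get_local_etag(); check_object_integrity
# reads the actual part size back from S3, so a different chunk size still verifies.
# To set it explicitly, e.g.:
#     config = TransferConfig(multipart_threshold=1 * GB,
#                             multipart_chunksize=16 * 1024 * 1024)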

bucket_name = 'near-data-raw-downloads'
filepath = Path(r"")  # Enter directory path here
filenames = list(filepath.glob('*.gz'))  # change the glob pattern to filter files
s3 = boto3.client('s3')
for filename in tqdm(filenames):
    try:
        # head_object raises a 404 ClientError if the key does not exist yet
        s3.head_object(Bucket=bucket_name, Key=filename.name)
        if check_object_integrity(bucket=bucket_name, key=filename.name, local_path=filename):
            print(f'{filename.name} already exists in S3 and is valid, skipping...')
        else:
            print(f'{filename.name} already exists in S3 but is invalid, will attempt to re-upload...')
            print(upload_file_to_s3(bucket_name, filename, config))
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f'{filename.name} does not exist, will attempt to upload...')
            print(upload_file_to_s3(bucket_name, filename, config))
        else:
            print(f'Error: {e}')