AWS Multipart Upload with ETag checks
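S3's ETag for a multipart upload is not a plain MD5 of the file: it is the MD5 of the concatenated binary MD5 digests of each part, suffixed with "-&lt;part count&gt;". A minimal sketch of that format (the part contents here are arbitrary assumptions, not real upload data):

import hashlib

part1 = b"a" * (8 * 1024 * 1024)  # hypothetical 8 MB first part
part2 = b"b" * 1024               # hypothetical final partial part
digests = hashlib.md5(part1).digest() + hashlib.md5(part2).digest()
print(f'"{hashlib.md5(digests).hexdigest()}-2"')  # MD5-of-MD5s, "-2" = two parts

The helpers below reproduce this calculation locally so an on-disk file can be compared against the ETag S3 reports.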
import hashlib
import math
from pathlib import Path
from typing import Optional

import boto3

# ETag calculation funcs taken from https://stackoverflow.com/a/77362525
def get_local_etag(path: Path, chunk_size_bytes: Optional[int] = None) -> str:
    """Calculates the expected AWS S3 ETag for a local on-disk file.

    Takes into account multipart uploads, but does NOT account for additional
    encryption (like KMS keys).
    """
    if chunk_size_bytes is None:
        # Default 8 MB part size used by `aws s3 cp` and boto3's TransferConfig
        file_size = path.stat().st_size
        chunk_size_bytes = 8 * 1024 * 1024  # 8 MB
        # S3 allows at most 10,000 parts per upload:
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
        while math.ceil(file_size / chunk_size_bytes) > 10000:
            chunk_size_bytes *= 2
    md5s = []
    with open(path, "rb") as fp:
        while True:
            data = fp.read(chunk_size_bytes)
            if not data:
                break
            md5s.append(hashlib.md5(data))
    if len(md5s) > 1:  # We are dealing with a multipart upload
        digests = b"".join(m.digest() for m in md5s)
        multipart_md5 = hashlib.md5(digests)
        expected_etag = f'"{multipart_md5.hexdigest()}-{len(md5s)}"'
    elif len(md5s) == 1:  # File smaller than chunk size
        expected_etag = f'"{md5s[0].hexdigest()}"'
    else:  # Empty file
        expected_etag = f'"{hashlib.md5().hexdigest()}"'
    return expected_etag
def determine_multipart_chunk_size(bucket: str, key: str) -> Optional[int]:
    """Determines the multipart chunk size for a given S3 object, if it was a multipart upload.

    HeadObject with PartNumber=1 returns the ContentLength of the first part,
    which is the chunk size the original upload used.
    """
    s3_client = boto3.client("s3")
    head_object = s3_client.head_object(Bucket=bucket, Key=key, PartNumber=1)
    if head_object.get("PartsCount", 1) > 1 and "ContentLength" in head_object:
        return head_object["ContentLength"]
    return None
def check_object_integrity(bucket: str, key: str, local_path: Path) -> bool:
    """Checks whether transfer from source to destination completed correctly."""
    s3 = boto3.resource("s3")
    s3_object = s3.Object(bucket, key)
    # Cheap check first: the sizes must match
    if s3_object.content_length != local_path.stat().st_size:
        return False
    # Then compare the remote ETag against one computed locally using the
    # same chunk size the original upload used
    local_hash = get_local_etag(local_path, determine_multipart_chunk_size(bucket, key))
    return s3_object.e_tag == local_hash
def upload_file_to_s3(bucket_name: str, file_name: Path, config) -> str:
    """Uploads a local file to S3 under its own filename, returning a status string."""
    s3 = boto3.client("s3")
    try:
        s3.upload_file(str(file_name), bucket_name, file_name.name, Config=config)
        return "Success"
    except Exception as e:
        return f"Upload failed: {e}"
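Because the ETag is built from per-part hashes, it depends on the part size: the same bytes hashed with two different chunk sizes yield different ETags. That is why determine_multipart_chunk_size reads the actual part size back from S3 rather than guessing. A quick local sketch, assuming a hypothetical scratch file:

from pathlib import Path

sample = Path("sample.bin")                      # hypothetical scratch file
sample.write_bytes(b"x" * (20 * 1024 * 1024))    # 20 MB -> 3 parts at 8 MB
print(get_local_etag(sample))                    # ends in "-3"
print(get_local_etag(sample, 16 * 1024 * 1024))  # ends in "-2", different hash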
Example usage, importing the helpers above as aws_helper_funcs.py:
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
from pathlib import Path
from tqdm.notebook import tqdm

from aws_helper_funcs import check_object_integrity, upload_file_to_s3

# Set the desired multipart threshold value
GB = 1024 ** 3
config = TransferConfig(multipart_threshold=1 * GB)

bucket_name = 'near-data-raw-downloads'
filepath = Path(r"")  # Enter Directory Path
filenames = list(filepath.glob('*.gz'))  # change the glob pattern to filter files

s3 = boto3.client('s3')
for filename in tqdm(filenames):
    try:
        s3.head_object(Bucket=bucket_name, Key=filename.name)
        if check_object_integrity(bucket=bucket_name, key=filename.name, local_path=filename):
            print(f'{filename.name} already exists in S3 and is valid, skipping...')
        else:
            print(f'{filename.name} already exists in S3 but is invalid, will attempt to re-upload...')
            print(upload_file_to_s3(bucket_name, filename, config))
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f'{filename.name} does not exist, will attempt to upload...')
            print(upload_file_to_s3(bucket_name, filename, config))
        else:
            print(f'Error: {e}')
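One caveat: boto3's upload_file defaults to an 8 MB multipart part size, which matches the default get_local_etag assumes, so the script above stays consistent as written. If you override the part size, mirror it in the verification or the integrity check will always fail. A minimal sketch; multipart_chunksize is a standard TransferConfig parameter, but the 64 MB value is only an assumption:

from aws_helper_funcs import get_local_etag

MB = 1024 ** 2
custom_config = TransferConfig(multipart_threshold=1 * GB, multipart_chunksize=64 * MB)
# After uploading with custom_config, verify with the matching chunk size:
# expected = get_local_etag(filename, chunk_size_bytes=64 * MB)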