Here is what this code does:
1. Create a list of pre-signed urls - the number depends on the chosen chunk size (e.g. 8MB)
2. Upload a sample file to S3 using pre-signed URLs. In the real-world scenario we would generate the link on the server side, and upload data using these URLs on the client side.
3. Complete the multipart upload. Again, this step would be executed on the server as well. Additionally, you may request the object size on the server side after completion and return it to the client,
so the client can compare the original file size to the uploaded one in order to ensure data integrity.
4. Just for the sake of testing, we download the uploaded file back to ensure the upload was successful.
A few problems that I had to solve while crafting this code:
- Ensure that the signed_urls list contains the data chunks coupled with the real part number within the file. So the first part is the beginning of the file and the last part is the end of the file.
- When uploading multiple parts concurrently, use f.seek to read exactly the needed part of the file. This is because the different parts of the file are accessed in no particular order.
- When completing the multipart upload, ensure you sort the parts by part number before passing them to the complete_multipart_upload method.
The Code:
import boto3
import os
import time
import requests
import math
import threading
import concurrent.futures
# Connection settings for the S3-compatible storage endpoint.
# NOTE(review): the key values below look like placeholders; confirm real
# credentials are supplied some other way and never committed.
BUCKET = "cliptracer-dev"
AWS_ACCESS_KEY = 'myaccesskey'
AWS_SECRET_KEY = 'mysecretkey'
ENDPOINT_URL = 'https://mys3nasserver.com:9000'

# Build the S3 client; endpoint_url points it at the custom server above.
s3 = boto3.client(
    's3',
    endpoint_url=ENDPOINT_URL,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)
# Source file, target object key, and the path used for the verification download.
# NOTE(review): these are raw strings, so the doubled backslashes are kept
# literally in the path; confirm that is intended.
local_file = r"C:\\Users\\Pavlo\\Downloads\\GX012980.MP4"
s3_key = "test/GX012980.MP4"
downloaded_file = r"C:\\Users\\Pavlo\\Downloads\\GX012980_downloaded.MP4"

# Start from a clean slate locally: drop the result of a previous run.
if os.path.exists(downloaded_file):
    os.remove(downloaded_file)
    print(f"Existing local file '{downloaded_file}' removed.")

print("Checking if the object exists in the S3")
# Start from a clean slate remotely: head_object raises when the key is
# absent, so the delete only runs for an object that is really there.
try:
    s3.head_object(Bucket=BUCKET, Key=s3_key)
    s3.delete_object(Bucket=BUCKET, Key=s3_key)
    print(f"Existing object '{s3_key}' in bucket '{BUCKET}' deleted.")
except s3.exceptions.ClientError as err:
    # Anything other than "not found" is a real problem: report and re-raise.
    if err.response['Error']['Code'] != "404":
        print(f"Error checking object '{s3_key}': {err}")
        raise
    print(f"Object '{s3_key}' does not exist in bucket '{BUCKET}', proceeding with upload.")
# Size in bytes of every part except possibly the last one.
part_size = 8 * 1024 ** 2  # 8 MB
# The number of parts follows from the file size, rounded up so the tail is covered.
file_size = os.path.getsize(local_file)
parts_num = math.ceil(file_size / part_size)

# Open the multipart upload; its UploadId ties all later calls together.
response = s3.create_multipart_upload(Bucket=BUCKET, Key=s3_key)
upload_id = response['UploadId']
print(f"Multipart upload initiated. Upload ID: {upload_id}")

# Step 1: one presigned upload_part URL per part, each stored next to its
# 1-based part number so a part can be matched to its offset in the file.
signed_urls = [
    {
        'PartNumber': number,
        'URL': s3.generate_presigned_url(
            ClientMethod='upload_part',
            Params={
                'Bucket': BUCKET,
                'Key': s3_key,
                'UploadId': upload_id,
                'PartNumber': number,
            },
        ),
    }
    for number in range(1, parts_num + 1)
]
print(f"Generated {len(signed_urls)} presigned URLs.")
# Step 2: Upload the parts concurrently using worker threads.
# Lock guarding appends to the shared module-level `parts` list.
lock = threading.Lock()
# Set as soon as any part exhausts its retries, so other workers stop early.
failed_flag = threading.Event()


def upload_part(item, local_file, part_size, retries=5, interval=2, timeout=60):
    """Upload one part of ``local_file`` through its presigned URL.

    Args:
        item: Mapping with 'PartNumber' (1-based part index) and 'URL'
            (the presigned URL the part is PUT to).
        local_file: Path of the file being uploaded.
        part_size: Size in bytes of every part except possibly the last;
            used to compute this part's offset in the file.
        retries: Maximum number of PUT attempts for this part.
        interval: Seconds to wait between two attempts.
        timeout: Timeout in seconds handed to ``requests.put`` so a stalled
            connection fails the attempt instead of blocking the worker
            forever.

    Returns:
        None. On success the part's ETag and number are appended to the
        module-level ``parts`` list; when every attempt fails,
        ``failed_flag`` is set. If ``failed_flag`` is already set by
        another worker, the function returns without recording anything.
    """
    part_no = item['PartNumber']
    signed_url = item['URL']

    # Parts are processed in no particular order, so seek to this part's
    # own offset instead of relying on a shared file position.
    with open(local_file, 'rb') as f:
        f.seek((part_no - 1) * part_size)
        part_data = f.read(part_size)

    etag = None
    for attempt in range(1, retries + 1):
        if failed_flag.is_set():  # Another part already failed: stop trying
            print(f"Part {part_no}: Canceling retries as upload has failed.")
            return
        try:
            response = requests.put(signed_url, data=part_data, timeout=timeout)
        except requests.exceptions.RequestException as e:
            print(f"Part {part_no}: Error during attempt {attempt}/{retries}: {e}")
        else:
            if response.status_code == 200:
                # The ETag is required to complete the upload; a 200 without
                # it is treated as a failed attempt instead of raising a
                # KeyError inside the worker thread.
                etag = response.headers.get('ETag')
                if etag:
                    break
                print(f"Part {part_no}: Attempt {attempt}/{retries} returned no ETag header.")
            else:
                print(f"Part {part_no}: Failed attempt {attempt}/{retries}: {response.text}")
        if attempt < retries:
            time.sleep(interval)  # Wait before retrying; no wait after the last attempt

    if etag:
        with lock:
            parts.append({
                'ETag': etag,
                'PartNumber': part_no
            })
        print(f"Uploaded part {part_no}")
    else:
        print(f"Failed to upload part {part_no} after retries.")
        failed_flag.set()  # Mark the whole upload as failed
# Main logic: upload every part, then either complete the multipart upload
# and verify it, or abort it exactly once.
parts = []
failed_flag.clear()  # Ensure the flag is clear before starting uploads
upload_complete = False
try:
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [
            executor.submit(upload_part, item, local_file, part_size)
            for item in signed_urls
        ]
        for future in concurrent.futures.as_completed(futures):
            worker_error = future.exception()
            if worker_error is not None:
                # An exception inside a worker would otherwise be dropped
                # silently; report it and tell the other workers to stop.
                print(f"Upload worker raised an unexpected error: {worker_error!r}")
                failed_flag.set()
    if failed_flag.is_set():
        raise RuntimeError("Upload failed due to part upload failure.")
    if not parts:
        # Nothing was uploaded (e.g. an empty source file); a completion
        # request needs at least one part.
        raise RuntimeError("No parts were uploaded.")
    if len(parts) != parts_num:
        raise RuntimeError(
            f"Multipart upload incomplete: {len(parts)} of {parts_num} parts uploaded."
        )
    upload_complete = True
except Exception as e:
    # Single abort point: aborting the same UploadId twice would fail
    # because the upload no longer exists after the first abort.
    print(f"Aborting multipart upload: {e}")
    s3.abort_multipart_upload(Bucket=BUCKET, Key=s3_key, UploadId=upload_id)

# Step 3: Complete the multipart upload
if upload_complete:
    s3.complete_multipart_upload(
        Bucket=BUCKET,
        Key=s3_key,
        # Workers finish in arbitrary order; the parts are sorted by part
        # number before being handed to the completion call.
        MultipartUpload={'Parts': sorted(parts, key=lambda part: part['PartNumber'])},
        UploadId=upload_id
    )
    print("Multipart upload completed successfully.")
    print("Downloading the file back to ensure our code worked")
    # Download the file to verify integrity
    s3.download_file(Bucket=BUCKET, Key=s3_key, Filename=downloaded_file)
    print(f"File downloaded to {downloaded_file}")
    # Compare file sizes (a size match only; file contents are not compared)
    uploaded_size = os.path.getsize(local_file)
    downloaded_size = os.path.getsize(downloaded_file)
    if uploaded_size == downloaded_size:
        print("File integrity verified: Uploaded and downloaded file sizes match.")
    else:
        print(f"File integrity check failed: Uploaded size {uploaded_size}, Downloaded size {downloaded_size}")