- The first script can only update the storage class of objects smaller than 5 GB (the limit of a single S3 copy operation).
- The second uses a multipart copy to split larger objects into 10 MB parts and copy them one by one.
- The third tries to do the multipart copy with a pool of workers (multiprocessing.Pool); it is not fully tested, as Lambda does not give access to /dev/shm, which the pool needs.
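All three scripts are Lambda handlers driven by S3 event notifications: they read the bucket name and object key from the first record of the incoming event. A minimal sketch of the event shape they expect, with placeholder bucket and key values (real notifications carry many more fields and URL-encode the key):

# Hypothetical test event containing only the fields the handlers actually read.
event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'example-bucket'},         # placeholder bucket name
            'object': {'key': 'path/to/object.bin'}       # placeholder key
        }
    }]
}
lambda_handler(event, None)  # invoke one of the handlers below with the test event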
Last active: October 6, 2017 21:37
Update the storage class of objects on S3 from STANDARD to STANDARD_IA.
Script 1: simple copy_from, for objects smaller than 5 GB only.
from __future__ import print_function
import json, urllib, boto3, pprint

s3 = boto3.resource('s3')


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    try:
        o = s3.Object(bucket, key)
        if o.storage_class != 'STANDARD_IA':
            file_size = o.content_length
            if file_size >= 5000000000:
                print("Multi part copy is required, size is >= 5GB")
            else:
                response = o.copy_from(CopySource={'Bucket': bucket, 'Key': key},
                                       MetadataDirective='COPY', StorageClass='STANDARD_IA')
                print("Setting object to STANDARD_IA: B: {}, K: {}".format(bucket, key))
        else:
            print("Received notify on {}, but storage class is {}".format(key, o.storage_class))
        return
    except Exception as e:
        print(e)
        print('Failed on B: {} K: {}.'.format(bucket, key))
        raise e
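Not part of the gist: one way to wire a handler like this to a bucket is an S3 event notification that invokes the Lambda whenever an object is created. A hedged sketch using boto3, where the bucket name and function ARN are placeholders and the function is assumed to already grant S3 permission to invoke it:

import boto3

s3_client = boto3.client('s3')

# Placeholder values: substitute the real bucket and the ARN of the deployed function.
bucket = 'example-bucket'
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:update-storage-class'

# Fire the Lambda for every newly created object in the bucket.
s3_client.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:*'],
        }]
    }
)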
Script 3: multipart copy driven by a pool of workers (the variant that is not fully tested on Lambda).
from __future__ import print_function
import json, urllib, boto3, pprint, math, time
from multiprocessing.managers import BaseManager
from multiprocessing import Pool


class MyManager(BaseManager): pass


def Manager():
    m = MyManager()
    m.start()
    return m


class S3MPU(object):
    def __init__(self):
        self._parts = {
            'Parts': []
        }

    def add_part(self, part):
        self._parts['Parts'].append(part)

    def get_parts(self):
        return self._parts


class Copier(object):
    def __call__(self, args):
        do_part_copy(args)


MyManager.register('S3MPU', S3MPU)

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')


def do_part_copy(args):
    # Unpack the arguments before using them.
    bucket, key, mpu_id, part_num, start_pos, end_pos, s3mpu = args
    print("Copying part {} to STANDARD_IA".format(part_num))
    s3_client = boto3.client('s3')
    res = s3_client.upload_part_copy(Bucket=bucket, CopySource={'Bucket': bucket, 'Key': key},
                                     CopySourceRange='bytes=%d-%d' % (start_pos, end_pos),
                                     Key=key,
                                     PartNumber=part_num, UploadId=mpu_id)
    s3mpu.add_part({
        'PartNumber': part_num,
        'ETag': res['CopyPartResult']['ETag']
    })
    print("Part {} copied to STANDARD_IA".format(part_num))


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    try:
        o = s3.Object(bucket, key)
        if o.storage_class != 'STANDARD_IA':
            file_size = o.content_length
            if file_size >= 5000000000:
                print("Multi part copy is required, size is >= 5GB")
                manager = Manager()
                s3mpu = manager.S3MPU()
                mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key, StorageClass='STANDARD_IA')
                byte_position = 0
                part_size = int(10 * math.pow(2.0, 20.0))  # 10 MiB
                if file_size > part_size:
                    full_parts = int(file_size / part_size)
                else:
                    full_parts = 1
                    part_size = file_size
                total_parts = full_parts
                if full_parts * part_size < file_size:
                    total_parts += 1
                print("Number of parts: {}".format(total_parts))

                # Generate arguments for invocations of do_part_copy
                def gen_args(num_parts):
                    cur_pos = 0
                    for i in range(num_parts):
                        part_start = cur_pos
                        cur_pos = cur_pos + part_size
                        part_end = min(cur_pos - 1, file_size - 1)
                        part_num = i + 1
                        yield (bucket, key, mpu["UploadId"], part_num, part_start, part_end, s3mpu)

                try:
                    print("Trying to create the processing pool")
                    pool = Pool(processes=10)
                    t1 = time.time()
                    print("Processing pool created")
                    pool.map_async(Copier(), gen_args(total_parts)).get(9999999)
                    # Parts can finish out of order; CompleteMultipartUpload needs ascending PartNumber.
                    part_info = s3mpu.get_parts()
                    part_info['Parts'].sort(key=lambda p: p['PartNumber'])
                    s3_client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"],
                                                        MultipartUpload=part_info)
                    t2 = time.time() - t1
                    s = file_size / 1024. / 1024.
                    print("Multi part copy is done")
                    print("Finished copying %0.2fMB in %0.2fs (%0.2fMB/s)" % (s, t2, s / t2))
                except KeyboardInterrupt:
                    print("Received KeyboardInterrupt, canceling copy")
                    pool.terminate()
                    s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"])
                except Exception as err:
                    print("Encountered an error, canceling copy")
                    print(err)
                    s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"])
            else:
                response = o.copy_from(CopySource={'Bucket': bucket, 'Key': key},
                                       MetadataDirective='COPY', StorageClass='STANDARD_IA')
                print("Setting object to STANDARD_IA: B: {}, K: {}".format(bucket, key))
        else:
            print("Received notify on {}, but storage class is {}".format(key, o.storage_class))
        return
    except Exception as e:
        print(e)
        print('Failed on B: {} K: {}.'.format(bucket, key))
        raise e
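Not part of the gist: since multiprocessing.Pool needs the POSIX semaphores that /dev/shm would back, a thread pool (multiprocessing.dummy) is a possible workaround on Lambda. boto3 calls mostly wait on the network, so threads still overlap the part copies, and a plain list can collect the results instead of a managed S3MPU object. A minimal sketch under those assumptions, reusing the same part layout; the helper name threaded_multipart_copy is hypothetical:

from multiprocessing.dummy import Pool as ThreadPool  # thread-based pool, no /dev/shm required

import boto3

s3_client = boto3.client('s3')  # boto3 clients can be shared across threads


def copy_part(args):
    # Copy one part and return its entry for CompleteMultipartUpload.
    bucket, key, mpu_id, part_num, start_pos, end_pos = args
    res = s3_client.upload_part_copy(Bucket=bucket, Key=key, UploadId=mpu_id, PartNumber=part_num,
                                     CopySource={'Bucket': bucket, 'Key': key},
                                     CopySourceRange='bytes=%d-%d' % (start_pos, end_pos))
    return {'PartNumber': part_num, 'ETag': res['CopyPartResult']['ETag']}


def threaded_multipart_copy(bucket, key, file_size, part_size=10 * 1024 * 1024):
    # Hypothetical helper: same 10 MiB part layout as above, copied by 10 threads.
    mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key, StorageClass='STANDARD_IA')
    args = []
    part_num, byte_position = 1, 0
    while byte_position < file_size:
        lastbyte = min(byte_position + part_size - 1, file_size - 1)
        args.append((bucket, key, mpu['UploadId'], part_num, byte_position, lastbyte))
        part_num += 1
        byte_position += part_size
    try:
        pool = ThreadPool(10)
        parts = pool.map(copy_part, args)  # map() preserves order, so parts stay in PartNumber order
        pool.close()
        pool.join()
        s3_client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu['UploadId'],
                                            MultipartUpload={'Parts': parts})
    except Exception:
        s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu['UploadId'])
        raise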
Script 2: multipart copy, one 10 MB part at a time.
from __future__ import print_function
import json, urllib, boto3, pprint, math

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    try:
        o = s3.Object(bucket, key)
        if o.storage_class != 'STANDARD_IA':
            file_size = o.content_length
            if file_size >= 5000000000:
                print("Multi part copy is required, size is >= 5GB")
                mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key, StorageClass='STANDARD_IA')
                byte_position = 0
                i = 1
                part_size = int(10 * math.pow(2.0, 20.0))  # 10 MiB
                if file_size > part_size:
                    full_parts = int(file_size / part_size)
                else:
                    full_parts = 1
                    part_size = file_size
                total_parts = full_parts
                if full_parts * part_size < file_size:
                    total_parts += 1
                print("Number of parts: ~ {}".format(total_parts))
                part_info = {
                    'Parts': []
                }
                while byte_position < file_size:
                    # Clamp the range to the last valid byte index (file_size - 1).
                    lastbyte = byte_position + part_size - 1
                    if lastbyte > file_size - 1:
                        lastbyte = file_size - 1
                    res = s3_client.upload_part_copy(Bucket=bucket, CopySource={'Bucket': bucket, 'Key': key},
                                                     CopySourceRange='bytes=%d-%d' % (byte_position, lastbyte),
                                                     Key=key,
                                                     PartNumber=i, UploadId=mpu["UploadId"])
                    part_info['Parts'].append({
                        'PartNumber': i,
                        'ETag': res['CopyPartResult']['ETag']
                    })
                    print("Part {} copied to STANDARD_IA".format(i))
                    i = i + 1
                    byte_position += part_size
                s3_client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"],
                                                    MultipartUpload=part_info)
                print("Multi part copy is done")
            else:
                response = o.copy_from(CopySource={'Bucket': bucket, 'Key': key},
                                       MetadataDirective='COPY', StorageClass='STANDARD_IA')
                print("Setting object to STANDARD_IA: B: {}, K: {}".format(bucket, key))
        else:
            print("Received notify on {}, but storage class is {}".format(key, o.storage_class))
        return
    except Exception as e:
        print(e)
        print('Failed on B: {} K: {}.'.format(bucket, key))
        raise e
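As a sanity check on the part layout used by both multipart variants, here is the arithmetic for a hypothetical 25 MiB object with the 10 MiB part size: three parts, the last one 5 MiB, together covering every byte exactly once. S3 requires every part except the last to be at least 5 MB, which the 10 MB part size satisfies.

part_size = 10 * 1024 * 1024   # 10 MiB, as in the scripts above
file_size = 25 * 1024 * 1024   # hypothetical 25 MiB object

ranges = []
byte_position = 0
while byte_position < file_size:
    lastbyte = min(byte_position + part_size - 1, file_size - 1)
    ranges.append((byte_position, lastbyte))
    byte_position += part_size

# [(0, 10485759), (10485760, 20971519), (20971520, 26214399)]
print(ranges)
assert sum(end - start + 1 for start, end in ranges) == file_size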