import boto3
import datetime
import asyncio
import concurrent.futures
import multiprocessing as mp
import pandas as pd
'''
S3 Replication: for reasons I don't know, more than 300M objects failed to replicate.

+ How do I know there are ~300M failed replications?
  https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/
+ The problem: the python + EMR approach was slow (two weeks and it had not even
  finished the first 100M); maybe I just don't know how to optimize it.

So I tweaked the original python script a little to use multithreading and
multiprocessing. Running on an m5a.4xlarge (64G RAM, 16 vCPU) against ~30G of
input (300M records exported from Athena), the whole run took about 5 days.
'''
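# The trick from the AWS blog post linked above: copying each object onto itself
# with MetadataDirective=REPLACE and a fresh "forcedreplication" metadata value
# counts as a new write, which makes S3 replicate objects that existed before the
# replication rule was created.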
runtime = datetime.datetime.utcnow().isoformat()
NUMBER_THREADS_PER_PROCESS = 200
NUMBER_PROCESSES = mp.cpu_count()
CHUNK_LINES = 2000
S3_BUCKET = 'documents'
def get_object_attributes(obj):
    _acl = obj.Acl()
    # 2 api calls
    obj.load()
    _acl.load()
    storage_class = obj.storage_class if obj.storage_class else 'STANDARD'
    metadata = obj.metadata if obj.metadata else {}
    server_side_encryption = obj.server_side_encryption if obj.server_side_encryption else 'None'
    last_modified = obj.last_modified
    owner, grants = _acl.owner, _acl.grants
    return storage_class, metadata, server_side_encryption, last_modified, owner, grants
def _copy(obj, params):
    result = obj.copy_from(**params)
    return result

def _updateAcl(obj, params):
    result = obj.Acl().put(**params)
    return result
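# boto3 calls are blocking, so they are offloaded to the shared thread pool via
# run_in_executor; the asyncio event loop just keeps many requests in flight.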
async def async_call(executor, fn, *args):
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, fn, *args)
    return result
async def worker(executor, s3api, bucket, key, copy_acls=True):
    src_bucket, dest_bucket = s3api.Bucket(bucket), s3api.Bucket(bucket)
    src_obj, dest_obj = src_bucket.Object(key), dest_bucket.Object(key)
    try:
        storage_class, metadata, sse_type, last_modified, owner, grants = await async_call(executor, get_object_attributes, src_obj)
        # Update the metadata so the in-place copy counts as a change
        metadata['forcedreplication'] = runtime
        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }
        # Preserve server-side encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            # I don't care much about this branch because I didn't use KMS encryption
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key
        result = await async_call(executor, _copy, dest_obj, params)
        # Put the ACL back on the object
        if copy_acls:
            dest_acl = {
                'AccessControlPolicy': {
                    'Grants': grants,
                    'Owner': owner
                }
            }
            await async_call(executor, _updateAcl, dest_obj, dest_acl)
        # return bucket, key, 'TRUE'
    except Exception as e:
        print(e)
async def copy_rows(df, copy_acls):
    s3api = boto3.resource('s3')
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUMBER_THREADS_PER_PROCESS)
    _l = lambda x: str(x).strip('"')
    futures = [
        worker(executor, s3api, S3_BUCKET, _l(key), copy_acls)
        for key in df.key
    ]
    results = await asyncio.gather(*futures)
    print('Done')
def main(df):
    print('start')
    asyncio.run(copy_rows(df, True))
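# Fan-out: the CSV of failed keys is read in CHUNK_LINES-sized chunks; each chunk
# goes to one of NUMBER_PROCESSES worker processes, and inside each process an
# asyncio loop drives up to NUMBER_THREADS_PER_PROCESS blocking boto3 calls at once.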
if __name__ == "__main__":
    print(runtime)
    chunks = pd.read_csv('/home/ubuntu/300M.csv', sep=',', header=0, chunksize=CHUNK_LINES)
    with concurrent.futures.ProcessPoolExecutor(max_workers=NUMBER_PROCESSES) as pool:
        pool.map(main, [chunk for chunk in chunks])