import boto3
import datetime
import asyncio
import concurrent.futures
import multiprocessing as mp
import pandas as pd
'''
S3 Replication: for reasons I don't know, more than 300M objects failed to replicate.

+ How do I know there are 300M failed replications?
  https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/
+ The problem is that the Python + EMR approach was slow (after two weeks it had not
  even finished the first 100M); maybe I just don't know how to optimize it.

So I tweaked the original Python script a little to use multithreading and multiprocessing.
Running on an m5a.4xlarge (64G RAM, 16 vCPU) against ~30G of input (300M records from Athena),
the whole job finished in 5 days.
'''
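# A note on the input file (an assumption inferred from the read_csv / df.key usage
# below, not something stated in the original): /home/ubuntu/300M.csv is expected to
# have a header row with a "key" column, one object key per line, possibly wrapped in
# double quotes (hence the strip('"') in copy_rows), e.g.
#
#   key
#   "reports/2019/invoice-001.pdf"
#   "reports/2019/invoice-002.pdf"
#
# A hypothetical Athena query over an S3 inventory table (loosely following the AWS
# blog post linked above; the table name is made up) that could produce such a file:
#
#   SELECT key FROM s3_inventory WHERE replication_status = 'FAILED';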
runtime = datetime.datetime.utcnow().isoformat()
NUMBER_THREADS_PER_PROCESS = 200
NUMBER_PROCESSES = mp.cpu_count()
CHUNK_LINES = 2000
S3_BUCKET = 'documents'

def get_object_attributes(obj):
    _acl = obj.Acl()
    # 2 API calls
    obj.load()
    _acl.load()
    storage_class = obj.storage_class if obj.storage_class else 'STANDARD'
    metadata = obj.metadata if obj.metadata else {}
    server_side_encryption = obj.server_side_encryption if obj.server_side_encryption else 'None'
    last_modified = obj.last_modified
    owner, grants = _acl.owner, _acl.grants
    return storage_class, metadata, server_side_encryption, last_modified, owner, grants

def _copy(obj, params):
    result = obj.copy_from(**params)
    return result


def _updateAcl(obj, params):
    result = obj.Acl().put(**params)
    return result

async def async_call(executor, fn, *args):
    # Run a blocking boto3 call on the thread pool without blocking the event loop
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, fn, *args)
    return result

async def worker(executor, s3api, bucket, key, copy_acls=True):
    # Source and destination are the same bucket/key: copying an object onto itself
    # with a REPLACE metadata directive re-triggers replication.
    src_bucket, dest_bucket = s3api.Bucket(bucket), s3api.Bucket(bucket)
    src_obj, dest_obj = src_bucket.Object(key), dest_bucket.Object(key)
    try:
        storage_class, metadata, sse_type, last_modified, owner, grants = await async_call(executor, get_object_attributes, src_obj)

        # Update the metadata so the copy will work
        metadata['forcedreplication'] = runtime
        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set server-side encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            # I don't care about this branch because I didn't use encryption
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        result = await async_call(executor, _copy, dest_obj, params)

        # Put the ACL back on the object
        if copy_acls:
            dest_acl = {
                'AccessControlPolicy': {
                    'Grants': grants,
                    'Owner': owner
                }
            }
            await async_call(executor, _updateAcl, dest_obj, dest_acl)
        # return bucket, key, 'TRUE'
    except Exception as e:
        print(e)

async def copy_rows(df, copy_acls):
    s3api = boto3.resource('s3')
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUMBER_THREADS_PER_PROCESS)
    # Strip surrounding double quotes from the exported keys, if any
    _l = lambda x: str(x).strip('"')
    futures = [
        worker(executor, s3api, S3_BUCKET, _l(key), copy_acls)
        for key in df.key
    ]
    await asyncio.gather(*futures)
    print('Done')

def main(df):
    print('start')
    asyncio.run(copy_rows(df, True))

if __name__ == "__main__":
    print(runtime)
    chunks = pd.read_csv('/home/ubuntu/300M.csv', sep=',', header=0, chunksize=CHUNK_LINES)
    with concurrent.futures.ProcessPoolExecutor(max_workers=NUMBER_PROCESSES) as pool:
        pool.map(main, [chunk for chunk in chunks])