import argparse
import boto3
import os
import threading
from fnmatch import fnmatch

# S3 multipart upload parts (other than the last one) must be at least 5 MB.
MIN_S3_SIZE = 6000000
LOG_LEVEL = 'INFO'


def concat(bucket, key, result_key, pattern):
    s3_client = boto3.session.Session().client('s3')
    objects_list = [x for x in list_all_objects(
        s3_client, bucket, result_key, key) if fnmatch(x[0], pattern)]
    print(
        f"Found {len(objects_list)} parts to concatenate in s3://{bucket}/{key}")
    for obj in objects_list:
        print(f"Found: {obj[0]} - {round(obj[1]/1000, 2)}k")
    run_concatenation(s3_client, bucket, key, result_key, objects_list)


def list_all_objects(s3_client, bucket, result_key, key):
    def format_return(resp):
        return [(x['Key'], x['Size']) for x in resp.get('Contents', [])]

    objects = []
    resp = s3_client.list_objects(Bucket=bucket, Prefix=key)
    objects.extend(format_return(resp))
    while resp['IsTruncated']:
        # If there are more objects than can be returned in a single request,
        # the key of the last item is used as the marker for the next page.
        last_key = objects[-1][0]
        resp = s3_client.list_objects(
            Bucket=bucket, Prefix=key, Marker=last_key)
        objects.extend(format_return(resp))
    return objects


def run_concatenation(s3_client, bucket, key, result_key, objects_list):
    if len(objects_list) > 1:
        upload_id = s3_client.create_multipart_upload(
            Bucket=bucket, Key=result_key)['UploadId']
        parts_mapping = assemble_parts_to_concatenate(
            s3_client, bucket, key, result_key, upload_id, objects_list)
        if len(parts_mapping) == 0:
            resp = s3_client.abort_multipart_upload(
                Bucket=bucket, Key=result_key, UploadId=upload_id)
            print(
                f"Aborted concatenation for file {result_key}, parts list empty!")
        else:
            resp = s3_client.complete_multipart_upload(
                Bucket=bucket, Key=result_key, UploadId=upload_id,
                MultipartUpload={'Parts': parts_mapping})
            print(
                f"Finished concatenation for file {result_key} response was: {resp}")
    elif len(objects_list) == 1:
        # A single file can be copied directly; no multipart upload is needed.
        resp = s3_client.copy_object(
            Bucket=bucket, CopySource=f"{bucket}/{objects_list[0][0]}", Key=result_key)
        print(f"Copied single file to {result_key} response was: {resp}")
    else:
        print(f"No files to concatenate for {result_key}")


def assemble_parts_to_concatenate(s3_client, bucket, key, result_key, upload_id, objects_list):
    parts_mapping = []
    part_num = 0

    s3_objects = ["{}/{}".format(bucket, p[0])
                  for p in objects_list if p[1] > MIN_S3_SIZE]
    local_objects = [p[0] for p in objects_list if p[1]
                     <= MIN_S3_SIZE and not p[0] == f"{key}/"]
    total = len(s3_objects) + len(local_objects)

    # Assemble the parts that are large enough for a direct S3 copy.
    # Part numbers are 1-indexed.
    for part_num, source_object in enumerate(s3_objects, 1):
        resp = s3_client.upload_part_copy(Bucket=bucket,
                                          Key=result_key,
                                          PartNumber=part_num,
                                          UploadId=upload_id,
                                          CopySource=source_object)
        print(f"@@@ Uploaded S3 object #{part_num} of {total}")
        # The ETag comes back wrapped in quotes; strip them for the parts mapping.
        parts_mapping.append(
            {'ETag': resp['CopyPartResult']['ETag'][1:-1], 'PartNumber': part_num})

    # Download the objects too small for a direct S3 copy, combine them,
    # and upload the result as the last part of the multipart upload
    # (the final part is exempt from the 5 MB minimum).
    small_objects = []
    for source_object in local_objects:
        # Replace forward slashes so the key maps to a flat temp filename.
        temp_filename = "/tmp/{}".format(source_object.replace("/", "_"))
        s3_client.download_file(
            Bucket=bucket, Key=source_object, Filename=temp_filename)
        with open(temp_filename, 'rb') as f:
            small_objects.append(f.read())
        os.remove(temp_filename)
        print(f"@@@ Downloaded S3 Object: {source_object}")

    if len(small_objects) > 0:
        last_part_num = part_num + 1
        last_object = b''.join(small_objects)
        resp = s3_client.upload_part(
            Bucket=bucket, Key=result_key, PartNumber=last_part_num,
            UploadId=upload_id, Body=last_object)
        print(f"@@@ Uploaded S3 object #{last_part_num} of {total}")
        parts_mapping.append(
            {'ETag': resp['ETag'][1:-1], 'PartNumber': last_part_num})

    return parts_mapping


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="S3 Concatenation Utility.")
    parser.add_argument("--bucket", help="S3 bucket.")
    parser.add_argument(
        "--key", help="Key/folder whose contents should be combined.")
    parser.add_argument(
        "--result_key", help="Output of the concatenation, relative to the specified bucket.")
    parser.add_argument("--pattern", default='*',
                        help="Pattern to match the file names against when adding to the combination.")
    args = parser.parse_args()
    print("Combining files in s3://{}/{} to s3://{}/{} matching pattern {}".format(
        args.bucket, args.key, args.bucket, args.result_key, args.pattern))
    concat(args.bucket, args.key, args.result_key, args.pattern)
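For reference, a minimal sketch of the same listing step written with boto3's built-in paginator, which handles the IsTruncated/Marker bookkeeping internally. It is not used by the script above and returns the same (Key, Size) tuples; bucket and prefix are whatever you would pass as --bucket and --key:

import boto3


def list_all_objects_paginated(bucket, prefix):
    # Sketch: equivalent output shape to list_all_objects, via a paginator.
    s3_client = boto3.session.Session().client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    objects = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages with no matching keys omit 'Contents' entirely.
        objects.extend((o['Key'], o['Size']) for o in page.get('Contents', []))
    return objects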
Great utility! However, I am experiencing some issues: it seems to download OK, but when it tries to combine the files, it prints the following and produces no output:
<after multiple "@@@ Downloaded" lines>
"@@@ Uploaded S3 object #1 of 52
Finished concatenation for file output/file.combine response was: {'ResponseMetadata': {'RequestId': '40469EFF169AD06C', 'HostId': 'XXX', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'jXXX=', 'x-amz-request-id': 'XXX', 'date': 'Sun, 08 Dec 2019 18:56:33 GMT', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Location': 'https://vrct-bucket.s3.us-west-2.amazonaws.com/output%2Ffile.combine', 'Bucket': 'vrct-bucket', 'Key': 'output/file.combine', 'ETag': '"XXX"'}
Should this be run as a Lambda function, or locally/on EC2?
Does this concatenate files smaller than 5 MB?
Thank you!
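On the Lambda question above, a hypothetical sketch of wrapping concat in a handler, assuming the gist is saved as s3_file_combine.py and packaged with the function; the event field names are made up for illustration, and Lambda's /tmp size and 15-minute limits still constrain the small-object download path:

# Hypothetical wrapper, not part of the original script.
from s3_file_combine import concat


def lambda_handler(event, context):
    # Event field names ('bucket', 'key', 'result_key', 'pattern') are
    # illustrative; adapt them to however the function is invoked.
    concat(event['bucket'], event['key'],
           event['result_key'], event.get('pattern', '*'))
    return {'status': 'complete', 'result_key': event['result_key']}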
I'm sorry, but all my files are in the root directory of the S3 bucket and I don't know what to put for --key. Any ideas?
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/10400_1995-01-06-output.csv.14Ba33Ee'
I am getting this error. It cannot find the temp file; can you help me with this?
It is failing with "Access Denied". The bucket is SSE-KMS encrypted, so I need to pass a KMS key when uploading the objects to S3. Please suggest the changes.
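A hedged sketch of the kind of change usually needed for an SSE-KMS bucket, untested against this script: with multipart uploads, the encryption settings are declared once on create_multipart_upload and the individual upload_part/upload_part_copy calls inherit them; the single-file copy_object path needs the same two parameters. The key ARN below is a placeholder, and note that "Access Denied" can also come from the caller's role lacking kms:GenerateDataKey / kms:Decrypt permissions rather than from the upload call itself.

import boto3


def start_kms_multipart_upload(bucket, result_key, kms_key_id):
    # Sketch only: start the multipart upload with SSE-KMS settings so every
    # part of the concatenated object is encrypted with the given key.
    s3_client = boto3.session.Session().client('s3')
    resp = s3_client.create_multipart_upload(
        Bucket=bucket,
        Key=result_key,
        ServerSideEncryption='aws:kms',
        SSEKMSKeyId=kms_key_id)
    return resp['UploadId']

# Example call (bucket name and key ARN are placeholders):
# upload_id = start_kms_multipart_upload(
#     'my-bucket', 'output/file.combine',
#     'arn:aws:kms:us-west-2:111122223333:key/EXAMPLE-KEY-ID')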
Hi, I am running this on an EC2 instance, but my output is not showing all records (some records are missing). I am using it for Parquet files.
Command:
python3 s3_file_combine.py --bucket capmlp --key new/pvaimariasource1/pvai_activity1/ --result_key new/pvaimariasource1/pvai_activity1/LOAD00000002.parquet --pattern "*.parquet"
Any ideas?
Sorry to be unresponsive on this. I no longer use this utility, and I don't have the time or the environment to continue developing or testing it. If anyone would be willing to take it over and work through the issues brought up here, that would be awesome.
@Lydon-01 Are you still having problems?