Skip to content

Instantly share code, notes, and snippets.

@bwicklund
Last active March 12, 2023 08:27
Show Gist options
  • Save bwicklund/0000c9066845afc928e128f2ff79cba1 to your computer and use it in GitHub Desktop.
Save bwicklund/0000c9066845afc928e128f2ff79cba1 to your computer and use it in GitHub Desktop.
S3 file Concatenation/Combination. S3 Spark file merge.
import argparse
import boto3
import os
import threading
from fnmatch import fnmatch
# S3 requires every part of a multipart upload except the last one to be at
# least 5 MiB. Objects larger than this threshold (in bytes) are copied
# server-side as their own part; objects at or below it are downloaded and
# merged into the final part.
MIN_S3_SIZE = 6000000
# NOTE(review): LOG_LEVEL is not referenced anywhere in the visible code.
LOG_LEVEL = 'INFO'
def concat(bucket, key, result_key, pattern):
    """Concatenate every object under a prefix into a single S3 object.

    Args:
        bucket: Name of the bucket holding both the sources and the result.
        key: Prefix ("folder") whose objects should be combined.
        result_key: Key the combined object is written to.
        pattern: ``fnmatch``-style pattern matched against each full object
            key; only matching objects are combined.
    """
    s3_client = boto3.session.Session().client('s3')
    objects_list = [
        entry
        for entry in list_all_objects(s3_client, bucket, result_key, key)
        # The output of a previous run may live under the same prefix and
        # match the pattern; it must never be fed back into itself.
        if entry[0] != result_key and fnmatch(entry[0], pattern)
    ]
    print(
        f"Found {len(objects_list)} parts to concatenate in s3://{bucket}/{key}")
    for object_key, size in objects_list:
        print(f"Found: {object_key} - {round(size / 1000, 2)}k")
    run_concatenation(s3_client, bucket, key, result_key, objects_list)
def list_all_objects(s3_client, bucket, result_key, key):
    """Return ``(key, size)`` tuples for every object under a prefix.

    Args:
        s3_client: Client exposing ``list_objects`` (a boto3 S3 client).
        bucket: Bucket to list.
        result_key: Accepted for interface compatibility; not used here.
        key: Prefix to list.

    Returns:
        A list of ``(object_key, size_in_bytes)`` tuples in listing order.
        The list is empty when nothing matches the prefix.
    """
    objects = []
    resp = s3_client.list_objects(Bucket=bucket, Prefix=key)
    while True:
        # 'Contents' is absent from the response when no object matches.
        page = [(item['Key'], item['Size'])
                for item in resp.get('Contents', [])]
        objects.extend(page)
        # Stop on the last page; also stop on an empty page so there is
        # always a key to use as the next marker.
        if not resp.get('IsTruncated') or not page:
            break
        # If there are more objects than can be returned in a single request
        # then the key of the last item is used for pagination.
        resp = s3_client.list_objects(
            Bucket=bucket, Prefix=key, Marker=objects[-1][0])
    return objects
def run_concatenation(s3_client, bucket, key, result_key, objects_list):
    """Write the objects in ``objects_list`` to ``result_key`` as one object.

    With no objects nothing is written; with exactly one object a plain S3
    copy is performed; otherwise a multipart upload is assembled from the
    parts and completed.

    Args:
        s3_client: boto3 S3 client (or compatible object).
        bucket: Bucket holding both the sources and the result.
        key: Source prefix, forwarded to the part assembler.
        result_key: Destination key.
        objects_list: ``(object_key, size_in_bytes)`` tuples to combine.
    """
    if not objects_list:
        print(f"No files to concatenate for {result_key}")
        return

    if len(objects_list) == 1:
        # can perform a simple S3 copy since there is just a single file
        resp = s3_client.copy_object(
            Bucket=bucket,
            # The dict form lets the client handle any quoting of the key.
            CopySource={'Bucket': bucket, 'Key': objects_list[0][0]},
            Key=result_key)
        print(f"Copied single file to {result_key} response was: {resp}")
        return

    upload_id = s3_client.create_multipart_upload(
        Bucket=bucket, Key=result_key)['UploadId']
    try:
        parts_mapping = assemble_parts_to_concatenate(
            s3_client, bucket, key, result_key, upload_id, objects_list)
        if parts_mapping:
            resp = s3_client.complete_multipart_upload(
                Bucket=bucket, Key=result_key, UploadId=upload_id,
                MultipartUpload={'Parts': parts_mapping})
    except BaseException:
        # Do not leave a dangling multipart upload behind on failure or
        # interruption; its already-uploaded parts would otherwise linger.
        s3_client.abort_multipart_upload(
            Bucket=bucket, Key=result_key, UploadId=upload_id)
        raise

    if not parts_mapping:
        s3_client.abort_multipart_upload(
            Bucket=bucket, Key=result_key, UploadId=upload_id)
        print(
            f"Aborted concatenation for file {result_key}, parts list empty!")
        return

    print(
        f"Finished concatenation for file {result_key} response was: {resp}")
def assemble_parts_to_concatenate(s3_client, bucket, key, result_key, upload_id, objects_list, min_part_size=None):
    """Upload all parts of the multipart upload and return their manifest.

    Objects larger than the threshold are copied server-side, one part each,
    in list order. All remaining objects are downloaded, joined in list
    order, and uploaded as the single final part (the last part of a
    multipart upload has no minimum size). Note that the combined output
    therefore holds every large object before any small one, which is not
    necessarily the listing order.

    NOTE(review): S3 caps a single part at 5 GiB; a larger source would need
    ranged copies (``CopySourceRange``), which are not handled here.

    Args:
        s3_client: boto3 S3 client (or compatible object).
        bucket: Bucket holding both the sources and the result.
        key: Source prefix; its "folder" placeholder object is skipped.
        result_key: Destination key of the multipart upload.
        upload_id: Id returned by ``create_multipart_upload``.
        objects_list: ``(object_key, size_in_bytes)`` tuples to combine.
        min_part_size: Size in bytes above which an object is copied
            directly as its own part. Defaults to ``MIN_S3_SIZE``.

    Returns:
        A list of ``{'ETag': ..., 'PartNumber': ...}`` dicts suitable for
        ``complete_multipart_upload``; empty when nothing was uploaded.
    """
    if min_part_size is None:
        min_part_size = MIN_S3_SIZE

    # A "folder" placeholder object is named "<prefix>/". Normalise so it is
    # recognised whether or not the caller's prefix already ends in a slash.
    folder_marker = key.rstrip('/') + '/'
    large_keys = [name for name, size in objects_list if size > min_part_size]
    small_keys = [name for name, size in objects_list
                  if size <= min_part_size and name != folder_marker]
    total = len(large_keys) + len(small_keys)

    parts_mapping = []
    part_num = 0
    # assemble parts large enough for direct S3 copy
    # part numbers are 1 indexed
    for part_num, source_key in enumerate(large_keys, 1):
        resp = s3_client.upload_part_copy(
            Bucket=bucket,
            Key=result_key,
            PartNumber=part_num,
            UploadId=upload_id,
            # The dict form lets the client handle any quoting of the key.
            CopySource={'Bucket': bucket, 'Key': source_key})
        print(f"@@@ Uploaded S3 object #{part_num} of {total}")
        # The ETag is returned wrapped in double quotes; strip them.
        parts_mapping.append(
            {'ETag': resp['CopyPartResult']['ETag'][1:-1], 'PartNumber': part_num})

    # Download the objects too small for direct S3 copy, combine them, and
    # upload them as the last part of the multipart upload. They are read
    # straight into memory, so no temporary files (and no /tmp) are needed.
    small_chunks = []
    for source_key in small_keys:
        body = s3_client.get_object(Bucket=bucket, Key=source_key)['Body']
        small_chunks.append(body.read())
        print(f"@@@ Downloaded S3 Object: {source_key}")

    if small_chunks:
        last_part_num = part_num + 1
        resp = s3_client.upload_part(
            Bucket=bucket, Key=result_key, PartNumber=last_part_num,
            UploadId=upload_id, Body=b''.join(small_chunks))
        print(f"@@@ Uploaded S3 object #{last_part_num} of {total}")
        parts_mapping.append(
            {'ETag': resp['ETag'][1:-1], 'PartNumber': last_part_num})
    return parts_mapping
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and run the concatenation.
    parser = argparse.ArgumentParser(description="S3 Concatenation Utility.")
    # The three locations are mandatory; without them the S3 calls would
    # receive None and fail with an unhelpful validation error.
    parser.add_argument("--bucket", required=True, help="S3 Bucket.")
    parser.add_argument(
        "--key", required=True,
        help="Key/Folder Whose Contents Should Be Combined. "
             "Pass an empty string (--key \"\") for the bucket root.")
    parser.add_argument(
        "--result_key", required=True,
        help="Output of Concatenation, Relative To The Specified Bucket.")
    parser.add_argument("--pattern", default='*',
                        help="Pattern To Match The File Names Against For Adding To The Combination.")
    args = parser.parse_args()
    print("Combining files in s3://{}/{} to s3://{}/{} matching pattern {}".format(
        args.bucket, args.key, args.bucket, args.result_key, args.pattern))
    concat(args.bucket, args.key, args.result_key, args.pattern)
@bwicklund
Copy link
Author

bwicklund commented Jan 21, 2020 via email

@farshadniayeshpour
Copy link

Does this concatenate files less than 5mb?

@bwicklund
Copy link
Author

bwicklund commented Jan 21, 2020 via email

@farshadniayeshpour
Copy link

Thank you!

@farshadniayeshpour
Copy link

I'm sorry but all my files are on the root directory of the S3 bucket and I don't know what to put for the --key. Any ideas?

@farshadniayeshpour
Copy link

FileNotFoundError: [Errno 2] No such file or directory: '/tmp/10400_1995-01-06-output.csv.14Ba33Ee'
I am getting this error. It cannot find the temp file, can you help me on this?

@xfinity1010
Copy link

It is failing with "Access Denied". Bucket is SSE-KMS encrypted. I need to pass KMS key for upload the objects on S3. Please suggest the changes.

@crazyavi
Copy link

Hi, I am running this on an EC2 instance, but my output does not contain all records (some records are missing). I am using it for "parquet" files.
Command:
python3 s3_file_combine.py --bucket capmlp --key new/pvaimariasource1/pvai_activity1/ --result_key new/pvaimariasource1/pvai_activity1/LOAD00000002.parquet --pattern "*.parquet"
Any ideas

@bwicklund
Copy link
Author

Sorry to be unresponsive on this. I no longer use this utility, and don't have the time or the environment to continue developing or testing it. If anyone would be willing to take it over and work through the issues brought up here, that would be awesome.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment