Last active
December 3, 2023 18:09
-
-
Save ntantri/7c4af7ebf5d1a877f99352d80c5dc7fe to your computer and use it in GitHub Desktop.
An executor library which uses the select_runner.py to run queries on AWS S3 files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
import argparse | |
import sys | |
import boto3 | |
from select_runner import * | |
def fetch_objects(bucket, prefix, throwerror):
    """
    Yield the keys of all S3 objects under ``prefix`` in ``bucket``.

    Based on: https://alexwlchan.net/2018/01/listing-s3-keys-redux/

    Parameters
    ----------
    bucket: string
        Name of the S3 bucket.
    prefix: string (or tuple of strings)
        Key prefix — either up to a folder or a full object key.
    throwerror: string
        'true' to raise an exception when no objects match the prefix
        (useful for validation); any other value returns silently.

    Yields
    ------
    string
        Each object key found under the prefix.
    """
    # print() call form works identically on Python 2 and 3 for a
    # single argument (the original py2-only statement did not).
    print('Fetching s3 files from bucket ' + bucket + ' and prefix ' + prefix)
    s3 = boto3.client('s3')
    kwargs = {'Bucket': bucket}
    # If the prefix is a single string (not a tuple of strings), let the
    # S3 API do the filtering server-side.
    if isinstance(prefix, str):
        kwargs['Prefix'] = prefix
    while True:
        # 'Contents' holds metadata for the listed objects; the key is
        # missing entirely when nothing matched.
        resp = s3.list_objects_v2(**kwargs)
        try:
            contents = resp['Contents']
        except KeyError:
            if throwerror == 'true':
                raise Exception('Could not find data for prefix: ' + prefix)
            return
        for obj in contents:
            key = obj['Key']
            # str.startswith also accepts a tuple of prefixes, covering
            # the non-string case skipped above.
            if key.startswith(prefix):
                yield key
        # The S3 API is paginated, returning up to 1000 keys at a time.
        # Pass the continuation token into the next request until the
        # final page, where the field is missing.
        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break
def fetch_args():
    """
    Build and return the argument parser for this script.

    Fix: the original passed the long names as ``metavar`` (display-only
    text in the help output), so ``--bucket`` etc. were not actually
    accepted on the command line.  They are now real option aliases,
    with explicit ``dest`` values preserving the original single-letter
    attribute names (``args.b``, ``args.p``, ...) for existing callers.
    """
    parser = \
        argparse.ArgumentParser(description='''Provide S3 details: example python s3_executor.py -b "bucket-name" -p "path/to/file/mysample-file.tsv.gz" -d TAB -s "select * from s3object s where _88 != 'VIEW_DETECTED' limit 10" ''')
    parser.add_argument('-b', '--bucket', dest='b',
                        help='''Provide the bucket name, for example: bucket-name ''')
    parser.add_argument('-p', '--prefix', dest='p',
                        help='''Provide the prefix - till the folder, for example: path/to/file/mysample-file.tsv.gz''')
    parser.add_argument('-comp', '--compression', dest='comp', help='''Is the compression
                        available - GZIP/None/BZIP2''', default="GZIP")
    parser.add_argument('-c', '--ctype', dest='c', help='Content Type of the file - CSV/JSON/Parquet',
                        default='CSV')
    parser.add_argument('-d', '--delimiter', dest='d', help='Provide the Delimiter - COMMA/TAB', default='COMMA')
    parser.add_argument('-s', '--sql', dest='s', help='''Provide the SQL to be executed, for example:
                        select _1, _2, _20 from s3object s where _88 != 'VIEW_DETECTED' limit 10''', default='')
    parser.add_argument('-o', '--outputfile', dest='o', help='Provide the filename to dump the records fetched', default='')
    parser.add_argument('-e', '--throwerror', dest='e', help='Boolean value true/false - which determines whether to throw error while processing', default='false')
    return parser
def get_compression(comp):
    """
    Return the compression setting, or an empty string when none was
    supplied (falsy input).
    """
    return comp if comp else ""
def get_content_type(content_type):
    """
    Resolve the content type for the S3 Select request.

    Parquet passes through unchanged; every other option (TSV, CSV,
    JSON, ...) resolves to "CSV".
    """
    return content_type if content_type == "Parquet" else "CSV"
def get_delimiter(delimiter):
    """
    Translate a delimiter keyword into its character.

    Returns "," for COMMA, a tab for TAB, and None for anything else
    (including no input at all).
    """
    return {"COMMA": ",", "TAB": "\t"}.get(delimiter)
def get_content_options(kwargs):
    """
    Build the content options for the S3 Select request.

    Parameters
    ----------
    kwargs: dict
        Must contain 'content_type' and 'delimiter'.

    Returns
    -------
    dict
        Empty for Parquet (no content options needed); otherwise the CSV
        options, with 'FieldDelimiter' included when a delimiter is set.
    """
    content_options = {}
    if kwargs['content_type'] != 'Parquet':
        content_options = {'AllowQuotedRecordDelimiter': True, 'QuoteCharacter': ""}
        # Bug fix: the original tested a module-level ``delimiter``
        # global (assigned only by the __main__ body), which raised
        # NameError when this function was called on its own.  Use the
        # value passed in instead.
        if kwargs['delimiter']:
            content_options['FieldDelimiter'] = kwargs['delimiter']
    return content_options
if __name__ == '__main__':
    # Entry point: parse the CLI arguments, then run the S3 Select query
    # against every object found under the given bucket/prefix.
    parser = fetch_args()
    args = parser.parse_args()
    sql = args.s
    bucket = args.b
    prefix = args.p
    throwerror = args.e
    # Bucket, prefix and SQL are mandatory; bail out with a usage hint.
    # print() call form is Python 2/3 compatible (the original py2-only
    # statement was not); sys.exit replaces the bare exit() builtin,
    # which is only guaranteed in interactive sessions.
    if bucket is None or prefix is None or sql is None:
        print("Please use: python s3_executor.py --help and pass valid arguments")
        sys.exit(1)
    object_items = fetch_objects(bucket, prefix, throwerror)
    comp = get_compression(args.comp)
    content_type = get_content_type(args.c)
    delimiter = get_delimiter(args.d)
    output_file = args.o
    content_options = get_content_options({'content_type': content_type, 'delimiter': delimiter})
    # perform() comes from select_runner (star import at the top of the
    # file) and executes the S3 Select call per object key.
    for item in object_items:
        perform(bucket, item, sql, comp, content_type, content_options, output_file)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment