Skip to content

Instantly share code, notes, and snippets.

@Mageswaran1989
Last active November 7, 2020 07:22
Show Gist options
  • Select an option

  • Save Mageswaran1989/7147d3f50d8affd890855ee84e9cbd30 to your computer and use it in GitHub Desktop.

Select an option

Save Mageswaran1989/7147d3f50d8affd890855ee84e9cbd30 to your computer and use it in GitHub Desktop.
import os
import boto3
from collections import defaultdict
import botocore
def get_matching_s3_objects(bucket,
                            aws_access_key_id,
                            aws_secret_access_key,
                            region_name,
                            prefix='',
                            suffix='',
                            max_keys_per_request=1):
    """
    Generate the objects in an S3 bucket whose keys match a prefix and suffix.

    This is a generator: no request is sent until it is iterated.

    :param bucket: Name of the S3 bucket.
    :param aws_access_key_id: Access key id handed to ``boto3.client``.
    :param aws_secret_access_key: Secret access key handed to ``boto3.client``.
    :param region_name: AWS region name handed to ``boto3.client``.
    :param prefix: Only fetch objects whose key starts with this prefix
        (optional). Either a single string or a tuple/list of strings, in
        which case a key matching any one of them is accepted.
    :param suffix: Only fetch objects whose key ends with this suffix
        (optional). Either a single string or a tuple/list of strings.
    :param max_keys_per_request: Value sent as ``MaxKeys``, i.e. the page
        size of each ``list_objects_v2`` request. The default of 1 means one
        request per object; pass a larger value for big listings.
    :return: Yields the entries of each response's ``'Contents'`` list
        whose ``'Key'`` matches both ``prefix`` and ``suffix``.
    """
    # str.startswith / str.endswith accept a str or a tuple of str (not a
    # list), so normalise both filters to tuples up front.
    prefixes = (prefix,) if isinstance(prefix, str) else tuple(prefix)
    suffixes = (suffix,) if isinstance(suffix, str) else tuple(suffix)
    if not prefixes or not suffixes:
        # An empty collection of alternatives can never match any key.
        return

    s3 = boto3.client('s3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      region_name=region_name)

    kwargs = {
        'Bucket': bucket,
        # Server-side filter: the leading string shared by every requested
        # prefix. For a single string this is the prefix itself; for several
        # it narrows the listing as far as possible, and the exact match is
        # then done client-side below.
        'Prefix': os.path.commonprefix(list(prefixes)),
        'MaxKeys': max_keys_per_request,
    }

    while True:
        # The S3 API response is a large blob of metadata.
        # 'Contents' contains information about the listed objects and is
        # absent when the page holds no objects.
        resp = s3.list_objects_v2(**kwargs)
        for obj in resp.get('Contents', []):
            key = obj['Key']
            if key.startswith(prefixes) and key.endswith(suffixes):
                yield obj

        # The S3 API is paginated. Pass the continuation token into the
        # next request, until we reach the final page (when this field is
        # missing).
        token = resp.get('NextContinuationToken')
        if token is None:
            return
        kwargs['ContinuationToken'] = token
def get_matching_s3_keys(bucket,
                         aws_access_key_id,
                         aws_secret_access_key,
                         region_name,
                         prefix='',
                         suffix='',
                         max_keys_per_request=1):
    """
    Generate the keys of the matching objects in an S3 bucket.

    Thin wrapper around ``get_matching_s3_objects`` that forwards every
    argument unchanged and yields only the ``'Key'`` of each object.

    :param bucket: Name of the S3 bucket.
    :param aws_access_key_id: Access key id forwarded to the object lister.
    :param aws_secret_access_key: Secret access key forwarded to the object
        lister.
    :param region_name: AWS region name forwarded to the object lister.
    :param prefix: Only fetch keys that start with this prefix (optional).
    :param suffix: Only fetch keys that end with this suffix (optional).
    :param max_keys_per_request: number of objects to list per request.
    :return: Yields one key string per matching object.
    """
    listing_args = dict(bucket=bucket,
                        aws_access_key_id=aws_access_key_id,
                        aws_secret_access_key=aws_secret_access_key,
                        prefix=prefix,
                        suffix=suffix,
                        region_name=region_name,
                        max_keys_per_request=max_keys_per_request)
    yield from (entry['Key']
                for entry in get_matching_s3_objects(**listing_args))
def get_list_of_tables(root_path_or_bucket,
                       extract_path,
                       aws_access_key_id=None,
                       aws_secret_access_key=None,
                       region_name="us-west-2",
                       file_extension=".parquet"):
    """
    List the tables found under a snapshot path laid out as:
        bucket_name/path/to/year/month/date/{tables}/{file}

    :param root_path_or_bucket: Name of the S3 bucket to list.
    :param extract_path: Key prefix that leads to all tables organised
        date wise. When empty, nothing is listed and an empty result is
        returned.
    :param aws_access_key_id: Access key id forwarded to the S3 lister.
    :param aws_secret_access_key: Secret access key forwarded to the S3
        lister.
    :param region_name: AWS region name forwarded to the S3 lister.
    :param file_extension: Only keys ending with this suffix are considered.
    :return: tables : ``defaultdict(set)`` keyed by ``"year/month/date"``,
        where each value is the set of table names seen for that date.
    """
    tables = defaultdict(set)
    if not extract_path:
        return tables

    print(
        "Listing the s3 bucket {}/{}! Will take a little while...".format(root_path_or_bucket, extract_path))

    for key in get_matching_s3_keys(bucket=root_path_or_bucket,
                                    aws_access_key_id=aws_access_key_id,
                                    aws_secret_access_key=aws_secret_access_key,
                                    region_name=region_name,
                                    prefix=extract_path,
                                    suffix=file_extension,
                                    # Ask for full pages (1000 is the most S3
                                    # returns per request) instead of one
                                    # round trip per object; the set of keys
                                    # produced is the same.
                                    max_keys_per_request=1000):
        parts = key.split("/")
        # Expected tail of the key: .../year/month/date/table/file
        if len(parts) < 5:
            # Too shallow to carry year/month/date/table components, so it
            # cannot be attributed to any table; skip rather than fail with
            # an IndexError.
            continue
        snapshot = "/".join(parts[-5:-2])  # "year/month/date"
        tables[snapshot].add(parts[-2])
    return tables
#####################################################
# Example usage.
# The credentials are read from the environment so that the names used below
# are actually defined; when a variable is unset, None is passed through to
# boto3.client by the helpers above.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

get_list_of_tables(root_path_or_bucket="dp-dev",
                   extract_path="data/iris/internal/compacted/2018/07/15",
                   aws_access_key_id=AWS_ACCESS_KEY_ID,
                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                   region_name="us-west-2",
                   file_extension=".parquet")
# Output of the call above:
#
# Listing the s3 bucket dp-dev/data/iris/internal/compacted/2018/07/15! Will take a little while...
# defaultdict(set,
#             {'2018/07/15': {'Address',
#                             'Master',
#                             'MasterAllergy',
#                             'MasterCity',
#                             'MasterCode',
#                             'MasterGender',
#                             'MasterMaritalStatus',
#                             'MasterPostalCode',
#                             'PatientImmunization',
#                             'PatientMedication',
#                             'PatientProblem',
#                             'PatientProblemHistory',
#                             'PatientProcedure',
#                             'PatientResultObservation',
#                             'Visit',
#                             'VisitDiagnosis'}})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment