Last active
November 7, 2020 07:22
-
-
Save Mageswaran1989/7147d3f50d8affd890855ee84e9cbd30 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import boto3 | |
| from collections import defaultdict | |
| import botocore | |
def get_matching_s3_objects(bucket,
                            aws_access_key_id,
                            aws_secret_access_key,
                            region_name,
                            prefix='',
                            suffix='',
                            max_keys_per_request=1):
    """
    Generate the objects in an S3 bucket whose keys match a prefix and suffix.

    Each yielded item is one entry of the 'Contents' list returned by
    ``list_objects_v2``; the object key is available as ``obj['Key']``.

    :param bucket: Name of the S3 bucket.
    :param aws_access_key_id: Access key id handed to ``boto3.client``.
    :param aws_secret_access_key: Secret access key handed to ``boto3.client``.
    :param region_name: AWS region handed to ``boto3.client``.
    :param prefix: Only fetch objects whose key starts with this prefix
        (optional). Either a single string or an iterable of strings, in
        which case a key matches if it starts with any of them.
    :param suffix: Only fetch objects whose keys end with this suffix
        (optional). Either a single string or an iterable of strings.
    :param max_keys_per_request: Number of keys requested per list call
        (sent as 'MaxKeys'). The default of 1 means one request per object;
        pass a larger value (S3 pages hold up to 1000 keys) for big listings.
    """
    # str.startswith / str.endswith accept a str or a tuple of str, but not
    # other iterables, so normalise lists and the like up front.
    if not isinstance(prefix, str):
        prefix = tuple(prefix)
    if not isinstance(suffix, str):
        suffix = tuple(suffix)

    s3 = boto3.client('s3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      region_name=region_name)

    kwargs = {'Bucket': bucket, 'MaxKeys': max_keys_per_request}
    if isinstance(prefix, str):
        # A single prefix can be filtered directly by the S3 API.
        kwargs['Prefix'] = prefix
    else:
        # The API takes only one prefix string. Every key that starts with
        # one of the requested prefixes also starts with their longest common
        # leading string, so use that to narrow the listing server-side; the
        # exact tuple match is applied client-side below. (Sending
        # str(prefix) here would ask S3 for keys starting with the tuple's
        # repr, which matches nothing.)
        kwargs['Prefix'] = os.path.commonprefix(prefix)

    while True:
        # The S3 API response is a large blob of metadata.
        # 'Contents' contains information about the listed objects and is
        # absent when nothing was listed.
        resp = s3.list_objects_v2(**kwargs)
        contents = resp.get('Contents')
        if contents is None:
            return

        for obj in contents:
            key = obj['Key']
            if key.startswith(prefix) and key.endswith(suffix):
                yield obj

        # The S3 API is paginated, returning up to 1000 keys at a time.
        # Pass the continuation token into the next request, until we
        # reach the final page (when this field is missing).
        token = resp.get('NextContinuationToken')
        if token is None:
            return
        kwargs['ContinuationToken'] = token
def get_matching_s3_keys(bucket,
                         aws_access_key_id,
                         aws_secret_access_key,
                         region_name,
                         prefix='',
                         suffix='',
                         max_keys_per_request=1):
    """
    Generate the keys in an S3 bucket.

    Thin wrapper over get_matching_s3_objects that forwards every argument
    unchanged and yields only the 'Key' field of each matching object.

    :param bucket: Name of the S3 bucket.
    :param aws_access_key_id: Access key id forwarded to the object listing.
    :param aws_secret_access_key: Secret access key forwarded to the object listing.
    :param region_name: AWS region forwarded to the object listing.
    :param prefix: Only fetch keys that start with this prefix (optional).
    :param suffix: Only fetch keys that end with this suffix (optional).
    :param max_keys_per_request: number of objects to list per request
    """
    matching_objects = get_matching_s3_objects(
        bucket=bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=region_name,
        prefix=prefix,
        suffix=suffix,
        max_keys_per_request=max_keys_per_request,
    )
    yield from (s3_object['Key'] for s3_object in matching_objects)
def get_list_of_tables(root_path_or_bucket,
                       extract_path,
                       aws_access_key_id=None,
                       aws_secret_access_key=None,
                       region_name="us-west-2",
                       file_extension=".parquet"):
    """
    List the tables found under a snapshot path laid out as:
        bucket_name/path/to/year/month/date/{tables}/{file}

    The year, month, date and table name are taken from the last five
    components of each matching key (the final component is the file itself).

    :param root_path_or_bucket: Name of the S3 bucket to list.
    :param extract_path: Key prefix that leads to all tables organised date
        wise. When empty or None nothing is listed.
    :param aws_access_key_id: Access key id forwarded to the S3 listing.
    :param aws_secret_access_key: Secret access key forwarded to the S3 listing.
    :param region_name: AWS region forwarded to the S3 listing.
    :param file_extension: Only keys ending with this suffix are considered.
    :return: tables : Dict keyed by "year/month/date", where each value is
        the set of table names seen under that date.
    """
    tables = defaultdict(set)
    if not extract_path:
        return tables

    print(
        "Listing the s3 bucket {}/{}! Will take a little while...".format(root_path_or_bucket, extract_path))
    # Ask for full pages (S3 returns up to 1000 keys per request) instead of
    # one key per round trip; the set of keys yielded is the same.
    for key in get_matching_s3_keys(bucket=root_path_or_bucket,
                                    aws_access_key_id=aws_access_key_id,
                                    aws_secret_access_key=aws_secret_access_key,
                                    region_name=region_name,
                                    prefix=extract_path,
                                    suffix=file_extension,
                                    max_keys_per_request=1000):
        path_parts = key.split("/")
        if len(path_parts) < 5:
            # Too shallow to hold year/month/date/table/file; not part of the
            # expected layout, so skip it rather than fail on indexing.
            continue
        year, month, date, table = path_parts[-5:-1]
        tables["/".join((year, month, date))].add(table)
    return tables
| ##################################################### | |
if __name__ == "__main__":
    # Example usage. Credentials are read from the environment so that no
    # secrets live in the source; a missing variable yields None, which is
    # also get_list_of_tables' own default for both credential parameters.
    tables = get_list_of_tables(root_path_or_bucket="dp-dev",
                                extract_path="data/iris/internal/compacted/2018/07/15",
                                aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
                                aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
                                region_name="us-west-2",
                                file_extension=".parquet")
    print(tables)
# Example output of the call above:
#
# Listing the s3 bucket dp-dev/data/iris/internal/compacted/2018/07/15! Will take a little while...
# defaultdict(set,
#             {'2018/07/15': {'Address',
#                             'Master',
#                             'MasterAllergy',
#                             'MasterCity',
#                             'MasterCode',
#                             'MasterGender',
#                             'MasterMaritalStatus',
#                             'MasterPostalCode',
#                             'PatientImmunization',
#                             'PatientMedication',
#                             'PatientProblem',
#                             'PatientProblemHistory',
#                             'PatientProcedure',
#                             'PatientResultObservation',
#                             'Visit',
#                             'VisitDiagnosis'}})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment