Created
November 10, 2020 14:54
-
-
Save banderlog/cd14886421cf1cfcbdca7e8aab9dc584 to your computer and use it in GitHub Desktop.
Download dataset from AWS dataexchange via signedurls
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Download a dataset from AWS Data Exchange via signed URLs.

Docs on the matter:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dataexchange.html#DataExchange.Client.create_job

For the export-to-S3-bucket approach, look here:
https://github.com/aws-samples/aws-dataexchange-api-samples/blob/master/subscribers/python/download-entitled-assets/download-entitled-assets.py
""" | |
import boto3 | |
import time | |
import urllib.request | |
from pathlib import Path | |
import click | |
def _list_all_revision_assets(dx, data_set_id, revision_id):
    """ Return every asset of a revision, following ``NextToken`` pagination """
    assets = []
    extra = {}
    while True:
        page = dx.list_revision_assets(DataSetId=data_set_id,
                                       RevisionId=revision_id,
                                       **extra)
        assets.extend(page.get('Assets') or [])
        token = page.get('NextToken')
        if not token:
            return assets
        extra = {'NextToken': token}


def _wait_for_signed_url(dx, job_id):
    """ Poll an export job until it is terminal and return its signed URL """
    while True:
        job = dx.get_job(JobId=job_id)
        state = job.get('State')
        if state == 'COMPLETED':
            return job.get('Details').get('ExportAssetToSignedUrl').get('SignedUrl')
        # every terminal state other than COMPLETED is a failure; treating only
        # 'ERROR' as terminal would poll forever on a cancelled/timed-out job
        if state in ('ERROR', 'CANCELLED', 'TIMED_OUT'):
            errors = job.get('Errors') or []
            message = errors[0].get('Message') if errors else f'job state is {state}'
            raise Exception(f"Job {job_id} failed to complete - {message}")
        print('WAITING')
        time.sleep(1)


def get_dataset_via_signedurls(dx, data_set_id: str, out_dir: str,
                               revision_id=None, rewrite=False):
    """ Will download all assets for selected dataset using SIGNED_URL

    For every asset an ``EXPORT_ASSET_TO_SIGNED_URL`` job is created, started
    and polled once per second; the resulting signed URL is then fetched into
    ``out_dir/dataset_name/revision_id/asset_name``.

    :param dx: boto3.client('dataexchange', region_name='us-east-1')
    :param data_set_id: ID of selected dataset
    :param out_dir: where to put your files.
                    it will create 'dataset_name/revision_id' subdirs
    :param revision_id: it will download the latest revision (the first one
                        returned by ``list_data_set_revisions``) if ``None``
    :param rewrite: will not download and rewrite already downloaded asset if ``False``
    :raises ValueError: if ``revision_id`` is ``None`` and the dataset has no revisions
    :raises Exception: if an export job ends in any state other than ``COMPLETED``
    """
    # get dataset name
    dataset_name = dx.get_data_set(DataSetId=data_set_id).get('Name')

    # get dataset revision
    if revision_id is None:
        revisions = dx.list_data_set_revisions(DataSetId=data_set_id).get('Revisions')
        if not revisions:
            raise ValueError(f'Data set {data_set_id} has no revisions')
        revision_id = revisions[0].get('Id')

    # get assets (all pages, collected up front so the pagination token
    # is not held open across long downloads)
    assets = _list_all_revision_assets(dx, data_set_id, revision_id)

    destination = Path(out_dir, dataset_name, revision_id)

    for asset in assets:
        asset_destination = Path(destination, asset.get('Name'))

        # check if already downloaded
        if (not rewrite) and asset_destination.is_file():
            print(f'Skipping "{asset_destination}"')
            continue

        # asset names may contain sub-directories
        asset_destination.parent.mkdir(parents=True, exist_ok=True)

        # create job to get url, url valid for 60sec
        print(f'Downloading file "{asset_destination}"')
        job = dx.create_job(Type='EXPORT_ASSET_TO_SIGNED_URL',
                            Details={
                                'ExportAssetToSignedUrl': {
                                    "AssetId": asset.get('Id'),
                                    "DataSetId": asset.get('DataSetId'),
                                    "RevisionId": asset.get('RevisionId')
                                }
                            })
        job_id = job.get('Id')
        dx.start_job(JobId=job_id)
        url = _wait_for_signed_url(dx, job_id)

        # download to a temporary name and rename on success, so that an
        # interrupted transfer never leaves a truncated file that a later
        # run with ``rewrite=False`` would skip as "already downloaded"
        partial = asset_destination.with_name(asset_destination.name + '.part')
        try:
            urllib.request.urlretrieve(url, partial)
            partial.replace(asset_destination)
        finally:
            if partial.exists():
                partial.unlink()
        print('COMPLETED')
@click.command()
@click.option('--data_set_id', help='ID of selected dataset', required=True, type=str)
@click.option('--destination_dir', default='./', type=str, show_default=True,
              help="Where to download your dataset",)
@click.option('--revision_id', default=None, type=str, show_default=True,
              help="Desired revision of dataset. Latest for default.")
@click.option('--rewrite', is_flag=True, help="By default, it will not download already downloaded assets")
@click.option('--region', default='us-east-1', type=str, show_default=True,
              help="AWS region used for the dataexchange client")
def main(data_set_id, destination_dir, revision_id, rewrite, region='us-east-1'):
    """Download all assets of an AWS Data Exchange dataset via signed URLs."""
    # NOTE: click uses the docstring above as the command description in --help.
    # The region used to be hard-coded; it is now an option with the same default.
    dx = boto3.client('dataexchange', region_name=region)
    get_dataset_via_signedurls(dx, data_set_id, destination_dir,
                               revision_id=revision_id, rewrite=rewrite)
    print("------- DONE -------")
    return 0
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment