"""
Data download script for EarthExplorer Machine-to-Machine

    download_m2m('/path/to/downloads', username='user1234', dataset='ARD_TILE',
                 products='TOA,BT,SR,PQA', threads=40,
                 fields={'Region': 'CU', 'Spacecraft': 'LANDSAT_8'})

More M2M documentation: https://earthexplorer.usgs.gov/inventory/documentation

Author: Jake Brinkmann <[email protected]>
LSDS Science Research and Development (LSRD) Project
Date: 10/30/2017

This code is in the public domain.
It is provided AS-IS WITH NO WARRANTY WHATSOEVER.
"""

import requests, json, os, sys, getpass, urllib3, time
from multiprocessing.dummy import Pool  # thread-based Pool: downloads are I/O bound
from argparse import ArgumentParser
import configparser

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
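
# Example command-line usage, assuming this script is saved as download_m2m.py
# (every flag below is defined in build_command_line_arguments):
#
#   python download_m2m.py -d ./downloads -u user1234 --dataset ARD_TILE \
#       --products TOA,BT,SR,PQA --temporal 2017-01-01,2017-12-31 \
#       --fields '{"Region": "CU", "Spacecraft": "LANDSAT_8"}'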


def message(msg, stop=False):
    """ Write a status message to stderr; optionally exit with an error code """
    stream = sys.stderr
    if isinstance(msg, list):
        msg = '\n'.join(msg)
    if stop:
        msg = '\n' + msg + '\n'
    stream.write(msg)
    stream.flush()
    if stop:
        sys.exit(1)


class EarthExplorer(object):
    """ Web-Service interface for EarthExplorer JSON Machine-to-Machine API """

    def __init__(self, version='1.4.1'):
        self.baseurl = 'https://earthexplorer.usgs.gov/inventory/json/v/%s/' % version

    def _api(self, endpoint='login', body=None):
        body = {'jsonRequest': json.dumps(body)} if body else {}
        r = requests.post(self.baseurl + endpoint, data=body)
        r.raise_for_status()
        dat = r.json()
        if dat.get('error'):
            message(': '.join([dat.get('errorCode') or 'UNKNOWN', dat.get('error')]), stop=True)
        return dat

    @classmethod
    def login(cls, username, password=None):
        if password is None:
            password = getpass.getpass('Password (%s): ' % username)
        payload = {'username': username, 'password': password}
        return cls()._api('login', payload).get('data')
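    # e.g. api_key = EarthExplorer.login('user1234') prompts for the password
    # interactively and returns the M2M API key used by the other endpoints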

    @classmethod
    def search(cls, **kwargs):
        return cls()._api('search', kwargs).get('data')

    @classmethod
    def idlookup(cls, **kwargs):
        return cls()._api('idlookup', kwargs).get('data')

    @classmethod
    def metadata(cls, **kwargs):
        return cls()._api('metadata', kwargs).get('data')

    @classmethod
    def download(cls, **kwargs):
        return cls()._api('download', kwargs).get('data')

    @classmethod
    def downloadoptions(cls, **kwargs):
        return cls()._api('downloadoptions', kwargs).get('data')

    @classmethod
    def datasets(cls, **kwargs):
        return cls()._api('datasets', kwargs).get('data')

    @classmethod
    def datasetfields(cls, **kwargs):
        return cls()._api('datasetfields', kwargs).get('data')

    @classmethod
    def additionalCriteriaValues(cls, api_key, dataset, filters):
        """ Attempts to build a complex search based on some simple JSON input

        Example: filters = {"Path": 29, "Row": 29, "Sensor": "ETM+"}

        TODO: Add support for AND/OR/BETWEEN searches
        """
        fields = cls.datasetfields(apiKey=api_key, datasetName=dataset)
        key = 'additionalCriteria'
        criteria = {key: {"filterType": "and", "childFilters": []}}
        for look_for, compare in filters.items():
            field_matches = [f for f in fields if look_for in f['name']]
            if len(field_matches) > 1:
                message(['Search "%s" not unique. Found:' % look_for]
                        + ['* %s' % f['name'] for f in field_matches], stop=True)
            elif len(field_matches) < 1:
                message(['Search "%s" failed. Available:' % look_for]
                        + ['* %s' % f['name'] for f in fields], stop=True)

            field_id = int(field_matches[0]['fieldId'])
            field_name = field_matches[0]['name']
            # Some fields have no value list; guard against iterating over None
            selections = field_matches[0].get('valueList') or []
            mapping = {str(s['name']): str(s['value']) for s in selections}
            if not isinstance(compare, (list, dict)):
                if mapping and compare not in mapping.values():
                    message(['"%s" invalid value not found: %s' % (field_name, compare)]
                            + ['* %s: %s' % (k, v) for k, v in mapping.items()],
                            stop=True)
            search = {"filterType": "value", "fieldId": field_id, "value": compare}
            criteria[key]['childFilters'].append(search)
        return criteria
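    # A minimal sketch of the structure the method above returns, e.g. for
    # filters = {"Region": "CU"} (the fieldId value 21787 is hypothetical;
    # real IDs come from the datasetfields endpoint):
    #
    #   {'additionalCriteria': {'filterType': 'and',
    #                           'childFilters': [{'filterType': 'value',
    #                                             'fieldId': 21787,
    #                                             'value': 'CU'}]}}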

    @staticmethod
    def temporalCriteria(temporal):
        dates = temporal.split(',')
        sd, ed = dates if len(dates) == 2 else dates * 2
        return {"temporalFilter": {"dateField": "search_date", "startDate": sd, "endDate": ed}}


def build_command_line_arguments():
    description = 'Search and download data (skip those already downloaded)'
    parser = ArgumentParser(description=description, add_help=False)
    parser.add_argument('--help', action='help', help='show this help message and exit')
    parser.add_argument('-d', '--directory', type=str, dest='directory', required=True, metavar='PATH',
                        help='Relative path to download all data')
    parser.add_argument('-u', '--username', type=str, dest='username', default=None, metavar='STR',
                        help='ERS Username (with full M2M download access)')
    parser.add_argument('-t', '--threads', type=int, dest='threads', default=40, metavar='INT',
                        help='Number of parallel download threads [Default: 40]')
    parser.add_argument('-b', '--batch', type=int, dest='batch', default=1000, metavar='INT',
                        help='Batch size iteration of URLs to receive [Default: 1000]')
    parser.add_argument('-m', '--max', type=int, dest='N', default=50000, metavar='INT',
                        help='Maximum number of search results to return [Default: 50000]')
    parser.add_argument('--dataset', type=str, dest='dataset', default='ARD_TILE', metavar='STR',
                        help='EE Catalog dataset [Default: ARD_TILE]')
    parser.add_argument('--products', type=str, dest='products', default='STANDARD', metavar='STR',
                        help='Comma-delimited products to download [Default: STANDARD]')
    parser.add_argument('--temporal', type=str, dest='temporal', default=None, metavar='STR',
                        help='Search Date Acquired (YYYY-MM-DD or YYYY-MM-DD,YYYY-MM-DD)')
    parser.add_argument('--fields', type=json.loads, dest='fields', default=None, metavar='JSON',
                        help='Filter results based on dataset-specific metadata fields')
    args = parser.parse_args()
    return args


def download_url(x):
    """ Fetch a single URL into the target directory, resuming partial downloads """
    fileurl, directory = x
    # Follow redirects on the HEAD request too, so the final Content-Disposition
    # and Content-Length headers are visible (requests does not by default)
    head = requests.head(fileurl, timeout=300, allow_redirects=True, verify=False)
    filename = head.headers['Content-Disposition'].split('filename=')[-1].strip('"')
    local_fname = os.path.join(directory, filename)
    if os.path.exists(local_fname):
        message('Already exists: %s \n' % local_fname)
        return

    file_size = None
    if 'Content-Length' in head.headers:
        file_size = int(head.headers['Content-Length'])
    bytes_recv = 0
    if os.path.exists(local_fname + '.part'):
        bytes_recv = os.path.getsize(local_fname + '.part')

    message("Downloading %s ... \n" % local_fname)
    # Request only the bytes not already in the .part file (HTTP range request)
    resume_header = {'Range': 'bytes=%d-' % bytes_recv}
    sock = requests.get(fileurl, headers=resume_header, timeout=300,
                        stream=True, verify=False, allow_redirects=True)

    start = time.time()
    bytes_in_mb = 1024 * 1024
    with open(local_fname + '.part', 'ab') as f:
        for block in sock.iter_content(chunk_size=bytes_in_mb):
            if block:
                f.write(block)
                bytes_recv += len(block)
    elapsed = time.time() - start
    mb = bytes_recv / float(bytes_in_mb)
    message("%s (%3.2f (MB) in %3.2f (s), or %3.2f (MB/s)) \n" % (filename, mb, elapsed, mb / elapsed))

    # Only promote the .part file once the full size has arrived; if the server
    # did not report Content-Length, assume the download completed
    if file_size is None or bytes_recv >= file_size:
        os.rename(local_fname + '.part', local_fname)


def download_url_wrapper(x):
    """ Swallow per-file exceptions so one failure does not kill the pool """
    try:
        download_url(x)
    except Exception as e:
        message('\n\n *** Failed download %s: %s \n' % (x, str(e)))


def chunkify(iterable, n=1):
    """ Yield successive n-sized chunks from a sequence """
    length = len(iterable)
    for ndx in range(0, length, n):
        yield iterable[ndx:min(ndx + n, length)]
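# For example, chunkify(list(range(5)), n=2) yields [0, 1], [2, 3], then [4].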


def read_credentials(config_path, section='default'):
    """ Read login credentials from an INI-style configuration file """
    config = configparser.ConfigParser()
    config.read(config_path)
    cfg_info = dict()
    for opt in config.options(section):
        cfg_info[opt] = config.get(section, opt)
    return cfg_info


def credentials(username=None):
    if username is None:
        cfgpath = os.getenv('M2M_DOWNLOAD_CREDENTIALS_FILE')
        cfgsection = os.getenv('M2M_DOWNLOAD_CREDENTIALS_PROFILE', 'default')
        if cfgpath:
            return read_credentials(os.path.expanduser(cfgpath), cfgsection)
        else:
            return {'username': input('ERS Username: ')}
    else:
        return {'username': username}
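
# A minimal sketch of the INI file read above when the
# M2M_DOWNLOAD_CREDENTIALS_FILE environment variable is set; only the keys
# accepted by EarthExplorer.login (username, password) are meaningful, and
# the values shown are placeholders:
#
#   [default]
#   username = user1234
#   password = s3cret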


def download_m2m(directory, username=None, products='STANDARD', dataset='ARD_TILE',
                 N=50000, temporal=None, batch=1000, threads=40, fields=None):
    """
    Search for and download Landsat Level-2 products to local directory

    Args:
        directory: Relative path to local directory (will be created)
        username: ERS Username (with full M2M download access) [Optional]
        dataset: EarthExplorer Catalog datasetName [Default: ARD_TILE]
        N: Maximum number of search results to return
        products: Comma-delimited list of download products [Default: STANDARD]
        temporal: Search Date image acquired [ Format: %Y-%m-%d or %Y-%m-%d,%Y-%m-%d ]
        batch: How many URLs to request before working on downloads
        threads: Number of download threads to launch in parallel
        fields: JSON dataset-specific metadata fields (see #additionalCriteria)
    """
    api_key = EarthExplorer.login(**credentials(username))

    # Ensure the requested dataset name is unambiguous before searching
    datasets = EarthExplorer.datasets(apiKey=api_key, datasetName=dataset, publicOnly=False)
    matches = [m['datasetName'] for m in datasets]
    if len(matches) > 1 and dataset not in matches:
        message(['Multiple dataset matches found, please select only 1: ']
                + ['* [%s]: %s' % (m['datasetName'], m['datasetFullName']) for m in datasets], stop=True)

    search = dict(apiKey=api_key, datasetName=dataset, maxResults=N)
    if fields:
        search.update(EarthExplorer.additionalCriteriaValues(api_key, dataset, fields))
    if temporal:
        search.update(EarthExplorer.temporalCriteria(temporal=temporal))

    results = EarthExplorer.search(**search)
    n_results = results['totalHits']
    product_ids = results['results']

    message('Total search results: %d \n' % n_results)
    if len(product_ids) < 1:
        return

    if not os.path.exists(directory):
        os.makedirs(directory)

    product_selection_verified = False
    for pids in chunkify(product_ids, batch):
        pool = Pool(threads)
        entities = [p['entityId'] for p in pids]
        if not product_selection_verified:
            # Verify the requested product codes exist (first batch only)
            product_avail = EarthExplorer.downloadoptions(apiKey=api_key, datasetName=dataset, entityIds=entities)
            product_avail = set(p.get('productcode') or p.get('downloadCode')
                                for x in product_avail for p in x['downloadOptions'])
            valid_prods = list(set(products.split(',')) & product_avail)
            if len(valid_prods) < 1:
                message(['"%s" products not available. Choose from:' % products]
                        + ['* %s' % m for m in list(product_avail)], stop=True)
            else:
                product_selection_verified = True
        results = EarthExplorer.download(apiKey=api_key, datasetName=dataset,
                                         products=products.split(','), entityIds=entities)
        urls = [(r['url'], directory) for r in results]
        urls = [u for u in urls if all(u)]
        if urls:
            # The .get() timeout keeps the main thread responsive to Ctrl-C
            # while the worker threads download the batch
            pool.map_async(download_url_wrapper, urls).get(6000)
        pool.close()
        pool.join()


if __name__ == '__main__':
    download_m2m(**vars(build_command_line_arguments()))