Skip to content

Instantly share code, notes, and snippets.

@bbuechler
Created June 30, 2022 00:30
Show Gist options
  • Save bbuechler/4eaad34db3dd342360c2ec27e251ec5a to your computer and use it in GitHub Desktop.
Save bbuechler/4eaad34db3dd342360c2ec27e251ec5a to your computer and use it in GitHub Desktop.
Read manifest.safe from any scene of any SLC in CMR
import os
import base64
import logging
import json
from urllib import request
from urllib.request import Request, urlopen
from urllib.parse import urlencode
from urllib.error import HTTPError
from http import cookiejar
from remotezip import RemoteZip
# CMR granule-search endpoint queried by get_url_from_cmr(); the response is
# parsed there as JSON with an "items" list whose entries carry a "umm" record.
CMR_BASE_URL = 'https://cmr.earthdata.nasa.gov/search/granules.umm_json'
def get_url_from_cmr(scene):
    """Grab the "GET DATA" URL for ``scene`` from CMR.

    ``scene`` is sent as ``producer_granule_id`` (restricted to the two
    collection concept ids listed below) to ``CMR_BASE_URL``. The ``URL`` of
    the first ``RelatedUrls`` entry whose ``Type`` is ``"GET DATA"`` in the
    first returned item is the result.

    When the response is not valid JSON, contains no items, or has no
    "GET DATA" entry, the value of ``generate_url_from_scene(scene)`` is
    returned instead.
    """
    # CMR Search params
    params = [
        ('collection_concept_id', ['C1214470488-ASF', 'C1327985661-ASF']),
        ('producer_granule_id', scene),
    ]
    # doseq=True expands the list value into repeated form fields. Passing the
    # encoded bytes as the ``data`` argument makes urlopen send a POST.
    query_string = urlencode(params, True).encode("ascii")
    with urlopen(CMR_BASE_URL, query_string) as response:
        search_result = response.read().decode('utf-8')

    # NOTE(review): generate_url_from_scene is not defined in the visible part
    # of this file -- confirm it exists, otherwise both fallbacks below raise
    # NameError.
    try:
        json_result = json.loads(search_result)
    except json.decoder.JSONDecodeError as err:
        print(f"Could not convert {search_result} to JSON: {err}")
        return generate_url_from_scene(scene)

    try:
        urls = json_result["items"][0]["umm"]["RelatedUrls"]
        get_data_url = next(
            item["URL"] for item in urls if item["Type"] == "GET DATA"
        )
    except (KeyError, IndexError, TypeError, StopIteration) as err:
        # IndexError: no granule matched; StopIteration: no "GET DATA" entry.
        # repr() is used because StopIteration has an empty str().
        print(f"Could not find URL in JSON {json_result}: {err!r}")
        return generate_url_from_scene(scene)

    print(f"Retrieved URL from CMR: {get_data_url}")
    return get_data_url
def get_creds():
    """Build the HTTP Basic credential token from the environment.

    Reads ``EDL_USER`` and ``EDL_PASS`` and returns the base64 encoding of
    ``"<EDL_USER>:<EDL_PASS>"`` as a ``str``, ready to follow ``"Basic "`` in
    an ``Authorization`` header.

    Raises:
        RuntimeError: if either variable is unset. Without this check the
            literal text ``"None:None"`` would be encoded and sent.
    """
    username = os.getenv('EDL_USER')
    password = os.getenv('EDL_PASS')
    missing = [
        name
        for name, value in (('EDL_USER', username), ('EDL_PASS', password))
        if value is None
    ]
    if missing:
        raise RuntimeError(
            f"Missing environment variable(s): {', '.join(missing)}"
        )
    u_p = f"{username}:{password}".encode('utf-8')
    return base64.b64encode(u_p).decode('utf-8')
def add_url_host(url, new_url):
    """Resolve a redirect target against the URL that issued the redirect.

    Args:
        url: the absolute URL that was requested.
        new_url: the redirect target, typically a host-less path such as
            ``"/oauth/next"`` (same-host redirect).

    Returns:
        The absolute URL for ``new_url``. A path starting with ``/`` is placed
        on ``url``'s host; a path without a leading ``/`` is resolved relative
        to ``url``'s path; an already-absolute ``new_url`` is returned as is.
        The scheme is taken from ``url``.
    """
    # Imported locally because the file-level imports only bring in urlencode.
    from urllib.parse import urljoin

    return urljoin(url, new_url)
class NoRedirect(request.HTTPRedirectHandler):
    """Redirect handler that declines to follow any redirect automatically.

    With this handler in the opener, a redirect response is not chased behind
    the caller's back; get_dl_url() sees it as an HTTPError and decides what
    to do with the Location header itself.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):  # pylint: disable=too-many-arguments
        """Produce no follow-up request, whatever the redirect looks like."""
        return
# Keep 'em cookies!
# One cookie jar shared by every request made through the opener built below.
cj = cookiejar.CookieJar()
# Opener that records/sends cookies via ``cj`` and uses NoRedirect as its
# redirect handler.
opener = request.build_opener(request.HTTPCookieProcessor(cj), NoRedirect)
# Install it as the default opener so the plain urlopen() calls in this file
# go through it.
request.install_opener(opener)
# Default logging configuration.
# NOTE(review): the visible code reports via print(), not logging -- confirm
# whether this call is still needed.
logging.basicConfig()
def _is_edl_authorize_url(url):
    """Return True only if ``url`` is an https EDL ``/oauth/authorize`` URL."""
    # Imported locally because the file-level imports only bring in urlencode.
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url)
        host = parts.hostname or ''
    except ValueError:
        return False
    edl_host = 'urs.earthdata.nasa.gov'
    # Compare the parsed host and path rather than searching the whole string,
    # so the marker appearing in another site's path or query does not match.
    # Sub-domains of the EDL host are accepted.
    return (
        parts.scheme == 'https'
        and (host == edl_host or host.endswith('.' + edl_host))
        and parts.path.startswith('/oauth/authorize')
    )


def get_dl_url (url, traceback = None):
    """Get the download URL by recursively following redirects with auth.

    Args:
        url: the URL to request.
        traceback: list of ``{'url': ..., 'code': ...}`` hops taken so far;
            callers normally omit it. A non-empty list is appended to in place.

    Returns:
        ``(traceback, final_url)`` when a request finally succeeds, or
        ``(traceback, False)`` on any failure: more than 6 hops recorded, a
        401 that carries a Location header, a non-redirect HTTP error, a
        redirect without a Location header, a malformed URL, or a
        connection-level error.
    """
    traceback = traceback or []

    # Watch out for redirect loops
    if len(traceback) > 6:
        print(f"TOO MANY REDIRECTS: {traceback}")
        return traceback, False

    headers = {}
    # Only send Auth Creds to EDL
    if _is_edl_authorize_url(url):
        print(".... + Adding basic auth headers")
        headers['Authorization'] = f"Basic {get_creds()}"

    try:
        # Request() is built inside the try because it raises ValueError for a
        # malformed URL. The response body is not needed, so it is closed
        # right away instead of being left open.
        with urlopen(Request(url, headers=headers)):
            pass
    except HTTPError as err:
        location = err.headers.get('Location') if err.headers is not None else None

        if err.code == 401 and location:
            # Password failed or other unknown auth problem...
            print(f"Redirecting for auth: {location}")
            traceback.append({'url': url, 'code': err.code})
            return traceback, False

        # 400 is a client error, not a redirect, and a redirect without a
        # Location header cannot be followed.
        if 300 <= err.code < 400 and location:
            new_url = location
            # Check for self-redirects: anything that does not begin with a
            # scheme needs the current host added. A prefix test is used so an
            # https URL inside the query string does not count.
            if not new_url.lower().startswith(('http://', 'https://')):
                new_url = add_url_host(url, new_url)
            # Recursively Follow redirect
            print(f" .... Redirecting w/ {err.code} to {new_url}")
            traceback.append({'url': url, 'code': err.code})
            return get_dl_url(new_url, traceback)

        # Some other failure... 404?
        traceback.append({'url': url, 'code': err.code})
        print(f"Hit HTTPError.... {err}")
        return traceback, False
    except Exception as err: # pylint: disable=broad-except
        # DNS problem? Malformed URL?
        print(f"Could not hit {url}: {err}")
        return traceback, False

    # If we get here, we should have the final download URL
    return traceback, url
# Script body: look the scene up in CMR, resolve the authenticated download
# URL, then print every archive member whose name contains "manifest.safe".
scene_url = get_url_from_cmr('S1A_IW_SLC__1SDV_20191014T123412_20191014T123439_029455_0359CA_63D6')
traceback, dl_url = get_dl_url(scene_url)
if not dl_url:
    print(f"Could not download {scene_url}")
    # Stop here: dl_url is False, so there is nothing to hand to RemoteZip.
    raise SystemExit(1)
with RemoteZip(dl_url) as s1_zip:
    for zipped_file in s1_zip.infolist():
        if "manifest.safe" in zipped_file.filename:
            # Close the member handle once its bytes have been printed.
            with s1_zip.open(zipped_file.filename) as manifest:
                print(manifest.read())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment