Skip to content

Instantly share code, notes, and snippets.

@robintw
Created May 9, 2016 20:11
Show Gist options
  • Save robintw/68f7ceec29fc35849ccac53c9325bd2e to your computer and use it in GitHub Desktop.
Save robintw/68f7ceec29fc35849ccac53c9325bd2e to your computer and use it in GitHub Desktop.
import sys
sys.path.append(r"C:\Dropbox\Dropbox\_PhD\_Code\PythonLibs")
# sys.path.append(r"/Users/robin/Dropbox/_PhD/_Code/PythonLibs")
from LandsatUtils import parse_metadata
from Py6S import *
import numpy as np
import dateutil
from VanHOzone import get_ozone_conc
from scipy.interpolate import interp1d
from functools import wraps
from itertools import product
import pandas as pd
from scipy.interpolate import LinearNDInterpolator
import cPickle
import os
import logging
import gdal
import dill
class AtmosCorrLUT:
cache = {}
def __init__(self, metadata_fname, lut_directory, aero_type, PWC, scale=1):
self.lut_directory = lut_directory
self.metadata_filename = metadata_fname
self.metadata = parse_metadata(metadata_fname)
self.scale = scale
self.aero_type = aero_type
self.PWC = PWC
def _process_band(self, s, radiance, wavelength):
s.wavelength = Wavelength(wavelength)
s.atmos_corr = AtmosCorr.AtmosCorrLambertianFromRadiance(radiance)
s.run()
return s.outputs.atmos_corrected_reflectance_lambertian
def save_lut(self, filename):
with open(filename, 'wb') as f:
cPickle.dump(self.luts, f)
def create_lut(self):
"""Tries to load a LUT from the specified LUT_directory
and if it can't load one, then creates it, and saves it"""
filename = "%s_%s_%.2f.lut" % (self.metadata['METADATA_FILE_INFO']['LANDSAT_SCENE_ID'].replace('"', ''), self.aero_type, self.PWC)
path = os.path.join(self.lut_directory, filename)
if os.path.exists(path):
logging.info('LUT already exists, no need to re-create')
return path
else:
logging.info('LUT not found in LUT Directory - creating and saving')
self.make_lut()
self.save_lut(path)
return path
def make_lut(self):
s = SixS()
str_timestamp = self.metadata['PRODUCT_METADATA']['DATE_ACQUIRED'] + " " + self.metadata['PRODUCT_METADATA']['SCENE_CENTER_TIME']
timestamp = dateutil.parser.parse(str_timestamp)
logging.debug(timestamp)
lat = float(self.metadata['PRODUCT_METADATA']['CORNER_UL_LAT_PRODUCT'])
lon = float(self.metadata['PRODUCT_METADATA']['CORNER_UL_LON_PRODUCT'])
logging.debug(lat, lon)
ozone = get_ozone_conc([lat], [lon], timestamp)
# Assume no water content - won't affect visible bands anyway
logging.debug(ozone)
s.atmos_profile = AtmosProfile.UserWaterAndOzone(self.PWC, ozone/1000)
s.geometry = Geometry.Landsat_TM()
s.geometry.latitude = lat
s.geometry.longitude = lon
# Set day, month and gmt_decimal_hour here from timestamp
s.geometry.day = timestamp.day
s.geometry.month = timestamp.month
s.geometry.gmt_decimal_hour = timestamp.hour + (timestamp.minute / 60.0)
s.altitudes.set_sensor_satellite_level()
# ASSUMPTION: Setting altitude to sea level, even though that isn't necessarily correct everywhere
s.altitudes.set_target_sea_level()
aerosol_type = eval('AeroProfile.%s' % self.aero_type)
s.aero_profile = AeroProfile.PredefinedType(aerosol_type)
#return s
#import pdb; pdb.set_trace()
aots = [0.001, 0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8,
0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 3.0]
luts = {}
luts['1'] = self.make_lut_for_wavelength(PredefinedWavelengths.LANDSAT_ETM_B1, s, aots)
luts['2'] = self.make_lut_for_wavelength(PredefinedWavelengths.LANDSAT_ETM_B2, s, aots)
luts['3'] = self.make_lut_for_wavelength(PredefinedWavelengths.LANDSAT_ETM_B3, s, aots)
luts['4'] = self.make_lut_for_wavelength(PredefinedWavelengths.LANDSAT_ETM_B4, s, aots)
luts['5'] = self.make_lut_for_wavelength(PredefinedWavelengths.LANDSAT_ETM_B5, s, aots)
self.luts = pd.Panel(luts)
logging.info("Successfully created LUT")
def make_lut_for_wavelength(self, wv, s, aots):
df = pd.DataFrame({'aot': aots})
df['xa'] = np.nan
df['xb'] = np.nan
df['xc'] = np.nan
s.wavelength = Wavelength(wv)
for i in df.index:
row = df.ix[i]
s.aot550 = row['aot']
s.atmos_corr = AtmosCorr.AtmosCorrLambertianFromRadiance(100)
s.run()
row['xa'] = s.outputs.coef_xa
row['xb'] = s.outputs.coef_xb
row['xc'] = s.outputs.coef_xc
print row['aot'], row['xa'], row['xb'], row['xc']
return df
# f = interp1d(np.array(df.aot), np.array(df)[:, -2:].T)
# return f
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment