Created
June 16, 2017 16:44
-
-
Save giohappy/9586e201dcd46d67b527d4452aeb5251 to your computer and use it in GitHub Desktop.
mapblender.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
from optparse import OptionParser | |
from collections import OrderedDict | |
from osgeo import gdal | |
import numpy as np | |
from PIL import Image | |
from matplotlib.colors import LinearSegmentedColormap | |
RED = "\033[1;31m" | |
GREEN = "\033[0;32m" | |
RESET = "\033[0;0m" | |
def rgb_to_hsv(rgb): | |
rgb = rgb.astype('float') | |
hsv = np.zeros_like(rgb) | |
hsv[..., 3:] = rgb[..., 3:] | |
r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2] | |
maxc = np.max(rgb[..., :3], axis=-1) | |
minc = np.min(rgb[..., :3], axis=-1) | |
hsv[..., 2] = maxc | |
mask = maxc != minc | |
hsv[mask, 1] = (maxc - minc)[mask] / maxc[mask] | |
rc = np.zeros_like(r) | |
gc = np.zeros_like(g) | |
bc = np.zeros_like(b) | |
rc[mask] = (maxc - r)[mask] / (maxc - minc)[mask] | |
gc[mask] = (maxc - g)[mask] / (maxc - minc)[mask] | |
bc[mask] = (maxc - b)[mask] / (maxc - minc)[mask] | |
hsv[..., 0] = np.select( | |
[r == maxc, g == maxc], [bc - gc, 2.0 + rc - bc], default=4.0 + gc - rc) | |
hsv[..., 0] = (hsv[..., 0] / 6.0) % 1.0 | |
return hsv | |
def hsv_to_rgb(hsv): | |
rgb = np.empty_like(hsv) | |
rgb[..., 3:] = hsv[..., 3:] | |
h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] | |
i = (h * 6.0).astype('uint8') | |
f = (h * 6.0) - i | |
p = v * (1.0 - s) | |
q = v * (1.0 - s * f) | |
t = v * (1.0 - s * (1.0 - f)) | |
i = i % 6 | |
conditions = [s == 0.0, i == 1, i == 2, i == 3, i == 4, i == 5] | |
rgb[..., 0] = np.select(conditions, [v, q, p, p, t, v], default=v) | |
rgb[..., 1] = np.select(conditions, [v, v, v, q, p, p], default=t) | |
rgb[..., 2] = np.select(conditions, [v, p, t, v, v, q], default=p) | |
return rgb.astype('uint8') | |
stretch = False | |
stretch_suf = "_stretch" if stretch else "" | |
invert = False | |
satclasses1 = OrderedDict([(0.05, 1.0), (0.20, 0.75), (0.50, 0.5), (1.0, 0.25)]) | |
satclasses2 = OrderedDict([(0.05, 1.0), (0.20, 0.70), (0.50, 0.40), (1.0, 0.10)]) | |
satclasses3 = OrderedDict([(0.05, 1.0), (0.20, 0.75), (0.50, 0.5), (1.0, 0.35)]) | |
satclasses = satclasses3 | |
def run(options): | |
raster_max = options.raster_max | |
raster_median = options.raster_median | |
output = options.output | |
debug = options.debug | |
def valuescalculator(minval, maxval, stretch=True, rmin=0, rmax=1, invert=False, classes=None): | |
if classes is not None: | |
maxclass = classes.keys()[-1] | |
if stretch: | |
def normalize(x): | |
return ((x - minval) / (maxval - minval)) * (rmax - rmin) + rmin | |
else: | |
def normalize(x): | |
return ((rmax - rmin) * x) + rmin | |
if invert: | |
def classify(out): | |
return (rmax - out) + rmin | |
elif classes: | |
def classify(out): | |
try: | |
if out < classes.keys()[-1]: | |
return next(classes.values()[x[0]] for x in enumerate(classes.keys()) if x[1] > out) | |
else: | |
return classes.values()[-1] | |
except StopIteration, e: | |
return 1.0 | |
else: | |
def classify(out): | |
return out | |
def fnc(x): | |
x = x if x > 0 else 0.0 | |
out = normalize(x) | |
return classify(out) | |
return fnc | |
ds_max = gdal.Open(raster_max) | |
maxval_arr = np.array(ds_max.GetRasterBand(1).ReadAsArray()) | |
geotransform = ds_max.GetGeoTransform() | |
proj = ds_max.GetProjection() | |
del ds_max | |
maxval_arr_nonan = np.nan_to_num(maxval_arr) | |
maxval_arr_nonan[maxval_arr_nonan < 0] = 0 | |
ds_50p = gdal.Open(raster_median) | |
medianval_arr = np.array(ds_50p.GetRasterBand(1).ReadAsArray()) | |
del ds_50p | |
medianval_arr_nonan = np.nan_to_num(medianval_arr) | |
medianval_arr_nonan[medianval_arr_nonan < 0] = 0 | |
modulation_array = maxval_arr_nonan - medianval_arr_nonan | |
modulation_array = np.ma.array(modulation_array, mask=np.isnan(modulation_array)) | |
modulation_minval = np.nanmin(modulation_array) | |
modulation_maxval = np.nanmax(modulation_array) | |
max_arr_masked = np.ma.array(maxval_arr, mask=np.isnan(maxval_arr)) | |
max_maxval = np.nanmax(max_arr_masked) | |
max_minval = np.nanmin(max_arr_masked) | |
normmaxfnc = np.vectorize(valuescalculator(max_minval, max_maxval)) | |
max_arr_normalized = normmaxfnc(max_arr_masked) | |
cdict = {'red': ((0.0, 1.0, 1.0), | |
(1.0, 1.0, 1.0)), | |
'green': ((0.0, 1.0, 1.0), | |
(1.0, 0.0, 0.0)), | |
'blue': ((0.0, 0.0, 0.0), | |
(1.0, 0.0, 0.0)) | |
} | |
cmap = LinearSegmentedColormap('YellowRed', cdict) | |
cmap.set_bad('black') | |
max_img = cmap(max_arr_normalized, bytes=True) | |
if debug: | |
Image.fromarray(max_img, 'RGBA').save(os.path.join("./max.png")) | |
max_hsv = rgb_to_hsv(max_img) | |
rgbshape = (maxval_arr.shape[0], maxval_arr.shape[1], 3) | |
modulation_hsv = np.zeros(rgbshape, dtype="float32") | |
modulation_hsv[..., 0] = 180 / 360. | |
nomrfnc = np.vectorize(valuescalculator(modulation_minval, modulation_maxval, stretch, classes=satclasses)) | |
valuesarray = nomrfnc(modulation_array) | |
modulation_hsv[..., 1] = valuesarray | |
modulation_hsv[..., 2] = 255. | |
max_hsv[..., 1] = valuesarray | |
modulated_rgb = hsv_to_rgb(max_hsv) | |
nx = maxval_arr.shape[0] | |
ny = maxval_arr.shape[1] | |
if os.path.exists(output): | |
os.remove(output) | |
try: | |
dst_ds = gdal.GetDriverByName('GTiff').Create(output, ny, nx, 3, gdal.GDT_Byte) | |
dst_ds.SetGeoTransform(geotransform) | |
dst_ds.SetProjection(proj) | |
dst_ds.GetRasterBand(1).WriteArray(modulated_rgb[..., 0]) | |
dst_ds.GetRasterBand(2).WriteArray(modulated_rgb[..., 1]) | |
dst_ds.GetRasterBand(3).WriteArray(modulated_rgb[..., 2]) | |
dst_ds.FlushCache() | |
except: | |
sys.stdout.write(RED) | |
print "ERROR: Error writing output image {}".format(output) | |
if dst_ds: | |
dst_ds = None | |
exit(1) | |
if debug: | |
modulated_img = Image.fromarray(modulated_rgb) | |
modulated_img.save(output+'.png') | |
sys.stdout.write(GREEN) | |
print "Process completed. Output image written to {}".format(output) | |
fileoptions = ['raster_max','raster_median'] | |
parser = OptionParser() | |
parser.add_option("-M", "--max", dest="raster_max", | |
help="path to max raster FILE", metavar="FILE") | |
parser.add_option("-m", "--median", dest="raster_median", | |
help="path to median raster FILE", metavar="FILE") | |
parser.add_option("-o", "--output", dest="output", | |
help="path to output raster FILE (Geotiff)", metavar="FILE", default="./output.tiff") | |
parser.add_option("-d", "--debug", dest="debug", action="store_true", | |
help="write PNG images for visual inspection", metavar="FILE", default=False) | |
if __name__ == '__main__': | |
(options, args) = parser.parse_args() | |
error = False | |
errors = [] | |
if not options.raster_max: | |
errors.append("MAX raster file required (-M)") | |
error = True | |
if not options.raster_median: | |
errors.append("MEDIAN raster file required (-m)") | |
error = True | |
if not error: | |
options_dict = vars(options) | |
for fileoption in fileoptions: | |
if not os.path.exists(options_dict[fileoption]): | |
errors.append("{} file doesn't exist".format(options_dict[fileoption])) | |
error = True | |
if error: | |
sys.stdout.write(RED) | |
print "ERROR:" | |
for e in errors: | |
print e | |
sys.stdout.write(RESET) | |
print parser.print_help() | |
exit(1) | |
run(options) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment