Skip to content

Instantly share code, notes, and snippets.

@francbartoli
Created September 26, 2017 16:39
Show Gist options
  • Select an option

  • Save francbartoli/450665cd89ae16b004ca8f141083b4a2 to your computer and use it in GitHub Desktop.

Select an option

Save francbartoli/450665cd89ae16b004ca8f141083b4a2 to your computer and use it in GitHub Desktop.
create netcdf utility
import datetime
import json
import logging
import os
import shutil
from urlparse import urljoin
from zipfile import ZIP_DEFLATED, ZipFile
import requests
from osgeo import gdal, osr
from geonode.geoserver.helpers import ogc_server_settings
from geonode.geoserver.management.commands.updatelayers import \
Command as UpdateLayersCommand
from geonode.layers.models import Layer, Style
from geoserver.catalog import Catalog
from rkh import settings
# Name of the temporary working directory created next to source files.
RKH_NETCDF_TMPDIR = getattr(settings, 'RKH_NETCDF_TMPDIR', 'tmp')
# GDAL driver name used for translated outputs.
RKH_GDAL_FORMAT = getattr(settings, 'RKH_GDAL_FORMAT', 'NETCDF')
# GeoServer workspace used when none is given explicitly.
DEFAULT_WORKSPACE = getattr(settings, 'DEFAULT_WORKSPACE', 'geonode')
# GeoServer data directory; no default, so this is None when unset.
RKH_GS_DATA_DIR = getattr(settings, 'RKH_GS_DATA_DIR')
# Module-level aliases used by the classes below.
tmp_dirname = RKH_NETCDF_TMPDIR
gdal_format = RKH_GDAL_FORMAT
# Prefix for opening netcdf subdatasets, e.g. NETCDF:"<file>":lon
gdal_nc_prefix = gdal_format + ':"'
# WKT definition of WGS 84 (EPSG:4326), used as the fallback SRS when the
# dataset carries no GEOLOCATION metadata.  The backslashes continue the
# single string literal across lines.
SRSWGS84 = 'GEOGCS["WGS 84",DATUM["WGS_1984",SPHEROID\
["WGS 84",6378137,298.257223563,AUTHORITY["EPSG","7030"]],\
AUTHORITY["EPSG","6326"]],PRIMEM["Greenwich",0,AUTHORITY["EPSG","8901"]],\
UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]],\
AUTHORITY["EPSG","4326"]]'
class NetCDFProcessor(object):
    """Base class to handle RKH netcdf ingestion into GeoNode/GeoServer.

    Wraps a single source netcdf file and a temporary working directory
    (created next to the source file) into which GDAL-translated copies
    are written.
    """

    def __init__(self, netcdf=None, tmp_dirname=None, **kwargs):
        """Remember the source file and create the working directory.

        :param netcdf: full path to the source netcdf file
        :param tmp_dirname: name of the temporary sub-directory created
            next to the source file
        """
        super(NetCDFProcessor, self).__init__()
        if netcdf:
            self.netcdf = netcdf
        if tmp_dirname:
            self.tmp_dirname = tmp_dirname
        # Module-level prefix (e.g. 'NETCDF:"') used to open subdatasets.
        self.gdal_nc_prefix = gdal_nc_prefix
        self.basedir = os.path.dirname(self.netcdf)
        self.destdir = os.path.join(self.basedir, self.tmp_dirname)
        logging.debug("Destination temporary directory is \n===>%s",
                      self.destdir)
        if not os.path.exists(self.destdir):
            # was: try/except with ``raise e.message`` -- raising a string
            # is invalid; let the original OSError propagate instead
            os.makedirs(self.destdir)

    def translate(self, filename, srsDest):
        """GDAL translate of a netcdf from RKH.

        :param filename: name of the output file, created in self.destdir
        :param srsDest: fallback output SRS; overridden when the source
            carries its own GEOLOCATION SRS metadata
        :return: path of the translated file when the source had no
            projection, the opened source dataset when it already had one,
            or None for a dataset with zero bands
        """
        projwin = []
        if not self.netcdf:
            logging.error("Input netcdf file %s doesn't exist!" % self.netcdf)
        try:
            out_file = os.path.join(self.destdir, filename)
            infile = gdal.Open(self.netcdf)
            spatial_ref = infile.GetProjectionRef()
            numbands = infile.RasterCount
            # Check if SRS has been defined as metadata and use that
            try:
                if 'SRS' in infile.GetMetadata_Dict('GEOLOCATION'):
                    srsDest = _wkt2epsg(
                        infile.GetMetadata_Dict('GEOLOCATION')['SRS']
                    )
                else:
                    srsDest = _wkt2epsg(SRSWGS84)
            except Exception as e:
                # best effort: keep the caller-provided srsDest on failure
                logging.error(e)
            if numbands == 0:
                logging.error("The dataset has %s bands!", numbands)
                # Record the failed dataset.  ``with`` guarantees the report
                # file is closed (the original opened it on every call and
                # leaked the handle on all other code paths).
                with open('/tmp/failed.txt', "a") as reportfile:
                    reportfile.write(self.netcdf)
                    reportfile.write("\n")
                return None
            if not spatial_ref:
                # No CRS on the dataset: derive the bounding box from the
                # lon/lat subdatasets and assign srsDest while translating.
                lon = gdal.Open(self.gdal_nc_prefix + self.netcdf + '":lon')
                longs = lon.GetRasterBand(1).ReadAsArray()
                lat = gdal.Open(self.gdal_nc_prefix + self.netcdf + '":lat')
                lats = lat.GetRasterBand(1).ReadAsArray()
                # upper-left / lower-right corners of the coordinate arrays
                ul = (float(longs[0][0]), float(lats[0][0]))
                lr = (
                    float(longs[len(longs) - 1][len(longs[0]) - 1]),
                    float(lats[len(longs) - 1][len(longs[0]) - 1])
                )
                projwin.append(ul[0])
                projwin.append(ul[1])
                projwin.append(lr[0])
                projwin.append(lr[1])
                ds = gdal.Translate(out_file,
                                    self.netcdf,
                                    outputSRS=srsDest,  # EPSG:4326
                                    outputBounds=projwin,
                                    format=gdal_format)
                if os.path.exists(out_file):
                    outfile = gdal.Open(out_file)
                    if outfile.GetProjectionRef():
                        # was: a single tuple argument for two %s
                        # placeholders, which raises at logging time --
                        # pass the two values as separate lazy args
                        logging.info("Processed NETCDF\n===>%s\nhas now the"
                                     " following CRS\n===>%s",
                                     out_file,
                                     outfile.GetProjectionRef())
                    else:
                        logging.error("Processed NETCDF\n===>%s\nhas failed!!!",
                                      out_file)
                # release the GDAL dataset handles
                ds = None
                infile = None
                return out_file
            else:
                return infile
        except RuntimeError as e:
            # was: ``raise e.message`` (raises a string, itself an error);
            # re-raise the original exception with its traceback
            raise
def _wkt2epsg(wkt,
              epsg='/usr/local/share/proj/epsg',
              forceProj4=False):
    r"""Transform a WKT string to an EPSG code.

    Function from Tom Kralidis' answer on StackExchange.

    Arguments
    ---------
    wkt: WKT definition
    epsg: path of the proj.4 epsg file; kept for backward compatibility
        but unused -- the brute-force file scan was dead (commented-out)
        code and has been removed
    forceProj4: kept for backward compatibility; unused

    Returns: an 'AUTHORITY:CODE' string (e.g. 'EPSG:4326'), the WKT
    itself for a local coordinate system, or None when the WKT is
    invalid or carries no authority.
    """
    p_in = osr.SpatialReference()
    # 5 == OGRERR_CORRUPT_DATA: the WKT could not be parsed
    if p_in.ImportFromWkt(wkt) == 5:
        return None
    if p_in.IsLocal() == 1:
        # local definition: nothing better to return than the WKT itself
        return p_in.ExportToWkt()
    # geographic vs projected determines which node holds the authority
    cstype = 'GEOGCS' if p_in.IsGeographic() == 1 else 'PROJCS'
    authority_name = p_in.GetAuthorityName(cstype)
    authority_code = p_in.GetAuthorityCode(cstype)
    if authority_name is not None and authority_code is not None:
        # was: a second, redundant pair of GetAuthorityName/Code calls
        return '%s:%s' % (authority_name, authority_code)
    # no authority present in the WKT
    return None
class NetCDFLoader(object):
    """
    NetCDF utility class to create layers in GeoServer.

    Base class to handle netcdf retrieval and processing
    for import into GeoNode/GeoServer.
    """

    tmp_dir = RKH_NETCDF_TMPDIR
    gs_data_dir = RKH_GS_DATA_DIR
    # GeoServer REST endpoints, evaluated once at class-definition time.
    base_url = ogc_server_settings.internal_rest + "/workspaces/"
    gs_url = base_url + "{}/coveragestores/{}/file.netcdf"
    gs_store_url = base_url + "{}/coveragestores/"
    gs_style_url = ogc_server_settings.internal_rest + "/styles/"

    def __init__(self, workspace=DEFAULT_WORKSPACE, tmp_dir=None,
                 **kwargs):
        """
        :param workspace: GeoServer workspace to create stores/layers in
        :param tmp_dir: optional override of the class-level temp dir name
        """
        self.workspace = workspace
        if tmp_dir:
            self.tmp_dir = tmp_dir

    def create_netcdf_zip(self, nc_file, lyr_name=None):
        """
        Create a zipfile with the dataset for a netcdf raster datastore.

        :param lyr_name: name of layer to create (defaults to the
            nc_file basename without extension)
        :param nc_file: full path + name of netcdf dataset
        :return: full path+name of created zip file
        """
        if not lyr_name:
            lyr_name = os.path.splitext(os.path.basename(nc_file))[0]
        tmp_dir = os.path.join(os.path.dirname(nc_file),
                               self.tmp_dir,
                               lyr_name)
        # was: Python 2 ``print`` statements -- use logging like the rest
        # of this module
        logging.debug("create_netcdf_zip-tmp_dir===>\n%s", tmp_dir)
        if not os.path.exists(tmp_dir):
            os.makedirs(tmp_dir)
        zip_type = 'zip'
        zip_archive = os.path.join(tmp_dir, "{}.{}".format(lyr_name, zip_type))
        try:
            with ZipFile(zip_archive, 'w') as zip_file:
                logging.debug("create_netcdf_zip-nc_file===>\n%s", nc_file)
                logging.debug("create_netcdf_zip-basename===>\n%s",
                              os.path.basename(nc_file))
                zip_file.write(nc_file,
                               os.path.basename(nc_file),
                               ZIP_DEFLATED)
            logging.debug("create_netcdf_zip-zip_archive===>\n%s",
                          zip_archive)
            return zip_archive
        except Exception:
            # drop the partial archive directory before propagating;
            # bare raise preserves the original traceback (was: raise e)
            shutil.rmtree(tmp_dir)
            raise

    def create_netcdf(self, nc_file, lyr_name=None):
        """
        Create a netcdf datastore and layer.

        :param lyr_name: Name of netcdf layer/store to create
        :param nc_file: full path + name of netcdf dataset
        :return: tuple (coveragestore, layername)
        """
        if not lyr_name:
            lyr_name = os.path.splitext(os.path.basename(nc_file))[0]
        logging.debug("create_netcdf-nc_file===>\n%s", nc_file)
        ziploc = self.create_netcdf_zip(nc_file, lyr_name)
        logging.debug("create_netcdf-ziploc===>\n%s", ziploc)
        try:
            with open(ziploc, 'rb') as zipdata:
                _user, _password = ogc_server_settings.credentials
                gs_store_url = self.gs_store_url.format(self.workspace)
                coverage_store_name = '{}'.format(lyr_name)
                querystring = {"configure": "all"}
                nc_covstore_json = json.dumps({
                    'coverageStore': {
                        'name': coverage_store_name,
                        'workspace': '{}'.format(self.workspace),
                        'enabled': True,
                        'type': 'NetCDF',
                        # path relative to the GeoServer data directory
                        'url': 'file:/{}'.format(
                            os.path.relpath(nc_file, RKH_GS_DATA_DIR))
                    }
                })
                # create netcdf store
                res = requests.post(url=gs_store_url,
                                    data=nc_covstore_json,
                                    auth=(_user, _password),
                                    headers={
                                        'Content-Type': 'application/json',
                                        'cache-control': "no-cache"
                                    },
                                    params=querystring)
                res.raise_for_status()
                gs_url = self.gs_url.format(self.workspace, lyr_name)
                data = zipdata.read()
                # create netcdf layer in the above store
                res = requests.put(url=gs_url,
                                   data=data,
                                   auth=(_user, _password),
                                   headers={
                                       'Content-Type': 'application/zip',
                                       'cache-control': "no-cache"
                                   })
                res.raise_for_status()
                gs_url = gs_url.replace(
                    'file.netcdf',
                    'coverages.json')
                # get name of netcdf layer
                res = requests.get(url=gs_url,
                                   auth=(_user, _password),
                                   headers={
                                       'Content-Type': 'application/json'
                                   })
                res.raise_for_status()
                layername = json.loads(
                    res.text
                )["coverages"]["coverage"][0].get("name")
                return (lyr_name, layername)
        finally:
            # always drop the temporary zip directory, success or failure
            if os.path.exists(ziploc):
                shutil.rmtree(os.path.dirname(ziploc))

    def update_netcdf(self, coverage_store, lyr_name, new_name):
        """
        Update a netcdf layer.

        :param coverage_store: Store name for the netcdf layer
        :param lyr_name: Name of netcdf layer to update
        :param new_name: new name/title/nativeName for the coverage
        :return: tuple (coverage_store, layername)
        """
        _user, _password = ogc_server_settings.credentials
        gs_url = self.gs_url.format(self.workspace, coverage_store)
        # point at the coverage resource rather than the store upload URL
        gs_url = gs_url.replace(
            'file.netcdf',
            'coverages/{}.json'.format(lyr_name))
        nc_coverage_json = json.dumps({
            "coverage": {
                "enabled": True,
                "name": "{}".format(new_name),
                "nativeName": "{}".format(new_name),
                "title": "{}".format(new_name),
                "nativeCRS": "EPSG:4326",
                "nativeCoverageName": "{}".format(new_name),
                "srs": "EPSG:4326"
            }
        })
        # update a netcdf layer
        res = requests.put(url=gs_url,
                           data=nc_coverage_json,
                           auth=(_user, _password),
                           headers={
                               'Content-Type': 'application/json',
                               'cache-control': "no-cache"
                           })
        res.raise_for_status()
        gs_url = gs_url.replace(
            'coverages/{}.json'.format(lyr_name),
            'coverages/{}.json'.format(new_name))
        # get name of netcdf layer
        res = requests.get(url=gs_url,
                           auth=(_user, _password),
                           headers={
                               'Content-Type': 'application/json'
                           })
        res.raise_for_status()
        layername = json.loads(
            res.text
        )["coverage"]["name"]
        return (coverage_store, layername)

    def update_geonode(self, layer_name, title="", description="",
                       category=None, bounds=None, store=None,
                       extra_keywords=None):
        """
        Update a layer and its title in GeoNode.

        :param layer_name: Name of the layer
        :param title: Title for layer
        :param description: Description for layer
        :param category: Category for the layer
        :param bounds: Bounds for layer
        :param store: Store for layer
        :param extra_keywords: Additional keywords for layer (list)
        """
        # Update the layer in GeoNode via the updatelayers command
        ulc = UpdateLayersCommand()
        ulc.handle(verbosity=1, filter=layer_name, store=store,
                   workspace=self.workspace)
        # update geoserver layer
        _user, _password = ogc_server_settings.credentials
        url = ogc_server_settings.internal_rest
        gscatalog = Catalog(url, _user, _password)
        lyr = Layer.objects.get(typename='geonode:{}'.format(layer_name))
        res = gscatalog.get_resource(lyr.name)
        if title:
            lyr.title = title
            lyr.abstract = description
            if category:
                lyr.category = category
            lyr.save()
        if bounds:
            res.native_bbox = bounds
            gscatalog.save(res)
        if extra_keywords:
            assert isinstance(extra_keywords, list)
            # Append extra keywords to the default ones, filtering out
            # previously-managed ones (category:/datetime:/layer_info)
            keywords = _add_keywords(res.keywords, extra_keywords)
            res.keywords = keywords
            gscatalog.save(res)
            for keyword in keywords:
                lyr.keywords.add(keyword)
            lyr.save()

    def set_default_style(self, layer_name, sld_name, sld_content,
                          create=True):
        """
        Create a style and assign it as default to a layer.

        :param layer_name: the layer to assign the style to
        :param sld_name: the name to give the style
        :param sld_content: the actual XML content for the style
        :param create: create the style if true
        :return: None
        """
        gs_url = self.gs_style_url
        _user, _password = ogc_server_settings.credentials
        if create:
            # Create the style
            s = "<style><name>{n}</name><filename>{n}.sld</filename></style>"
            data = s.format(n=sld_name)
            res = requests.post(url=gs_url,
                                data=data,
                                auth=(_user, _password),
                                headers={'Content-Type': 'text/xml'})
            res.raise_for_status()
        # Populate the style
        data = sld_content
        url = urljoin(gs_url, sld_name)
        logging.debug(url)
        res = requests.put(url=url,
                           data=data,
                           auth=(_user, _password),
                           headers={
                               'Content-Type': 'application/vnd.ogc.sld+xml'
                           })
        res.raise_for_status()
        # Assign to the layer
        # @TODO check if the style exists in geoserver
        layer_typename = "{}%3A{}".format(DEFAULT_WORKSPACE, layer_name)
        layer_style_json = json.dumps({
            'layer': {
                'defaultStyle': {
                    'name': '{}'.format(sld_name)
                },
                'styles': {
                    'style': [
                        {
                            'name': '{}'.format(sld_name)
                        }
                    ]
                }
            }
        })
        url = urljoin(gs_url.replace("styles", "layers"), layer_typename)
        logging.debug(url)
        res = requests.put(
            url=url,
            data=layer_style_json,
            auth=(_user, _password),
            headers={
                'Content-Type': 'application/json'
            })
        res.raise_for_status()
        if res.status_code == requests.codes.ok:
            # Update geonode through updatelayers
            ulc = UpdateLayersCommand()
            ulc.handle(verbosity=1,
                       filter=layer_name,
                       store=None,
                       workspace=self.workspace)
            # Assign style as default style of the layer; was a
            # ``except Exception as e: raise e`` wrapper, which only
            # discarded the traceback -- let exceptions propagate
            lyr = Layer.objects.get(name=layer_name)
            style = Style.objects.get(name=sld_name)
            if not (lyr.default_style == style):
                lyr.default_style = style
                lyr.save()
def _add_keywords(keyword_list, extra_keywords):
"""Adds extra_keywords list to the keyword_list."""
# check if datetime and category already exist in the
# keyword list
filtered_keywords = [k for k in keyword_list if not
(k.startswith('category:') or
k.startswith('datetime:') or
k.startswith('layer_info'))]
return extra_keywords + filtered_keywords
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment