Created
September 26, 2017 16:39
-
-
Save francbartoli/450665cd89ae16b004ca8f141083b4a2 to your computer and use it in GitHub Desktop.
Create NetCDF utility
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import datetime | |
| import json | |
| import logging | |
| import os | |
| import shutil | |
| from urlparse import urljoin | |
| from zipfile import ZIP_DEFLATED, ZipFile | |
| import requests | |
| from osgeo import gdal, osr | |
| from geonode.geoserver.helpers import ogc_server_settings | |
| from geonode.geoserver.management.commands.updatelayers import \ | |
| Command as UpdateLayersCommand | |
| from geonode.layers.models import Layer, Style | |
| from geoserver.catalog import Catalog | |
| from rkh import settings | |
| RKH_NETCDF_TMPDIR = getattr(settings, 'RKH_NETCDF_TMPDIR', 'tmp') | |
| RKH_GDAL_FORMAT = getattr(settings, 'RKH_GDAL_FORMAT', 'NETCDF') | |
| DEFAULT_WORKSPACE = getattr(settings, 'DEFAULT_WORKSPACE', 'geonode') | |
| RKH_GS_DATA_DIR = getattr(settings, 'RKH_GS_DATA_DIR') | |
| tmp_dirname = RKH_NETCDF_TMPDIR | |
| gdal_format = RKH_GDAL_FORMAT | |
| gdal_nc_prefix = gdal_format + ':"' | |
| SRSWGS84 = 'GEOGCS["WGS 84",DATUM["WGS_1984",SPHEROID\ | |
| ["WGS 84",6378137,298.257223563,AUTHORITY["EPSG","7030"]],\ | |
| AUTHORITY["EPSG","6326"]],PRIMEM["Greenwich",0,AUTHORITY["EPSG","8901"]],\ | |
| UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]],\ | |
| AUTHORITY["EPSG","4326"]]' | |
| class NetCDFProcessor(object): | |
| """Base class to handle RKH netcdf ingestion into GeoNode/GeoServer.""" | |
| def __init__(self, netcdf=None, tmp_dirname=None, **kwargs): | |
| super(NetCDFProcessor, self).__init__() | |
| if netcdf: | |
| self.netcdf = netcdf | |
| if tmp_dirname: | |
| self.tmp_dirname = tmp_dirname | |
| self.gdal_nc_prefix = gdal_nc_prefix | |
| self.basedir = os.path.dirname(self.netcdf) | |
| self.destdir = os.path.join(self.basedir, self.tmp_dirname) | |
| logging.debug("Destination temporary directory is \ | |
| \n===>%s" % self.destdir | |
| ) | |
| if not os.path.exists(self.destdir): | |
| try: | |
| os.makedirs(self.destdir) | |
| except Exception as e: | |
| raise e.message | |
| else: | |
| pass | |
| def translate(self, filename, srsDest): | |
| """GDAL translate of a netcdf from RKH.""" | |
| projwin = [] | |
| if not self.netcdf: | |
| logging.error("Input netcdf file %s doesn't exist!" % self.netcdf) | |
| # with open(os.path.join(self.destdir, filename), 'wb') as out_file: | |
| reportfile = open('/tmp/failed.txt', "a") | |
| try: | |
| out_file = os.path.join(self.destdir, filename) | |
| infile = gdal.Open(self.netcdf) | |
| spatial_ref = infile.GetProjectionRef() | |
| numbands = infile.RasterCount | |
| # Check if SRS has been defined as metadata and use that | |
| try: | |
| if 'SRS' in infile.GetMetadata_Dict('GEOLOCATION'): | |
| srsDest = _wkt2epsg( | |
| infile.GetMetadata_Dict('GEOLOCATION')['SRS'] | |
| ) | |
| else: | |
| srsDest = _wkt2epsg(SRSWGS84) | |
| except Exception as e: | |
| logging.error(e) | |
| pass | |
| if numbands == 0: | |
| logging.error("The dataset has %s bands!", numbands) | |
| reportfile.write(self.netcdf) | |
| reportfile.write("\n") | |
| reportfile.close() | |
| return None | |
| if not spatial_ref: | |
| lon = gdal.Open(self.gdal_nc_prefix + self.netcdf + '":lon') | |
| longs = lon.GetRasterBand(1).ReadAsArray() | |
| lat = gdal.Open(self.gdal_nc_prefix + self.netcdf + '":lat') | |
| lats = lat.GetRasterBand(1).ReadAsArray() | |
| ul = (float(longs[0][0]), float(lats[0][0])) | |
| lr = ( | |
| float(longs[len(longs) - 1][len(longs[0]) - 1]), | |
| float(lats[len(longs) - 1][len(longs[0]) - 1]) | |
| ) | |
| projwin.append(ul[0]) | |
| projwin.append(ul[1]) | |
| projwin.append(lr[0]) | |
| projwin.append(lr[1]) | |
| ds = gdal.Translate(out_file, | |
| self.netcdf, | |
| outputSRS=srsDest, # EPSG:4326 | |
| outputBounds=projwin, | |
| format=gdal_format) | |
| if os.path.exists(out_file): | |
| outfile = gdal.Open(out_file) | |
| if outfile.GetProjectionRef(): | |
| logging.info("Processed NETCDF\n===>%s\nhas now the\ | |
| following CRS\n===>%s", | |
| (out_file, | |
| outfile.GetProjectionRef() | |
| ) | |
| ) | |
| else: | |
| logging.error("Processed NETCDF\n===>%s\nhas failed!!!", | |
| out_file) | |
| ds = None | |
| infile = None | |
| return out_file | |
| else: | |
| return infile | |
| except RuntimeError as e: | |
| raise e.message | |
| def _wkt2epsg(wkt, | |
| epsg='/usr/local/share/proj/epsg', | |
| forceProj4=False): | |
| r"""Transform a WKT string to an EPSG code. | |
| Function from Tom Kradilis answer on StackExchange. | |
| Inherit this functions | |
| https://github.com/ecometrica/gdal2mbtiles/blob/master/gdal2mbtiles/gdal.py | |
| might work better. | |
| Arguments | |
| --------- | |
| wkt: WKT definition | |
| epsg: the proj.4 epsg file\ | |
| (defaults to '/usr/local/share/proj/epsg') | |
| forceProj4: whether to perform brute force proj4\ | |
| epsg file check (last resort) | |
| Returns: EPSG code | |
| """ | |
| code = None | |
| p_in = osr.SpatialReference() | |
| s = p_in.ImportFromWkt(wkt) | |
| if s == 5: # invalid WKT | |
| return None | |
| if p_in.IsLocal() == 1: # this is a local definition | |
| return p_in.ExportToWkt() | |
| if p_in.IsGeographic() == 1: # this is a geographic srs | |
| cstype = 'GEOGCS' | |
| else: # this is a projected srs | |
| cstype = 'PROJCS' | |
| an = p_in.GetAuthorityName(cstype) | |
| ac = p_in.GetAuthorityCode(cstype) | |
| if an is not None and ac is not None: # return the EPSG code | |
| return '%s:%s' % \ | |
| (p_in.GetAuthorityName(cstype), | |
| p_in.GetAuthorityCode(cstype)) | |
| else: | |
| return code | |
| # no brute force from file, return None | |
| # try brute force approach by grokking | |
| # proj epsg definition file | |
| # p_out = p_in.ExportToProj4() | |
| # if p_out: | |
| # if forceProj4 is True: | |
| # return p_out | |
| # f = open(epsg) | |
| # for line in f: | |
| # if line.find(p_out) != -1: | |
| # m = re.search('<(\\d+)>', line) | |
| # if m: | |
| # code = m.group(1) | |
| # break | |
| # if code: # match | |
| # return 'EPSG:%s' % code | |
| # else: # no match | |
| # return None | |
| # else: | |
| # return None | |
| class NetCDFLoader(object): | |
| """ | |
| NetCDF utility class to create layers in GeoServer. | |
| Base class to handle netcdf retrieval and processing | |
| for import into GeoNode/GeoServer | |
| """ | |
| tmp_dir = RKH_NETCDF_TMPDIR | |
| gs_data_dir = RKH_GS_DATA_DIR | |
| base_url = ogc_server_settings.internal_rest + "/workspaces/" | |
| gs_url = base_url + "{}/coveragestores/{}/file.netcdf" | |
| gs_store_url = base_url + "{}/coveragestores/" | |
| gs_style_url = ogc_server_settings.internal_rest + "/styles/" | |
| def __init__(self, workspace=DEFAULT_WORKSPACE, tmp_dir=None, | |
| **kwargs): | |
| self.workspace = workspace | |
| if tmp_dir: | |
| self.tmp_dir = tmp_dir | |
| def create_netcdf_zip(self, nc_file, lyr_name=None): | |
| """ | |
| Create a zipfile with the dataset for a netcdf raster datastore. | |
| :param lyr_name: name of layer to create | |
| :param nc_file: full path + name of netcdf dataset | |
| :return: full path+name of created zip file | |
| """ | |
| if not lyr_name: | |
| lyr_name = os.path.splitext(os.path.basename(nc_file))[0] | |
| tmp_dir = os.path.join(os.path.dirname(nc_file), | |
| self.tmp_dir, | |
| lyr_name) | |
| print "create_netcdf_zip-tmp_dir===>\n{}".format(tmp_dir) | |
| if not os.path.exists(tmp_dir): | |
| os.makedirs(tmp_dir) | |
| zip_type = 'zip' | |
| zip_archive = os.path.join(tmp_dir, "{}.{}".format(lyr_name, zip_type)) | |
| try: | |
| with ZipFile(zip_archive, 'w') as zipFile: | |
| print "create_netcdf_zip-nc_file===>\n{}".format(nc_file) | |
| print "create_netcdf_zip-basename===>\n{}".format( | |
| os.path.basename(nc_file)) | |
| zipFile.write(nc_file, | |
| os.path.basename(nc_file), | |
| ZIP_DEFLATED) | |
| # os.remove(nc_file) | |
| print "create_netcdf_zip-zip_archive===>\n{}".format(zip_archive) | |
| return zip_archive | |
| except Exception as e: | |
| shutil.rmtree(tmp_dir) | |
| raise e | |
| def create_netcdf(self, nc_file, lyr_name=None): | |
| """ | |
| Create a netcdf datastore and layer. | |
| :param lyr_name: Name of netcdf layer/store to create | |
| :param nc_file: full path + name of netcdf dataset | |
| :return: coveragestore, layername | |
| """ | |
| if not lyr_name: | |
| lyr_name = os.path.splitext(os.path.basename(nc_file))[0] | |
| print "create_netcdf-nc_file===>\n{}".format(nc_file) | |
| ziploc = self.create_netcdf_zip(nc_file, lyr_name) | |
| print "create_netcdf-ziploc===>\n{}".format(ziploc) | |
| try: | |
| with open(ziploc, 'rb') as zipdata: | |
| _user, _password = ogc_server_settings.credentials | |
| gs_store_url = self.gs_store_url.format(self.workspace) | |
| coverage_store_name = '{}'.format(lyr_name) | |
| querystring = {"configure": "all"} | |
| nc_covstore_json = json.dumps({ | |
| 'coverageStore': { | |
| 'name': coverage_store_name, | |
| 'workspace': '{}'.format(self.workspace), | |
| 'enabled': True, | |
| 'type': 'NetCDF', | |
| 'url': 'file:/{}'.format( | |
| os.path.relpath(nc_file, RKH_GS_DATA_DIR)) | |
| } | |
| }) | |
| # create netcdf store | |
| res = requests.post(url=gs_store_url, | |
| data=nc_covstore_json, | |
| auth=(_user, _password), | |
| headers={ | |
| 'Content-Type': 'application/json', | |
| 'cache-control': "no-cache" | |
| }, | |
| params=querystring) | |
| res.raise_for_status() | |
| gs_url = self.gs_url.format(self.workspace, lyr_name) | |
| data = zipdata.read() | |
| # create netcdf layer in the above store | |
| res = requests.put(url=gs_url, | |
| data=data, | |
| auth=(_user, _password), | |
| headers={ | |
| 'Content-Type': 'application/zip', | |
| 'cache-control': "no-cache" | |
| }) | |
| res.raise_for_status() | |
| gs_url = gs_url.replace( | |
| 'file.netcdf', | |
| 'coverages.json') | |
| # get name of netcdf layer | |
| res = requests.get(url=gs_url, | |
| auth=(_user, _password), | |
| headers={ | |
| 'Content-Type': 'application/json' | |
| }) | |
| res.raise_for_status() | |
| layername = json.loads( | |
| res.text | |
| )["coverages"]["coverage"][0].get("name") | |
| return (lyr_name, layername) | |
| finally: | |
| if os.path.exists(ziploc): | |
| shutil.rmtree(os.path.dirname(ziploc)) | |
| def update_netcdf(self, coverage_store, lyr_name, new_name): | |
| """ | |
| Update a netcdf layer. | |
| :param coverage_store: Store name for the netcdf layer | |
| :param lyr_name: Name of netcdf layer to update | |
| :return: coverage_store, layername | |
| """ | |
| _user, _password = ogc_server_settings.credentials | |
| gs_url = self.gs_url.format(self.workspace, coverage_store) | |
| # update a created netcdf layer | |
| gs_url = gs_url.replace( | |
| 'file.netcdf', | |
| 'coverages/{}.json'.format(lyr_name)) | |
| nc_coverage_json = json.dumps({ | |
| "coverage": { | |
| "enabled": True, | |
| "name": "{}".format(new_name), | |
| "nativeName": "{}".format(new_name), | |
| "title": "{}".format(new_name), | |
| "nativeCRS": "EPSG:4326", | |
| "nativeCoverageName": "{}".format(new_name), | |
| "srs": "EPSG:4326" | |
| } | |
| }) | |
| # update a netcdf layer | |
| res = requests.put(url=gs_url, | |
| data=nc_coverage_json, | |
| auth=(_user, _password), | |
| headers={ | |
| 'Content-Type': 'application/json', | |
| 'cache-control': "no-cache" | |
| }) | |
| # if res.status_code != 201: | |
| # logger.error('Request status code was: %s' % req.status_code) | |
| # logger.error('Response was: %s' % req.text) | |
| # raise GeoNodeException("Layer could not be created in GeoServer") | |
| res.raise_for_status() | |
| gs_url = gs_url.replace( | |
| 'coverages/{}.json'.format(lyr_name), | |
| 'coverages/{}.json'.format(new_name)) | |
| # get name of netcdf layer | |
| res = requests.get(url=gs_url, | |
| auth=(_user, _password), | |
| headers={ | |
| 'Content-Type': 'application/json' | |
| }) | |
| res.raise_for_status() | |
| layername = json.loads( | |
| res.text | |
| )["coverage"]["name"] | |
| return (coverage_store, layername) | |
| def update_geonode(self, layer_name, title="", description="", | |
| category=None, bounds=None, store=None, | |
| extra_keywords=None): | |
| """ | |
| Update a layer and it's title in GeoNode. | |
| :param layer_name: Name of the layer | |
| :param title: Title for layer | |
| :param description: Description for layer | |
| :param category: Category for the layer | |
| :param bounds: Bounds for layer | |
| :param store: Store for layer | |
| :param extra_keywords: Additional keywords for layer | |
| """ | |
| # Update the layer in GeoNode | |
| ulc = UpdateLayersCommand() | |
| ulc.handle(verbosity=1, filter=layer_name, store=store, | |
| workspace=self.workspace) | |
| # update geoserver layer | |
| _user, _password = ogc_server_settings.credentials | |
| url = ogc_server_settings.internal_rest | |
| gscatalog = Catalog(url, _user, _password) | |
| lyr = Layer.objects.get(typename='geonode:{}'.format(layer_name)) | |
| res = gscatalog.get_resource(lyr.name) | |
| if title: | |
| lyr.title = title | |
| lyr.abstract = description | |
| if category: | |
| lyr.category = category | |
| lyr.save() | |
| if bounds: | |
| res.native_bbox = bounds | |
| gscatalog.save(res) | |
| if extra_keywords: | |
| assert isinstance(extra_keywords, list) | |
| # now = datetime.datetime.now().isoformat() | |
| # extra_keywords.append('datetime:{}'.format(now)) | |
| # Append extra keywords to the default ones | |
| keywords = _add_keywords(res.keywords, extra_keywords) | |
| res.keywords = keywords | |
| gscatalog.save(res) | |
| for keyword in keywords: | |
| lyr.keywords.add(keyword) | |
| lyr.save() | |
| def set_default_style(self, layer_name, sld_name, sld_content, | |
| create=True): | |
| """ | |
| Create a style and assign it as default to a layer. | |
| :param layer_name: the layer to assign the style to | |
| :param sld_name: the name to give the style | |
| :param sld_content: the actual XML content for the style | |
| :param create: create the style if true | |
| :return: None | |
| """ | |
| gs_url = self.gs_style_url | |
| _user, _password = ogc_server_settings.credentials | |
| if create: | |
| # Create the style | |
| s = "<style><name>{n}</name><filename>{n}.sld</filename></style>" | |
| data = s.format(n=sld_name) | |
| res = requests.post(url=gs_url, | |
| data=data, | |
| auth=(_user, _password), | |
| headers={'Content-Type': 'text/xml'}) | |
| res.raise_for_status() | |
| # Populate the style | |
| data = sld_content | |
| url = urljoin(gs_url, sld_name) | |
| logging.debug(url) | |
| res = requests.put(url=url, | |
| data=data, | |
| auth=(_user, _password), | |
| headers={ | |
| 'Content-Type': 'application/vnd.ogc.sld+xml' | |
| }) | |
| res.raise_for_status() | |
| # Assign to the layer | |
| # @TODO check if the style exists in geoserver | |
| layer_typename = "{}%3A{}".format(DEFAULT_WORKSPACE, layer_name) | |
| layer_style_json = json.dumps({ | |
| 'layer': { | |
| 'defaultStyle': { | |
| 'name': '{}'.format(sld_name) | |
| }, | |
| 'styles': { | |
| 'style': [ | |
| { | |
| 'name': '{}'.format(sld_name) | |
| } | |
| ] | |
| } | |
| } | |
| }) | |
| url = urljoin(gs_url.replace("styles", "layers"), layer_typename) | |
| logging.debug(url) | |
| res = requests.put( | |
| url=url, | |
| data=layer_style_json, | |
| auth=(_user, _password), | |
| headers={ | |
| 'Content-Type': 'application/json' | |
| }) | |
| res.raise_for_status() | |
| if res.status_code == requests.codes.ok: | |
| # Update geonode through updatelayers | |
| ulc = UpdateLayersCommand() | |
| ulc.handle(verbosity=1, | |
| filter=layer_name, | |
| store=None, | |
| workspace=self.workspace) | |
| # Assign style as default style of the layer | |
| try: | |
| lyr = Layer.objects.get(name=layer_name) | |
| style = Style.objects.get(name=sld_name) | |
| if not (lyr.default_style == style): | |
| lyr.default_style = style | |
| lyr.save() | |
| except Exception as e: | |
| raise e | |
| def _add_keywords(keyword_list, extra_keywords): | |
| """Adds extra_keywords list to the keyword_list.""" | |
| # check if datetime and category already exist in the | |
| # keyword list | |
| filtered_keywords = [k for k in keyword_list if not | |
| (k.startswith('category:') or | |
| k.startswith('datetime:') or | |
| k.startswith('layer_info'))] | |
| return extra_keywords + filtered_keywords |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment