Created
May 17, 2023 09:58
-
-
Save valgur/bab24c5dabf898d411c6d7a9a9e011d8 to your computer and use it in GitHub Desktop.
Data provider for elevation data from Estonian Land Board
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from pathlib import Path | |
import numpy as np | |
import rasterio | |
import requests | |
from scipy.interpolate import RectBivariateSpline | |
from tqdm.auto import tqdm | |
class ElevationReference: | |
def __init__(self, dem_dir, bounds, resolution=1): | |
""" | |
Provides elevation values for Estonia based on Estonian Land Board's digital elevation model (DEM). | |
The tiles are downloaded automatically. | |
Also converts from geoid (aka mean sea level) to WGS84 ellipsoid reference, if requested. | |
All coordinates are in L-EST97, with (x, y) corresponding to easting, northing. | |
Parameters | |
---------- | |
dem_dir: Directory where the DEM tiles will be downloaded to. | |
bounds: (xmin, ymin, xmax, ymax) bounds of the area to be loaded. | |
resolution: Resolution of the DEM. Either 1 (default), 5 or 10 m. | |
""" | |
self.dem_dir = Path(dem_dir) | |
self.resolution = int(resolution) | |
if self.resolution not in (1, 5, 10): | |
raise ValueError("DEM resolution must be either 1, 5 or 10") | |
if self.resolution <= 5: | |
self.tile_size = 5000 | |
self.to_grid = grid10k | |
self.from_grid = decode_grid10k | |
else: | |
self.tile_size = 10000 | |
self.to_grid = grid20k | |
self.from_grid = decode_grid20k | |
xmin, ymin, xmax, ymax = bounds | |
xmin = int(self.resolution * np.floor(xmin / self.resolution)) | |
ymin = int(self.resolution * np.floor(ymin / self.resolution)) | |
xmax = int(self.resolution * np.ceil(xmax / self.resolution)) | |
ymax = int(self.resolution * np.ceil(ymax / self.resolution)) | |
assert xmax > xmin | |
assert ymax > ymin | |
self.bounds = xmin, ymin, xmax, ymax | |
self.geoid_interpolator = None | |
self.dem = None | |
self.dem_interpolator = None | |
self._create_geoid_interpolator() | |
self._load_dem_dataset() | |
self._create_dem_interpolator() | |
def get(self, lest_x, lest_y, reference='WGS84'): | |
z = self.dem_interpolator(lest_x, lest_y) | |
if reference == 'WGS84': | |
return z + self.geoid_height(lest_x, lest_y) | |
elif reference == 'geoid': | |
return z | |
else: | |
raise ValueError('Elevation reference must be either "WGS84" or "geoid"') | |
def geoid_height(self, lest_x, lest_y): | |
"""Interpolated EST-GEOID2017 values. Takes L-EST97 (easting, northing) as input. | |
The geoid value represents meters from GRS80/WGS84 ellipsoid, up is positive. | |
Grid spacing is 5 km (1:10000 grid centroids). | |
""" | |
return self.geoid_interpolator(lest_x, lest_y) | |
def _load_dem_dataset(self): | |
xmin, ymin, xmax, ymax = self.bounds | |
rows = (ymax - ymin) // self.resolution | |
cols = (xmax - xmin) // self.resolution | |
self.dem = np.zeros((rows, cols), dtype=np.float32) | |
for idx in tqdm(self._bounds_to_tile_ids(), desc='Loading DEM tiles', leave=False): | |
x0, y0 = self.from_grid(idx) | |
x_idx0 = max(0, (x0 - xmin) // self.resolution) | |
y_idx0 = max(0, (y0 - ymin) // self.resolution) | |
xbegin = (max(x0, xmin) - x0) // self.resolution | |
ybegin = (max(y0, ymin) - y0) // self.resolution | |
xend = (min(x0 + self.tile_size, xmax) - x0) // self.resolution | |
yend = (min(y0 + self.tile_size, ymax) - y0) // self.resolution | |
chunk = self._load_tile(idx)[::-1][ybegin:yend, xbegin:xend] | |
self.dem[y_idx0:, x_idx0:][:yend - ybegin, :xend - xbegin] = chunk | |
def _create_geoid_interpolator(self): | |
script_dir = os.path.dirname(__file__) | |
data = np.load(os.path.join(script_dir, "geoid.npz")) | |
interp = RectBivariateSpline(data["gridx"], data["gridy"], -data["geoid"].T, kx=1, ky=1) | |
self.geoid_interpolator = interp.ev | |
def _create_dem_interpolator(self): | |
xmin, ymin, xmax, ymax = self.bounds | |
gridx = np.arange(xmin + self.resolution / 2, xmax, self.resolution) | |
gridy = np.arange(ymin + self.resolution / 2, ymax, self.resolution) | |
interp = RectBivariateSpline(gridx, gridy, self.dem.T, kx=1, ky=1) | |
self.dem_interpolator = interp.ev | |
def _bounds_to_tile_ids(self): | |
xmin, ymin, xmax, ymax = self.bounds | |
x0, y0 = self.from_grid(self.to_grid(xmin, ymin)) | |
tiles = [] | |
for y in range(y0, ymax, self.tile_size): | |
for x in range(x0, xmax, self.tile_size): | |
tiles.append(self.to_grid(x, y)) | |
return tiles | |
def _load_tile(self, idx): | |
tile_path = self.dem_dir / self._get_tile_name(idx) | |
if not tile_path.exists(): | |
self.dem_dir.mkdir(parents=True, exist_ok=True) | |
download(self._get_download_url(idx), tile_path) | |
return rasterio.open(tile_path).read()[0] | |
def _get_tile_name(self, idx): | |
return f'{idx}_dem_{self.resolution}m.tif' | |
def _get_download_url(self, idx): | |
return ('https://geoportaal.maaamet.ee/index.php?plugin_act=otsing&page_id=614&dl=1&' | |
f'andmetyyp=dem_{self.resolution}m_geotiff&' | |
f'kaardiruut={idx}&f=' + self._get_tile_name(idx)) | |
def download(url, dst): | |
head = requests.head(url) | |
head.raise_for_status() | |
file_size = int(head.headers["Content-Length"]) | |
if os.path.exists(dst): | |
first_byte = os.path.getsize(dst) | |
else: | |
first_byte = 0 | |
if first_byte >= file_size: | |
return file_size | |
header = {"Range": "bytes=%s-%s" % (first_byte, file_size)} | |
pbar = tqdm( | |
total=file_size, initial=first_byte, | |
unit='B', unit_scale=True, desc=f'Downloading {Path(dst).name}', leave=False) | |
req = requests.get(url, headers=header, stream=True) | |
req.raise_for_status() | |
with open(dst, 'ab') as f: | |
for chunk in req.iter_content(chunk_size=1024 * 1024): | |
if chunk: | |
f.write(chunk) | |
pbar.update(len(chunk)) | |
pbar.close() | |
return file_size | |
def grid20k(x, y): | |
x = int(x) // 1000 - 200 | |
y = int(y) // 1000 - 5900 | |
idx = (y // 100) * 1000 + (y // 10 % 10) * 10 | |
idx += (x // 100) * 100 + (x // 10 % 10) | |
return idx | |
def grid10k(x, y): | |
idx = grid20k(x, y) * 10 | |
idx += (int(y) // 5000 % 2) * 2 | |
idx += (int(x) // 5000 % 2) + 1 | |
return idx | |
def grid2k(x, y): | |
return (int(y / 1000) - 6000) * 1000 + int(x / 1000) | |
def decode_grid20k(idx): | |
x = ((idx // 100) % 10) * 10 | |
x += idx % 10 | |
y = (idx // 1000) * 10 | |
y += (idx // 10) % 10 | |
return (x + 20) * 10000, (y + 590) * 10000 | |
def decode_grid10k(idx): | |
x, y = decode_grid20k(idx // 10) | |
sub_idx = idx % 10 - 1 | |
x += (sub_idx % 2) * 5000 | |
y += (sub_idx // 2) * 5000 | |
return x, y | |
def decode_grid2k(idx): | |
return idx % 1000 * 1000, (idx // 1000 + 6000) * 1000 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment