Skip to content

Instantly share code, notes, and snippets.

@valgur
Created May 17, 2023 09:58
Show Gist options
  • Save valgur/bab24c5dabf898d411c6d7a9a9e011d8 to your computer and use it in GitHub Desktop.
Save valgur/bab24c5dabf898d411c6d7a9a9e011d8 to your computer and use it in GitHub Desktop.
Data provider for elevation data from Estonian Land Board
import os
from pathlib import Path
import numpy as np
import rasterio
import requests
from scipy.interpolate import RectBivariateSpline
from tqdm.auto import tqdm
class ElevationReference:
def __init__(self, dem_dir, bounds, resolution=1):
"""
Provides elevation values for Estonia based on Estonian Land Board's digital elevation model (DEM).
The tiles are downloaded automatically.
Also converts from geoid (aka mean sea level) to WGS84 ellipsoid reference, if requested.
All coordinates are in L-EST97, with (x, y) corresponding to easting, northing.
Parameters
----------
dem_dir: Directory where the DEM tiles will be downloaded to.
bounds: (xmin, ymin, xmax, ymax) bounds of the area to be loaded.
resolution: Resolution of the DEM. Either 1 (default), 5 or 10 m.
"""
self.dem_dir = Path(dem_dir)
self.resolution = int(resolution)
if self.resolution not in (1, 5, 10):
raise ValueError("DEM resolution must be either 1, 5 or 10")
if self.resolution <= 5:
self.tile_size = 5000
self.to_grid = grid10k
self.from_grid = decode_grid10k
else:
self.tile_size = 10000
self.to_grid = grid20k
self.from_grid = decode_grid20k
xmin, ymin, xmax, ymax = bounds
xmin = int(self.resolution * np.floor(xmin / self.resolution))
ymin = int(self.resolution * np.floor(ymin / self.resolution))
xmax = int(self.resolution * np.ceil(xmax / self.resolution))
ymax = int(self.resolution * np.ceil(ymax / self.resolution))
assert xmax > xmin
assert ymax > ymin
self.bounds = xmin, ymin, xmax, ymax
self.geoid_interpolator = None
self.dem = None
self.dem_interpolator = None
self._create_geoid_interpolator()
self._load_dem_dataset()
self._create_dem_interpolator()
def get(self, lest_x, lest_y, reference='WGS84'):
z = self.dem_interpolator(lest_x, lest_y)
if reference == 'WGS84':
return z + self.geoid_height(lest_x, lest_y)
elif reference == 'geoid':
return z
else:
raise ValueError('Elevation reference must be either "WGS84" or "geoid"')
def geoid_height(self, lest_x, lest_y):
"""Interpolated EST-GEOID2017 values. Takes L-EST97 (easting, northing) as input.
The geoid value represents meters from GRS80/WGS84 ellipsoid, up is positive.
Grid spacing is 5 km (1:10000 grid centroids).
"""
return self.geoid_interpolator(lest_x, lest_y)
def _load_dem_dataset(self):
xmin, ymin, xmax, ymax = self.bounds
rows = (ymax - ymin) // self.resolution
cols = (xmax - xmin) // self.resolution
self.dem = np.zeros((rows, cols), dtype=np.float32)
for idx in tqdm(self._bounds_to_tile_ids(), desc='Loading DEM tiles', leave=False):
x0, y0 = self.from_grid(idx)
x_idx0 = max(0, (x0 - xmin) // self.resolution)
y_idx0 = max(0, (y0 - ymin) // self.resolution)
xbegin = (max(x0, xmin) - x0) // self.resolution
ybegin = (max(y0, ymin) - y0) // self.resolution
xend = (min(x0 + self.tile_size, xmax) - x0) // self.resolution
yend = (min(y0 + self.tile_size, ymax) - y0) // self.resolution
chunk = self._load_tile(idx)[::-1][ybegin:yend, xbegin:xend]
self.dem[y_idx0:, x_idx0:][:yend - ybegin, :xend - xbegin] = chunk
def _create_geoid_interpolator(self):
script_dir = os.path.dirname(__file__)
data = np.load(os.path.join(script_dir, "geoid.npz"))
interp = RectBivariateSpline(data["gridx"], data["gridy"], -data["geoid"].T, kx=1, ky=1)
self.geoid_interpolator = interp.ev
def _create_dem_interpolator(self):
xmin, ymin, xmax, ymax = self.bounds
gridx = np.arange(xmin + self.resolution / 2, xmax, self.resolution)
gridy = np.arange(ymin + self.resolution / 2, ymax, self.resolution)
interp = RectBivariateSpline(gridx, gridy, self.dem.T, kx=1, ky=1)
self.dem_interpolator = interp.ev
def _bounds_to_tile_ids(self):
xmin, ymin, xmax, ymax = self.bounds
x0, y0 = self.from_grid(self.to_grid(xmin, ymin))
tiles = []
for y in range(y0, ymax, self.tile_size):
for x in range(x0, xmax, self.tile_size):
tiles.append(self.to_grid(x, y))
return tiles
def _load_tile(self, idx):
tile_path = self.dem_dir / self._get_tile_name(idx)
if not tile_path.exists():
self.dem_dir.mkdir(parents=True, exist_ok=True)
download(self._get_download_url(idx), tile_path)
return rasterio.open(tile_path).read()[0]
def _get_tile_name(self, idx):
return f'{idx}_dem_{self.resolution}m.tif'
def _get_download_url(self, idx):
return ('https://geoportaal.maaamet.ee/index.php?plugin_act=otsing&page_id=614&dl=1&'
f'andmetyyp=dem_{self.resolution}m_geotiff&'
f'kaardiruut={idx}&f=' + self._get_tile_name(idx))
def download(url, dst):
head = requests.head(url)
head.raise_for_status()
file_size = int(head.headers["Content-Length"])
if os.path.exists(dst):
first_byte = os.path.getsize(dst)
else:
first_byte = 0
if first_byte >= file_size:
return file_size
header = {"Range": "bytes=%s-%s" % (first_byte, file_size)}
pbar = tqdm(
total=file_size, initial=first_byte,
unit='B', unit_scale=True, desc=f'Downloading {Path(dst).name}', leave=False)
req = requests.get(url, headers=header, stream=True)
req.raise_for_status()
with open(dst, 'ab') as f:
for chunk in req.iter_content(chunk_size=1024 * 1024):
if chunk:
f.write(chunk)
pbar.update(len(chunk))
pbar.close()
return file_size
def grid20k(x, y):
x = int(x) // 1000 - 200
y = int(y) // 1000 - 5900
idx = (y // 100) * 1000 + (y // 10 % 10) * 10
idx += (x // 100) * 100 + (x // 10 % 10)
return idx
def grid10k(x, y):
idx = grid20k(x, y) * 10
idx += (int(y) // 5000 % 2) * 2
idx += (int(x) // 5000 % 2) + 1
return idx
def grid2k(x, y):
return (int(y / 1000) - 6000) * 1000 + int(x / 1000)
def decode_grid20k(idx):
x = ((idx // 100) % 10) * 10
x += idx % 10
y = (idx // 1000) * 10
y += (idx // 10) % 10
return (x + 20) * 10000, (y + 590) * 10000
def decode_grid10k(idx):
x, y = decode_grid20k(idx // 10)
sub_idx = idx % 10 - 1
x += (sub_idx % 2) * 5000
y += (sub_idx // 2) * 5000
return x, y
def decode_grid2k(idx):
return idx % 1000 * 1000, (idx // 1000 + 6000) * 1000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment