Last active
May 18, 2020 11:59
-
-
Save johnhw/5549d36a6534909c8e04eb3b1f0c445f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Converts Ordnance Survey Terrian 50 United Kingdom data to a simple NumPy array of terrain altitudes | |
# The original OS data is freely downloadable from: https://www.ordnancesurvey.co.uk/business-government/products/terrain-50 | |
# Directly converts the zipped terrain file to a NPZ file | |
# Resulting array is a (24600, 13200) array of float64 | |
# Requirements: | |
# * pycrs | |
# * lxml | |
import os, sys | |
import numpy as np | |
import zipfile | |
import re | |
import fnmatch | |
from lxml import etree | |
import pycrs | |
import scipy.sparse | |
def load_elevation(map_zip, grid, n): | |
"""Take a grid reference (e.g. HU) and a subgrid number (e.g. 41), and load | |
the relevant elevation file, returning the x and y of the lower left corner, | |
the size of the grid units and the elevation as a 2D numpy array.""" | |
# open the relevant zip file, returning all none if no such path | |
zip_path = f"data/{grid}/{grid}{n:02d}_OST50GRID_*.zip" | |
matches = fnmatch.filter(map_zip.namelist(), zip_path) | |
if len(matches) == 0: | |
return None | |
zip_path = matches[0] | |
with map_zip.open(zip_path) as zip_f: | |
with zipfile.ZipFile(zip_f, "r") as inner_zip: | |
# open the ASCII grid data | |
with inner_zip.open(f"{grid.upper()}{n:02d}.gml") as f: | |
gml = etree.parse(f) | |
with inner_zip.open(f"{grid.upper()}{n:02d}.prj") as f: | |
projection = pycrs.parse.from_esri_wkt(f.read().decode()) | |
with inner_zip.open(f"Metadata_{grid.upper()}{n:02d}.xml") as f: | |
metadata = etree.parse(f) | |
with inner_zip.open(f"{grid.upper()}{n:02d}.asc.aux.xml") as f: | |
aux = etree.parse(f) | |
with inner_zip.open(f"{grid.upper()}{n:02d}.asc") as f: | |
# read the header | |
for line in f: | |
line = line.decode("ascii") | |
elts = line.split(" ") | |
if elts[0] == "xllcorner": | |
x = int(elts[1]) | |
if elts[0] == "yllcorner": | |
y = int(elts[1]) | |
if elts[0] == "cellsize": | |
size = int(elts[1]) | |
# parse the array | |
with inner_zip.open(f"{grid.upper()}{n:02d}.asc") as f: | |
elevation = np.genfromtxt(f, skip_header=5) | |
return {"x":x, "y":y, "size":size, "projection":projection, "metadata":metadata, "aux":aux, "gml":gml, "elevation":elevation, | |
} | |
def load_region(map_zip, grid_refs, grids): | |
"""Load all sub-grids in each grid_ref, and return them | |
as a dictionary of subgrids""" | |
h_max = 0 | |
# for each grid ref | |
for grid in grid_refs: | |
# load all sub-grids | |
for n in range(100): | |
elev = load_elevation(map_zip, grid, n) | |
# if one can be found, store the elevation | |
if elev: | |
x, y = elev["x"], elev["y"] | |
grids[(x, y)] = elev | |
print(f"Loaded {grid}{n}") | |
def merge_grids(grids): | |
xs = [square["x"] for key,square in grids.items()] | |
ys = [square["y"] for key,square in grids.items()] | |
resolution = 50 | |
cell_size = 200 | |
min_x, max_x = np.min(xs), np.max(xs) | |
min_y, max_y = np.min(ys), np.max(ys) | |
x_span = (max_x-min_x) // resolution | |
y_span = (max_y-min_y) // resolution | |
full_map = np.zeros((x_span+cell_size, y_span+cell_size)) | |
for key, square in grids.items(): | |
x, y = (square["x"]-min_x)//resolution, (square["y"]-min_y)//resolution | |
elevation = square["elevation"] | |
full_map[x:x+cell_size, y:y+cell_size] = np.fliplr(elevation.transpose()) | |
# full_map_sparse = scipy.sparse.bsr_matrix(np.flipud(full_map.T), blocksize=(200,200)) | |
return min_x, min_y, full_map | |
def load_map(map_path): | |
terr50_zip = zipfile.ZipFile(map_path) | |
# find all regions in the terrain file | |
regions = [re.findall("data/([a-z][a-z])/*", fname) for fname in terr50_zip.namelist()] | |
# filter for unique tags | |
regions = {region[0] for region in regions if len(region)>0} | |
grids = {} | |
load_region(terr50_zip, list(regions), grids) | |
min_x, min_y, merged = merge_grids(grids) | |
np.save("os_gb_terr_50.npy", merged) | |
print(min_x, min_y) | |
if __name__=="__main__": | |
path = "terr50_gagg_gb.zip" | |
load_map(path) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment