Skip to content

Instantly share code, notes, and snippets.

@phargogh
Created February 2, 2022 22:59
Show Gist options
  • Select an option

  • Save phargogh/e746fb5b57726402acc8ce6330276905 to your computer and use it in GitHub Desktop.

Select an option

Save phargogh/e746fb5b57726402acc8ce6330276905 to your computer and use it in GitHub Desktop.
Experiment with when GDAL defaults to nonsquare block sizes
import logging
import os
import os.path
import numpy
import pygeoprocessing
from osgeo import gdal
from osgeo import osr
logging.basicConfig(level=logging.WARNING)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# copied from natcap.invest.habitat_quality, version 3.10.1.post99+g3f22bc4a0.d20220202
def _make_linear_decay_kernel_path(max_distance, kernel_path):
"""Create a linear decay kernel as a raster.
Pixels in raster are equal to d / max_distance where d is the distance to
the center of the raster in number of pixels.
Args:
max_distance (int): number of pixels out until the decay is 0.
kernel_path (string): path to output raster whose values are in (0,1)
representing distance to edge.
Size is (``max_distance`` * 2 + 1)
Returns:
None
"""
kernel_size = int(numpy.round(max_distance * 2 + 1))
driver = gdal.GetDriverByName('GTiff')
kernel_dataset = driver.Create(
kernel_path.encode('utf-8'), kernel_size, kernel_size, 1,
gdal.GDT_Float32, options=['BIGTIFF=IF_SAFER'])
# Make some kind of geotransform, it doesn't matter what but
# will make GIS libraries behave better if it's all defined
kernel_dataset.SetGeoTransform([444720, 30, 0, 3751320, 0, -30])
srs = osr.SpatialReference()
srs.SetUTM(11, 1)
srs.SetWellKnownGeogCS('NAD27')
kernel_dataset.SetProjection(srs.ExportToWkt())
kernel_band = kernel_dataset.GetRasterBand(1)
kernel_band.SetNoDataValue(-9999)
col_index = numpy.array(range(kernel_size))
integration = 0.0
for row_index in range(kernel_size):
distance_kernel_row = numpy.sqrt(
(row_index - max_distance) ** 2 +
(col_index - max_distance) ** 2).reshape(1, kernel_size)
kernel = numpy.where(
distance_kernel_row > max_distance, 0.0,
(max_distance - distance_kernel_row) / max_distance)
integration += numpy.sum(kernel)
kernel_band.WriteArray(kernel, xoff=0, yoff=row_index)
for row_index in range(kernel_size):
kernel_row = kernel_band.ReadAsArray(
xoff=0, yoff=row_index, win_xsize=kernel_size, win_ysize=1)
kernel_row /= integration
kernel_band.WriteArray(kernel_row, 0, row_index)
if __name__ == '__main__':
workspace = 'kernel-experiment-workspace'
if not os.path.exists(workspace):
os.makedirs(workspace)
# Let's do a binary search to find the raster size after which rows are
# used.
max_distance = 4
lowest_row_based_distance = None
tiled = True
def _make_raster(max_distance):
kernel_path = os.path.join(workspace, f'{max_distance}.tif')
_make_linear_decay_kernel_path(max_distance, kernel_path)
return kernel_path
def _is_row_based(max_distance):
kernel_path = _make_raster(max_distance)
raster_info = pygeoprocessing.get_raster_info(kernel_path)
LOGGER.debug(f"{max_distance}: {raster_info['block_size']}")
return 1 in raster_info['block_size']
# Starting from a basic case, double the max distance until we find the
# first striped one.
while True:
if _is_row_based(max_distance):
lowest_row_based_distance = max_distance
break
else:
max_distance *= 2
# We now have the highest tiled distance and we also have the lowest
# row-based distance. Binary search until we find the last tiled distance.
highest_tiled_distance = max_distance / 2
while True:
if lowest_row_based_distance - highest_tiled_distance == 1:
print(f"Last tiled distance: {highest_tiled_distance}")
break
# New max distance is halfway between these two distances.
max_distance = (highest_tiled_distance + (
lowest_row_based_distance - highest_tiled_distance) / 2)
if _is_row_based(max_distance):
lowest_row_based_distance = max_distance
else:
highest_tiled_distance = max_distance
# Sanity check for the two bounds.
def _checkit(max_distance):
kernel_path = _make_raster(max_distance)
block_size = pygeoprocessing.get_raster_info(kernel_path)['block_size']
print(f"Blocksize for {kernel_path}: {block_size}")
_checkit(lowest_row_based_distance)
_checkit(highest_tiled_distance)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment