Created
February 2, 2022 22:59
-
-
Save phargogh/e746fb5b57726402acc8ce6330276905 to your computer and use it in GitHub Desktop.
Experiment with when GDAL defaults to nonsquare block sizes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import os | |
| import os.path | |
| import numpy | |
| import pygeoprocessing | |
| from osgeo import gdal | |
| from osgeo import osr | |
| logging.basicConfig(level=logging.WARNING) | |
| LOGGER = logging.getLogger(__name__) | |
| LOGGER.setLevel(logging.DEBUG) | |
| # copied from natcap.invest.habitat_quality, version 3.10.1.post99+g3f22bc4a0.d20220202 | |
| def _make_linear_decay_kernel_path(max_distance, kernel_path): | |
| """Create a linear decay kernel as a raster. | |
| Pixels in raster are equal to d / max_distance where d is the distance to | |
| the center of the raster in number of pixels. | |
| Args: | |
| max_distance (int): number of pixels out until the decay is 0. | |
| kernel_path (string): path to output raster whose values are in (0,1) | |
| representing distance to edge. | |
| Size is (``max_distance`` * 2 + 1) | |
| Returns: | |
| None | |
| """ | |
| kernel_size = int(numpy.round(max_distance * 2 + 1)) | |
| driver = gdal.GetDriverByName('GTiff') | |
| kernel_dataset = driver.Create( | |
| kernel_path.encode('utf-8'), kernel_size, kernel_size, 1, | |
| gdal.GDT_Float32, options=['BIGTIFF=IF_SAFER']) | |
| # Make some kind of geotransform, it doesn't matter what but | |
| # will make GIS libraries behave better if it's all defined | |
| kernel_dataset.SetGeoTransform([444720, 30, 0, 3751320, 0, -30]) | |
| srs = osr.SpatialReference() | |
| srs.SetUTM(11, 1) | |
| srs.SetWellKnownGeogCS('NAD27') | |
| kernel_dataset.SetProjection(srs.ExportToWkt()) | |
| kernel_band = kernel_dataset.GetRasterBand(1) | |
| kernel_band.SetNoDataValue(-9999) | |
| col_index = numpy.array(range(kernel_size)) | |
| integration = 0.0 | |
| for row_index in range(kernel_size): | |
| distance_kernel_row = numpy.sqrt( | |
| (row_index - max_distance) ** 2 + | |
| (col_index - max_distance) ** 2).reshape(1, kernel_size) | |
| kernel = numpy.where( | |
| distance_kernel_row > max_distance, 0.0, | |
| (max_distance - distance_kernel_row) / max_distance) | |
| integration += numpy.sum(kernel) | |
| kernel_band.WriteArray(kernel, xoff=0, yoff=row_index) | |
| for row_index in range(kernel_size): | |
| kernel_row = kernel_band.ReadAsArray( | |
| xoff=0, yoff=row_index, win_xsize=kernel_size, win_ysize=1) | |
| kernel_row /= integration | |
| kernel_band.WriteArray(kernel_row, 0, row_index) | |
| if __name__ == '__main__': | |
| workspace = 'kernel-experiment-workspace' | |
| if not os.path.exists(workspace): | |
| os.makedirs(workspace) | |
| # Let's do a binary search to find the raster size after which rows are | |
| # used. | |
| max_distance = 4 | |
| lowest_row_based_distance = None | |
| tiled = True | |
| def _make_raster(max_distance): | |
| kernel_path = os.path.join(workspace, f'{max_distance}.tif') | |
| _make_linear_decay_kernel_path(max_distance, kernel_path) | |
| return kernel_path | |
| def _is_row_based(max_distance): | |
| kernel_path = _make_raster(max_distance) | |
| raster_info = pygeoprocessing.get_raster_info(kernel_path) | |
| LOGGER.debug(f"{max_distance}: {raster_info['block_size']}") | |
| return 1 in raster_info['block_size'] | |
| # Starting from a basic case, double the max distance until we find the | |
| # first striped one. | |
| while True: | |
| if _is_row_based(max_distance): | |
| lowest_row_based_distance = max_distance | |
| break | |
| else: | |
| max_distance *= 2 | |
| # We now have the highest tiled distance and we also have the lowest | |
| # row-based distance. Binary search until we find the last tiled distance. | |
| highest_tiled_distance = max_distance / 2 | |
| while True: | |
| if lowest_row_based_distance - highest_tiled_distance == 1: | |
| print(f"Last tiled distance: {highest_tiled_distance}") | |
| break | |
| # New max distance is halfway between these two distances. | |
| max_distance = (highest_tiled_distance + ( | |
| lowest_row_based_distance - highest_tiled_distance) / 2) | |
| if _is_row_based(max_distance): | |
| lowest_row_based_distance = max_distance | |
| else: | |
| highest_tiled_distance = max_distance | |
| # Sanity check for the two bounds. | |
| def _checkit(max_distance): | |
| kernel_path = _make_raster(max_distance) | |
| block_size = pygeoprocessing.get_raster_info(kernel_path)['block_size'] | |
| print(f"Blocksize for {kernel_path}: {block_size}") | |
| _checkit(lowest_row_based_distance) | |
| _checkit(highest_tiled_distance) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment