|
import os |
|
import numpy as np |
|
from qgis.core import (QgsRasterLayer, QgsVectorLayer, QgsProject) |
|
import processing |
|
|
|
def add_layer_if_not_exists(layer, layer_name, zoom_to_layer=False): |
|
"""Add layer to QGIS project if it doesn't already exist, optionally zoom to it.""" |
|
if not QgsProject.instance().mapLayersByName(layer_name): |
|
QgsProject.instance().addMapLayer(layer) |
|
print(f"Layer {layer_name} added to QGIS project.") |
|
if zoom_to_layer: |
|
iface.mapCanvas().setExtent(layer.extent()) |
|
iface.mapCanvas().refresh() |
|
print(f"Zoomed to the {layer_name} layer.") |
|
else: |
|
print(f"Layer {layer_name} already exists in QGIS project.") |
|
|
|
def set_boundary_layer_style(layer): |
|
"""Set the boundary layer to have transparent fill and black outline.""" |
|
symbol = QgsFillSymbol.createSimple({'color': 'transparent', 'outline_color': 'black', 'outline_width': '0.6'}) |
|
layer.renderer().setSymbol(symbol) |
|
layer.triggerRepaint() |
|
print("Boundary layer style updated: transparent fill, black outline.") |
|
|
|
def determine_nodata_value_and_type(data_type): |
|
"""Determine a suitable NODATA value and the corresponding GDAL data type based on the raster data type.""" |
|
if data_type == 1: # Byte |
|
return 255, 1 |
|
elif data_type == 2: # UInt16 |
|
return 65535, 3 |
|
elif data_type == 3: # Int16 |
|
return -32768, 2 |
|
elif data_type == 4: # UInt32 |
|
return 4294967295, 4 |
|
elif data_type == 5: # Int32 |
|
return -2147483648, 5 |
|
elif data_type == 6: # Float32 |
|
return -3.40282346639e+38, 6 |
|
elif data_type == 7: # Float64 |
|
return -1.7976931348623157e+308, 7 |
|
elif data_type == 12: # Int8 |
|
return -128, 12 |
|
else: |
|
raise ValueError(f"Unsupported data type: {data_type}") |
|
|
|
def clip_ntl(input_global_tif, country_boundary_shp, output_clipped_tif): |
|
print("Loading raster and vector layers...") |
|
|
|
ntl_layer = QgsRasterLayer(input_global_tif, "NTL") |
|
boundary_layer = QgsVectorLayer(country_boundary_shp, "Boundary", "ogr") |
|
|
|
if not ntl_layer.isValid(): |
|
print("Error: NTL layer failed to load.") |
|
return |
|
if not boundary_layer.isValid(): |
|
print("Error: Boundary layer failed to load.") |
|
return |
|
|
|
add_layer_if_not_exists(ntl_layer, "NTL") |
|
add_layer_if_not_exists(boundary_layer, "Boundary", zoom_to_layer=True) |
|
|
|
# Set boundary layer style to transparent fill with black outline |
|
set_boundary_layer_style(boundary_layer) |
|
|
|
# Ensure CRS match |
|
print("Checking CRS of boundary and NTL layers...") |
|
if boundary_layer.crs() != ntl_layer.crs(): |
|
print("Reprojecting boundary layer to match NTL CRS...") |
|
params = { |
|
'INPUT': boundary_layer, |
|
'TARGET_CRS': ntl_layer.crs(), |
|
'OUTPUT': 'memory:' |
|
} |
|
boundary_layer = processing.run("native:reprojectlayer", params)['OUTPUT'] |
|
if not boundary_layer.isValid(): |
|
print("Error: Failed to reproject boundary layer.") |
|
return |
|
|
|
# Determine NODATA value and GDAL data type based on the data type of the input raster |
|
data_type = ntl_layer.dataProvider().dataType(1) # Get data type of the first band |
|
nodata_value, gdal_data_type = determine_nodata_value_and_type(data_type) |
|
|
|
# Use gdal:cliprasterbymasklayer with the dynamically determined nodata value and additional config |
|
print(f"Clipping the NTL raster with gdal:cliprasterbymasklayer using NODATA={nodata_value} and DATA_TYPE={gdal_data_type}...") |
|
params = { |
|
'INPUT': ntl_layer, |
|
'MASK': boundary_layer, |
|
'NODATA': nodata_value, |
|
'ALPHA_BAND': False, |
|
'CROP_TO_CUTLINE': True, |
|
'KEEP_RESOLUTION': True, |
|
'SET_RESOLUTION': False, |
|
'X_RESOLUTION': None, |
|
'Y_RESOLUTION': None, |
|
'MULTITHREADING': False, |
|
'OPTIONS': f'--config GDALWARP_IGNORE_BAD_CUTLINE=YES -dstnodata {nodata_value}', |
|
'DATA_TYPE': gdal_data_type, |
|
'OUTPUT': output_clipped_tif |
|
} |
|
|
|
result = processing.run("gdal:cliprasterbymasklayer", params) |
|
|
|
# Check if the output file was created and loaded correctly |
|
if os.path.exists(output_clipped_tif): |
|
print(f"Clipped NTL raster saved to {output_clipped_tif}") |
|
clipped_layer = QgsRasterLayer(output_clipped_tif, "Clipped NTL") |
|
if not clipped_layer.isValid(): |
|
print("Error: Failed to load the clipped NTL raster into QGIS.") |
|
return |
|
QgsProject.instance().addMapLayer(clipped_layer) |
|
print("Clipped NTL raster added to QGIS project.") |
|
else: |
|
print(f"Error: Clipping failed. The output file {output_clipped_tif} was not created.") |
|
return |
|
|
|
def apply_qml_style(layer, qml_path): |
|
"""Apply QML style to a layer.""" |
|
if os.path.exists(qml_path): |
|
layer.loadNamedStyle(qml_path) |
|
layer.triggerRepaint() |
|
print(f"QML style applied from {qml_path}") |
|
else: |
|
print(f"Error: QML file {qml_path} not found.") |
|
|
|
def classify_ntl(input_clipped_tif, output_classified_tif, qml_path, use_max_value_scheme=False): |
|
print("Loading clipped NTL raster for classification...") |
|
layer = QgsRasterLayer(input_clipped_tif, "Clipped NTL") |
|
if not layer.isValid(): |
|
print("Error: Clipped NTL raster failed to load.") |
|
return |
|
provider = layer.dataProvider() |
|
extent = layer.extent() |
|
cols = layer.width() |
|
rows = layer.height() |
|
block = provider.block(1, extent, cols, rows) |
|
|
|
print("Converting raster block to numpy array...") |
|
nodata = provider.sourceNoDataValue(1) |
|
data = np.array([[block.value(i, j) for j in range(cols)] for i in range(rows)]) |
|
|
|
# Ensure consistency in handling nodata |
|
valid_data = data[~np.isnan(data) & (data != nodata)] |
|
|
|
if valid_data.size > 0: |
|
# Calculate statistics |
|
min_value = np.min(valid_data) |
|
max_value = np.max(valid_data) |
|
median = np.median(valid_data) |
|
percentile_75 = np.percentile(valid_data, 75) |
|
|
|
print(f"Min Value: {min_value:.6f}") |
|
print(f"Max Value: {max_value:.6f}") |
|
print(f"Median: {median:.6f}") |
|
print(f"75th Percentile: {percentile_75:.6f}") |
|
else: |
|
print("No valid data found.") |
|
|
|
# Precompute thresholds for both schemes |
|
threshold_1 = 0.05 |
|
threshold_2 = 0.25 * median |
|
threshold_3 = 0.5 * median |
|
threshold_4 = median |
|
threshold_5 = percentile_75 |
|
threshold_6 = max_value |
|
|
|
max_value_thresholds = { |
|
"threshold_1": 0.05, |
|
"threshold_2": 0.2 * max_value, |
|
"threshold_3": 0.4 * max_value, |
|
"threshold_4": 0.6 * max_value, |
|
"threshold_5": 0.8 * max_value, |
|
"threshold_6": max_value, |
|
} |
|
|
|
print("Precomputed thresholds for original scheme:") |
|
print(f"Threshold 1: {threshold_1}") |
|
print(f"Threshold 2: {threshold_2}") |
|
print(f"Threshold 3: {threshold_3}") |
|
print(f"Threshold 4: {threshold_4}") |
|
print(f"Threshold 5: {threshold_5}") |
|
print(f"Threshold 6: {threshold_6}") |
|
|
|
print("Precomputed thresholds for max_value scheme:") |
|
for key, value in max_value_thresholds.items(): |
|
print(f"{key}: {value:.6f}") |
|
|
|
if use_max_value_scheme or max_value <= 0.05 or (max_value - percentile_75) <= 0.05 * max_value: |
|
print("Using max_value classification scheme") |
|
expr_str = (f'("Clipped NTL@1" <= {max_value_thresholds["threshold_1"]}) * 0 + ' |
|
f'("Clipped NTL@1" > {max_value_thresholds["threshold_1"]} AND "Clipped NTL@1" <= {max_value_thresholds["threshold_2"]}) * 1 + ' |
|
f'("Clipped NTL@1" > {max_value_thresholds["threshold_2"]} AND "Clipped NTL@1" <= {max_value_thresholds["threshold_3"]}) * 2 + ' |
|
f'("Clipped NTL@1" > {max_value_thresholds["threshold_3"]} AND "Clipped NTL@1" <= {max_value_thresholds["threshold_4"]}) * 3 + ' |
|
f'("Clipped NTL@1" > {max_value_thresholds["threshold_4"]} AND "Clipped NTL@1" <= {max_value_thresholds["threshold_5"]}) * 4 + ' |
|
f'("Clipped NTL@1" > {max_value_thresholds["threshold_5"]}) * 5') |
|
else: |
|
print("Using original classification scheme") |
|
expr_str = (f'("Clipped NTL@1" <= {threshold_1}) * 0 + ' |
|
f'("Clipped NTL@1" > {threshold_1} AND "Clipped NTL@1" <= {threshold_2}) * 1 + ' |
|
f'("Clipped NTL@1" > {threshold_2} AND "Clipped NTL@1" <= {threshold_3}) * 2 + ' |
|
f'("Clipped NTL@1" > {threshold_3} AND "Clipped NTL@1" <= {threshold_4}) * 3 + ' |
|
f'("Clipped NTL@1" > {threshold_4} AND "Clipped NTL@1" <= {threshold_5}) * 4 + ' |
|
f'("Clipped NTL@1" > {threshold_5}) * 5') |
|
|
|
print(f"Final classification expression: {expr_str}") |
|
|
|
entries = [] |
|
ras = QgsRasterCalculatorEntry() |
|
ras.ref = 'Clipped NTL@1' |
|
ras.raster = layer |
|
ras.bandNumber = 1 |
|
entries.append(ras) |
|
|
|
print("Raster calculator entries:") |
|
for entry in entries: |
|
print(entry.ref, entry.raster, entry.bandNumber) |
|
|
|
calc = QgsRasterCalculator(expr_str, output_classified_tif, 'GTiff', |
|
extent, cols, rows, entries) |
|
result = calc.processCalculation() |
|
|
|
if result == 0: |
|
print(f"Classified NTL raster saved to {output_classified_tif}") |
|
classified_layer = QgsRasterLayer(output_classified_tif, "Classified NTL") |
|
if not classified_layer.isValid(): |
|
print("Error: Failed to load the classified NTL raster into QGIS.") |
|
return |
|
QgsProject.instance().addMapLayer(classified_layer) |
|
print("Classified NTL raster added to QGIS project.") |
|
|
|
# Apply QML style |
|
apply_qml_style(classified_layer, qml_path) |
|
|
|
else: |
|
print(f"Error occurred during classification. Error code: {result}") |
|
|
|
print("Starting process...") |
|
dir = r'D:\temp\geest\factor_safety' |
|
input_global_tif = os.path.join(dir, 'ntl', 'VNL_npp_2023_global_vcmslcfg_v2_c202402081600.average.dat.tif') |
|
country_boundary_shp = os.path.join(dir, 'bnd', 'lca_admbnda_adm0_gov_2019.shp') |
|
output_clipped_tif = os.path.join(dir, 'ntl', 'lca_VNL_npp_2023_global_vcmslcfg_v2_c202402081600.average.dat.tif') |
|
output_classified_tif = os.path.join(dir, 'ntl', 'lca_ntl_geest_class.tif') |
|
qml_path = r'D:\temp\geest\factor_safety\symbology\geest_ntl.qml' |
|
|
|
clip_ntl(input_global_tif, country_boundary_shp, output_clipped_tif) |
|
classify_ntl(output_clipped_tif, output_classified_tif, qml_path, use_max_value_scheme=False) |
|
print("Process completed.") |