Created
March 13, 2024 08:27
-
-
Save bennyistanto/5cf7979ad7043dc0e837185d2cb04ddb to your computer and use it in GitHub Desktop.
Generate derivative product from Vegetation Indices - annual timeseries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
NAME | |
13_mxd13q1_viproducts_annual.py | |
Generate derivative product from Vegetation Indices | |
DESCRIPTION | |
Input data for this script will use MXD13Q1 annual data generate by 06_mxd13q1_8day2annual.py | |
This script can do calculation for ratio, difference, standardize anomaly | |
and vegetation condition index. | |
The calculation required timeseries VI and the long-term statistics (min, mean, max, std) | |
REQUIREMENT | |
ArcGIS must installed before using this script, as it required arcpy module. | |
EXAMPLES | |
C:\\Program Files\\ArcGIS\\Pro\\bin\\Python\\envs\\arcgispro-py3\\python 13_mxd13q1_viproducts_annual.py | |
NOTES | |
This script is designed to work with MODIS naming convention | |
If using other data, some adjustment are required: parsing filename and directory | |
CONTACT | |
Benny Istanto | |
Climate Geographer | |
GOST/DECAT/DECDG, The World Bank | |
LICENSE | |
This script is in the public domain, free from copyrights or restrictions. | |
VERSION | |
$Id$ | |
TODO | |
xx | |
""" | |
import os | |
import arcpy | |
from datetime import datetime, timedelta | |
import re | |
import uuid # For generating unique temporary filenames | |
# To avoid overwriting outputs, set overwriteOutput option to True for this script execution context | |
arcpy.env.overwriteOutput = True | |
# Raster environment settings for output consistency and optimization | |
arcpy.env.compression = "LZ77" | |
arcpy.env.pyramid = "PYRAMIDS -1 NEAREST LZ77 NO_SKIP" | |
# ISO3 Country Code | |
iso3 = "idn" # Syria: syr, Myanmar: mmr, Lebanon: lbn | |
# Define input and output folders | |
input_folder = "D:\\temp\\modis\\{}\\gee\\04_fillnullwithstats\\evi_all_annual".format(iso3) | |
stats_folder = "D:\\temp\\modis\\{}\\gee\\03_statistics\\evi_all_annual".format(iso3) | |
ratioanom_folder = "D:\\temp\\modis\\{}\\gee\\05_ratioanom\\evi_all_annual".format(iso3) | |
diffanom_folder = "D:\\temp\\modis\\{}\\gee\\06_diffanom\\evi_all_annual".format(iso3) | |
stdanom_folder = "D:\\temp\\modis\\{}\\gee\\07_stdanom\\evi_all_annual".format(iso3) | |
vci_folder = "D:\\temp\\modis\\{}\\gee\\08_vci\\evi_all_annual".format(iso3) | |
print("Script started...") | |
# Check if input folder exists | |
if not os.path.exists(input_folder): | |
print(f"Input folder does not exist: {input_folder}") | |
exit(1) | |
print("Input folder found, processing...") | |
# Global variable to store user's choice | |
user_choice = None | |
def set_user_decision(): | |
"""Prompt user for decision on existing files and store it globally.""" | |
global user_choice | |
if user_choice is None: | |
decision = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper() | |
while decision not in ['R', 'S', 'A']: | |
print("Invalid choice. Please choose again.") | |
decision = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper() | |
user_choice = decision | |
def derivative_vi(input_folder, stats_folder, ratioanom_folder, diffanom_folder, stdanom_folder, vci_folder): | |
global user_choice # Use the global variable to store the user's choice | |
# Loop through the files in the input folder | |
for raster in os.listdir(input_folder): | |
# Only process .tif files | |
if raster.endswith(".tif") or raster.endswith(".tiff"): | |
print(f"Processing raster: {raster}") | |
parts = raster.split("_") # idn_phy_mxd13q1_evi_mean_2023.tif | |
# Check the parts of the filename | |
print(f"Filename parts: {parts}") | |
# Adjusting to the actual number of parts in the filenames | |
if len(parts) == 6: # Ensure the filename has the expected number of parts | |
year = parts[5].split(".")[0] | |
# Construct the corresponding filenames in the stats folder | |
avg_raster_name = f"{iso3}_phy_mxd13q1_evi_annual_2003_2022_avg.tif" | |
min_raster_name = f"{iso3}_phy_mxd13q1_evi_annual_2003_2022_min.tif" | |
max_raster_name = f"{iso3}_phy_mxd13q1_evi_annual_2003_2022_max.tif" | |
std_raster_name = f"{iso3}_phy_mxd13q1_evi_annual_2003_2022_std.tif" | |
# Full paths to the statistics rasters | |
avg_raster = os.path.join(stats_folder, avg_raster_name) | |
min_raster = os.path.join(stats_folder, min_raster_name) | |
max_raster = os.path.join(stats_folder, max_raster_name) | |
std_raster = os.path.join(stats_folder, std_raster_name) | |
# Verify that the statistics rasters exist | |
stats_rasters_exist = True | |
for stats_raster in [avg_raster, min_raster, max_raster, std_raster]: | |
if not arcpy.Exists(stats_raster): | |
print(f"Error: {stats_raster} not found in {stats_folder}") | |
stats_rasters_exist = False | |
break | |
else: | |
print(f"Stats raster found: {stats_raster}") | |
if not stats_rasters_exist: | |
continue | |
# Define output paths for anomalies and VCI | |
ratioanom_raster = os.path.join(ratioanom_folder, f"{iso3}_phy_mxd13q1_ratioanom_{year}.tif") | |
diffanom_raster = os.path.join(diffanom_folder, f"{iso3}_phy_mxd13q1_diffanom_{year}.tif") | |
stdanom_raster = os.path.join(stdanom_folder, f"{iso3}_phy_mxd13q1_stdanom_{year}.tif") | |
vci_raster = os.path.join(vci_folder, f"{iso3}_phy_mxd13q1_vci_{year}.tif") | |
# Now, insert a loop to check the existence of each output and prompt for user decision if necessary | |
outputs = [ratioanom_raster, diffanom_raster, stdanom_raster, vci_raster] | |
for output_raster in outputs: | |
if arcpy.Exists(output_raster): | |
if user_choice is None: | |
set_user_decision() | |
if user_choice == 'S': | |
print(f"Skipping existing file: {output_raster}") | |
continue # Skip to the next output raster | |
elif user_choice == 'A': | |
print("Aborting process.") | |
return # Exit the function | |
elif user_choice == 'R': | |
arcpy.Delete_management(output_raster) # Delete if replacing | |
try: | |
arcpy.CheckOutExtension("spatial") | |
# Prepare the input raster | |
in_raster = arcpy.Raster(os.path.join(input_folder, raster)) | |
# Calculate the indices | |
ratioanom = arcpy.sa.Divide(arcpy.sa.Float(in_raster), arcpy.sa.Float(avg_raster)) * 100 | |
diffanom = arcpy.sa.Float(in_raster) - arcpy.sa.Float(avg_raster) | |
stdanom = arcpy.sa.Divide((arcpy.sa.Float(in_raster) - arcpy.sa.Float(avg_raster)), arcpy.sa.Float(std_raster)) | |
vci_unclipped = arcpy.sa.Divide((arcpy.sa.Float(in_raster) - arcpy.sa.Float(min_raster)), (arcpy.sa.Float(max_raster) - arcpy.sa.Float(min_raster))) * 100 | |
vci = arcpy.sa.Con(vci_unclipped > 100, 100, arcpy.sa.Con(vci_unclipped < 0, 0, vci_unclipped)) | |
# Before saving each output, create a temporary filename for intermediate saving | |
# Then, use arcpy.management.CopyRaster to apply LZ77 compression when saving the final output | |
for idx, calc_raster in enumerate([ratioanom, diffanom, stdanom, vci]): | |
# Determine the pixel type based on the index | |
if idx in [2, 3]: # Assuming stdanom and vci need floating-point precision | |
pixel_type = "32_BIT_FLOAT" | |
nodata_value = "-3.402823e+38" # Or "NaN", if appropriate and supported | |
else: # Use 16_BIT_SIGNED for other outputs | |
pixel_type = "16_BIT_SIGNED" | |
nodata_value = "-32768" # Common nodata value for 16_BIT_SIGNED | |
# Create a temporary filename for intermediate saving | |
temp_filename = f"temp_{uuid.uuid4().hex}.tif" | |
temp_file_path = os.path.join(arcpy.env.scratchFolder, temp_filename) | |
# Save the calculation result to the temporary file | |
calc_raster.save(temp_file_path) | |
# Replace direct saving with CopyRaster for LZ77 compression | |
arcpy.management.CopyRaster( | |
in_raster=temp_file_path, | |
out_rasterdataset=outputs[idx], | |
config_keyword="", | |
background_value="", | |
nodata_value=nodata_value, | |
onebit_to_eightbit="NONE", | |
colormap_to_RGB="NONE", | |
pixel_type=pixel_type, | |
scale_pixel_value="NONE", | |
RGB_to_Colormap="NONE", | |
format="TIFF", | |
transform="NONE", | |
process_as_multidimensional="CURRENT_SLICE", | |
build_multidimensional_transpose="NO_TRANSPOSE" | |
) | |
# Attempt to delete the temporary file | |
try: | |
arcpy.Delete_management(temp_file_path) | |
print(f"Temporary file {temp_file_path} deleted.") | |
except Exception as e: | |
print(f"Warning: Unable to delete temporary file {temp_file_path}") | |
# Clear any locks or residual files | |
arcpy.ClearWorkspaceCache_management() | |
except Exception as e: | |
print(f"An error occurred: {e}") | |
finally: | |
arcpy.CheckInExtension("spatial") | |
print(f"{outputs[idx]} completed with LZ77 compression, calculated from {os.path.join(input_folder, raster)} and other statistics rasters.") | |
else: | |
print(f"Invalid year in filename: {raster}") | |
else: | |
print(f"Unexpected filename format: {raster}") | |
else: | |
print(f"Skipping non-TIFF file: {raster}") | |
# Main function | |
def main(): | |
# Call the derivative_vi() function for the input folder | |
derivative_vi(input_folder, stats_folder, ratioanom_folder, diffanom_folder, stdanom_folder, vci_folder) | |
if __name__ == '__main__': | |
main() | |
print("Script completed.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment