
LSEQM Bias Correction

Bias correction of satellite-based precipitation estimates is crucial in climate studies to enhance their accuracy and reliability. This exercise explores a method that combines Linear Scaling (LS) and Empirical Quantile Mapping (EQM) with tail adjustment using the Generalized Pareto Distribution (GPD). This hybrid approach aims to correct both the mean bias and the distributional discrepancies between satellite estimates and ground-based observations.

(Figure: lseqm_xkcd_style_b)

The following sections will describe the methodology, the role of different functions, their strengths, and weaknesses in achieving accurate bias correction.

1 Linear Scaling Approach

The Linear Scaling (LS) method is applied first to correct the mean bias in the IMERG precipitation data relative to the CPC precipitation data. The underlying theory for LS involves the calculation of a scale factor as the ratio of the observed mean to the modeled mean:

$$\text{Scale Factor} = \frac{\mu_{\text{obs}}}{\mu_{\text{mod}}}$$

where $\mu_{\text{obs}}$ is the mean precipitation from the CPC observations and $\mu_{\text{mod}}$ is the mean precipitation from the IMERG satellite data. This scale factor is then used to adjust the IMERG data:

$$P_{\text{corrected}} = P_{\text{IMERG}} \times \text{Scale Factor}$$

The strength of this approach lies in its simplicity and effectiveness in correcting systematic biases. However, it does not address distributional differences, particularly in the tails of the distribution where extreme values occur.
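As a minimal sketch of this step (not the moving-window implementation used in the scripts below), the scale factor and correction can be written in a few lines of NumPy; the function and variable names are illustrative:

```python
import numpy as np

def linear_scaling(imerg, cpc):
    """Scale IMERG values so that their mean matches the CPC mean."""
    mu_mod = np.nanmean(imerg)   # mean of the satellite estimates
    mu_obs = np.nanmean(cpc)     # mean of the ground observations
    scale_factor = mu_obs / mu_mod if mu_mod != 0 else 1.0
    return imerg * scale_factor
```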

2 Empirical Quantile Mapping (EQM)

Empirical Quantile Mapping (EQM) is a comprehensive method used to align the distributions of satellite-based precipitation estimates with ground-based observations. By matching the empirical cumulative distribution functions (CDFs) of the two datasets, EQM corrects distributional biases across the entire range of precipitation values. This section explores the various components of EQM, including the application of gamma distribution-based quantile mapping and tail adjustment using the Generalized Pareto Distribution (GPD), to enhance the accuracy of bias correction.

2.1 Basic Empirical Quantile Mapping (EQM)

Empirical Quantile Mapping (EQM) works by transforming the precipitation values such that their quantiles match:

$$Q_{\text{corrected}} = F_{\text{obs}}^{-1}\left(F_{\text{mod}}(P_{\text{IMERG}})\right)$$

where $F_{\text{obs}}$ and $F_{\text{mod}}$ are the CDFs of the observed and modeled precipitation, respectively, and $Q_{\text{corrected}}$ represents the bias-corrected quantiles. The strength of EQM lies in its ability to correct distributional biases across the entire range of precipitation values. However, it might miss extreme values if they are not well represented in the observational data.
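A minimal, purely empirical sketch of this mapping using raw sample quantiles (the scripts further below use fitted gamma distributions instead; names are illustrative):

```python
import numpy as np

def empirical_quantile_mapping(imerg, cpc):
    """Map IMERG values onto the CPC distribution via empirical CDFs."""
    imerg_sorted = np.sort(imerg)
    # Empirical CDF value of each IMERG observation
    probs = np.searchsorted(imerg_sorted, imerg, side='right') / imerg_sorted.size
    # Inverse empirical CDF of the CPC observations at those probabilities
    return np.quantile(np.sort(cpc), np.clip(probs, 0.0, 1.0))
```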

2.2 Gamma Distribution-Based Quantile Mapping

To further refine the bias correction, gamma distribution-based quantile mapping is applied. This involves fitting gamma distributions to both IMERG and CPC data using the method of moments, ensuring accurate representation of the distribution shapes. The gamma distribution is defined by its shape ($k$), location ($\theta$), and scale ($\beta$) parameters:

$$f(x; k, \theta, \beta) = \frac{(x-\theta)^{k-1} e^{-(x-\theta)/\beta}}{\beta^k \Gamma(k)}$$

Fitting these parameters involves solving moment equations that relate the moments of the data to the parameters of the gamma distribution. The strength of this method is its mathematical rigor in fitting the entire distribution. However, it requires careful handling of parameter bounds to avoid infeasibility issues during optimization.
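A compact sketch of the gamma-based mapping using SciPy's built-in maximum-likelihood fit; the scripts below instead estimate the parameters from moment equations with explicit bounds, so treat this only as an illustration of the idea:

```python
import numpy as np
from scipy.stats import gamma

def gamma_quantile_map(imerg, cpc):
    """Map IMERG values to the CPC distribution via fitted gamma distributions."""
    # Fit gamma distributions to the wet values (location fixed at zero for simplicity)
    k_mod, loc_mod, beta_mod = gamma.fit(imerg[imerg > 0], floc=0)
    k_obs, loc_obs, beta_obs = gamma.fit(cpc[cpc > 0], floc=0)
    # CDF of the modelled values, clipped away from 0 and 1 to avoid infinities
    probs = np.clip(gamma.cdf(imerg, k_mod, loc=loc_mod, scale=beta_mod), 1e-6, 1 - 1e-6)
    # Inverse CDF of the observed fit gives the corrected values
    return gamma.ppf(probs, k_obs, loc=loc_obs, scale=beta_obs)
```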

2.3 Tail Adjustment with Generalized Pareto Distribution (GPD)

One critical enhancement in the approach is the adjustment for extreme values using GPD. This involves fitting a GPD to the excesses above a high threshold, defined as the 95th percentile in this study. The GPD is defined by its shape ($\xi$), location ($\mu$), and scale ($\sigma$) parameters:

$$f(x; \xi, \mu, \sigma) = \frac{1}{\sigma} \left(1 + \xi \frac{x-\mu}{\sigma}\right)^{-(1/\xi + 1)}$$

This step is crucial for accurately capturing extreme precipitation events, which are often underrepresented in observational datasets. The main strength of this approach is its focus on tail behavior, improving the representation of extremes. However, the requirement for sufficient data points above the threshold can be a limitation in sparse datasets.
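A short sketch of the tail fit with scipy.stats.genpareto; the 95th-percentile threshold and the 10-point minimum mirror the choices in the scripts below, while the helper name is illustrative:

```python
import numpy as np
from scipy.stats import genpareto

def fit_gpd_tail(values, q=95):
    """Fit a GPD to the excesses above the q-th percentile threshold."""
    threshold = np.nanpercentile(values, q)
    excesses = values[values > threshold] - threshold
    if excesses.size < 10:           # too few exceedances for a stable fit
        return threshold, None
    shape, loc, scale = genpareto.fit(excesses)
    return threshold, (shape, loc, scale)
```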

3 Moving Window Approach

The moving window approach is used to balance the advantages of capturing both spatial and temporal variations in precipitation patterns. By considering both spatial and temporal windows, the bias correction method can address local variations and seasonal changes more effectively.

3.1 Temporal Moving Windows

(Figure: temporal_moving_window)

Advantages:

  1. Seasonal and Short-term Variability:

    • Captures seasonal variations and short-term changes in precipitation patterns, which is essential for accurate bias correction over time.
    • Can adapt to the changing nature of weather patterns, providing more responsive adjustments.
  2. Stability:

    • Temporal aggregation can help in smoothing out short-term fluctuations and noise in the data.
    • This can be particularly useful if there are periods with sparse station data or missing values.
  3. Trend and Cycle Detection:

    • Helps in identifying trends and cycles in precipitation patterns that can be important for long-term studies and threshold definitions.

Disadvantages:

  1. Spatial Variability Ignored:
    • May not adequately account for spatial variations in precipitation, especially in regions with complex terrain or diverse weather patterns.

3.2 Spatial Moving Windows

(Figure: spatial_moving_window)

Advantages:

  1. Spatial Consistency:

    • Ensures spatial coherence and consistency in the data, accounting for local variations in precipitation patterns.
    • Provides localized corrections that adapt to spatial variability, leading to more accurate adjustments at each grid cell.
  2. Alignment with Resolution:

    • Aligns well with the spatial resolution differences between IMERG and CPC-UNI, ensuring appropriate scaling to match the higher resolution data.
  3. Improved Spatial Representations:

    • Helps maintain the spatial structure and patterns of precipitation, which can be critical for defining accurate thresholds for extreme rainfall events.

Disadvantages:

  1. Short-term Variability Ignored:
    • May not capture short-term temporal variations as effectively as a temporal moving window.
    • Can miss transient weather phenomena that occur over short periods.

4 Handling Extreme Values

The method incorporates both Empirical Quantile Mapping (EQM) and Generalized Pareto Distribution (GPD) fitting to address extreme values in precipitation data. Empirical Quantile Mapping is adept at matching the distribution of satellite-derived IMERG precipitation values to ground-based CPC observations by mapping the quantiles of the IMERG data to those of the CPC data. However, EQM can sometimes fail to capture extreme values accurately, especially if these extremes are not well-represented in the CPC data. To mitigate this, the method applies Generalized Pareto Distribution (GPD) fitting to the tails of the distribution. This involves identifying a threshold (typically set at the 95th percentile) and fitting a GPD to the excesses above this threshold. This approach enhances the ability to model and correct for extreme precipitation events, ensuring that the corrected dataset retains the significant variability and magnitude of these extremes.
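A condensed sketch of how the two pieces fit together, reusing the hypothetical `fit_gpd_tail` helper sketched above; the function name and arguments are illustrative, not the exact implementation used in the scripts below:

```python
import numpy as np
from scipy.stats import genpareto

def eqm_with_gpd_tail(imerg, cpc, probs, mapped, q=95):
    """Replace mapped values above the CPC threshold with GPD-based quantiles.

    probs  : CDF values of the IMERG data under its fitted distribution.
    mapped : values already mapped onto the CPC distribution (EQM output).
    """
    threshold, params = fit_gpd_tail(cpc, q)
    if params is None:                  # not enough exceedances, keep the EQM output
        return mapped
    extreme = imerg > threshold
    out = mapped.copy()
    # Quantiles of the fitted GPD, shifted back above the threshold
    out[extreme] = genpareto.ppf(probs[extreme], *params) + threshold
    return out
```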

4.1 Strengths:

  • Improved Extreme Value Representation: By using GPD fitting for the tails of the distribution, the method ensures that extreme precipitation values are better captured and represented.
  • Distribution Matching: EQM effectively matches the overall distribution of satellite and ground-based data, improving the reliability of the corrected dataset.
  • Dynamic Adjustment: The method dynamically adjusts the tails of the distribution, which is particularly useful for regions with sparse observational data.

4.2 Weaknesses:

  • Complexity: The combined approach of EQM and GPD fitting adds complexity to the bias correction process, requiring careful implementation and validation.
  • Dependence on Threshold Selection: The accuracy of the GPD fitting is sensitive to the chosen threshold, which may require empirical tuning for different datasets or regions.
  • Computational Overhead: The process of fitting GPD and performing quantile mapping can be computationally intensive, especially for large datasets or high-resolution models.

5 Combined Approach and Implementation

Given the trade-offs and specific goals, a hybrid approach combining both spatial and temporal windows can leverage the advantages of both methods. This hybrid approach balances spatial consistency with temporal responsiveness, addressing both local variations and short-term changes.

5.1 Suggested Hybrid Approach:

  1. Spatial Window: Use a 5x5 spatial window to ensure spatial coherence and consistency, adjusting for local variations.
  2. Temporal Window: Apply a shorter temporal window (e.g., 5 days) within the spatial window to capture short-term temporal variations and seasonal patterns.

5.2 Implementation:

  1. Spatial Window Calculation:

    • For each grid cell, consider a 5x5 spatial window around the target cell.
    • Aggregate the precipitation values within this window.
  2. Temporal Window Calculation:

    • Within each spatial window, consider a temporal window (e.g., 5 days).
    • Aggregate the precipitation values over this period.
  3. Bias Correction:

    • Calculate the scale factor and apply linear scaling based on the combined spatial and temporal window values.
    • Perform empirical quantile mapping using the combined values, as sketched below.
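The following sketch shows one way the combined 5x5 spatial / 5-day temporal sample could be gathered for a single grid cell and time step before computing the scale factor and quantile mapping; the scripts further below implement the spatial and temporal variants separately, so this hybrid helper is an assumption for illustration only:

```python
import numpy as np

def hybrid_window_values(precip, t, i, j, spatial=5, temporal=5):
    """Collect values from a spatial x temporal window centred on (t, i, j).

    precip is a 3-D array with dimensions (time, lat, lon); edges are clipped.
    """
    hs, ht = spatial // 2, temporal // 2
    t0, t1 = max(t - ht, 0), min(t + ht + 1, precip.shape[0])
    r0, r1 = max(i - hs, 0), min(i + hs + 1, precip.shape[1])
    c0, c1 = max(j - hs, 0), min(j + hs + 1, precip.shape[2])
    return precip[t0:t1, r0:r1, c0:c1].ravel()

# The windowed IMERG and CPC samples would then feed the LS and EQM steps, e.g.
#   scale = np.nanmean(cpc_win) / np.nanmean(imerg_win)
#   corrected = quantile_map(imerg_win * scale, cpc_win)
```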

5.3 Strengths of the Combined Approach

  • Comprehensive Correction: By combining different methods, the approach corrects both mean and distributional biases.
  • Extreme Value Preservation: Tail adjustment with GPD ensures that extreme values, critical for understanding climate impacts, are preserved.
  • Mathematical Rigor: The use of gamma distribution fitting and GPD adds mathematical robustness to the correction process.
  • Improved Extreme Value Representation: By using GPD fitting for the tails of the distribution, the method ensures that extreme precipitation values are better captured and represented.

  • Distribution Matching: EQM effectively matches the overall distribution of satellite and ground-based data, improving the reliability of the corrected dataset.
  • Dynamic Adjustment: The method dynamically adjusts the tails of the distribution, which is particularly useful for regions with sparse observational data.

5.4 Weaknesses and Challenges

  • Data Requirements: The need for sufficient data points above the threshold for GPD fitting can be challenging in regions with sparse observations.
  • Computational Complexity: The combined approach is computationally intensive, requiring significant processing time and resources.
  • Parameter Sensitivity: The fitting processes for gamma and GPD distributions are sensitive to parameter bounds and initial guesses, requiring careful tuning to avoid convergence issues.

This hybrid approach leverages the strengths of both spatial and temporal windows to provide a more robust and accurate bias correction. It ensures that both moderate and extreme precipitation values are corrected, preserving the physical characteristics of the precipitation data and providing a sound basis for defining precise thresholds for extreme rainfall events.


This exercise demonstrates a robust method for bias correction of satellite-based precipitation estimates, combining Linear Scaling, Empirical Quantile Mapping, and Generalized Pareto Distribution. While the approach is computationally intensive and data-sensitive, it offers a comprehensive solution for correcting both moderate and extreme precipitation values. Future work can focus on optimizing computational efficiency and exploring alternative methods for regions with limited observational data.

Notes

Work in progress

Contact:

Benny Istanto, GISP [1] [2]

Footnotes

  1. Applied Climatology Study Program, Faculty of Mathematics and Natural Sciences, Bogor Agricultural University

  2. Geospatial Operation Support Team, Development Economic Data Group, The World Bank

# Import the library
import os
import numpy as np
import xarray as xr
import pandas as pd
import calendar
from scipy import optimize
from scipy.stats import gamma, genpareto
from sklearn.model_selection import KFold
from multiprocessing import Pool
import time

# Main directory on Google Drive
dir = f'/mnt/d/temp/gfm1609'

# Define the appropriate input and output directory paths
input_dir = f'{dir}/data/bc/input'
output_dir = f'{dir}/data/bc/output'
imerg_path = f'{input_dir}/imergl'
cpc_path = f'{input_dir}/cpcuni'
mask_path = f'{dir}/data/subset/iso3'
corrected_precip_path = f'{output_dir}/spatialwindow'

# List of output directories
output_directories = [output_dir, corrected_precip_path]

# Create the output directories if they don't already exist
for directory in output_directories:
    os.makedirs(directory, exist_ok=True)

# Global variable to store user's choice
user_choice = None

def set_user_decision():
    """Prompt user for decision on existing files and store it globally."""
    global user_choice
    if user_choice is None:
        decision = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
        while decision not in ['R', 'S', 'A']:
            print("Invalid choice. Please choose again.")
            decision = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
        user_choice = decision

# To address the issue of capturing extreme values in satellite data while performing EQM, tail adjustment
# is used to improve the fit for extreme values. Tail adjustment with the Generalized Pareto Distribution (GPD)
# better captures the extreme values by specifically modeling the tails of the distribution, which is
# crucial for accurately representing extreme precipitation events.
def gamma_quantile_mapping(imerg_values, cpc_values):
    """
    Apply gamma distribution-based quantile mapping with improved fitting and tail adjustment
    to correct the distribution of precipitation data.

    This function fits gamma distributions to the IMERG and CPC precipitation values using
    regularization and stricter constraints to improve the fitting accuracy. It then computes
    the cumulative distribution function (CDF) of the IMERG values and applies the inverse CDF
    of the CPC values to obtain the corrected precipitation values. Additionally, it adjusts
    the tails using the Generalized Pareto Distribution (GPD) to better capture extreme values.

    Parameters:
    imerg_values (numpy.ndarray): Array of IMERG precipitation values.
    cpc_values (numpy.ndarray): Array of CPC precipitation values.

    Returns:
    numpy.ndarray: Corrected precipitation values after gamma quantile mapping with improved
    fitting and tail adjustment.
    """
    def moment_equations(data, params):
        """
        Define moment equations for fitting gamma distribution using moments.

        Parameters:
        data (numpy.ndarray): Array of data values.
        params (tuple): Parameters (shape, loc, scale) for the gamma distribution.

        Returns:
        tuple: Differences between the estimated and actual moments.
        """
        shape, loc, scale = params
        if abs(shape) < 1e-8:
            # Special case when shape is close to zero
            return (loc + scale * np.euler_gamma - np.mean(data),
                    scale**2 * np.pi**2 / 6 - np.var(data),
                    0)
        else:
            return (loc + scale * (1 - np.euler_gamma * shape) / shape - np.mean(data),
                    scale**2 * (1 - 2 * shape * np.euler_gamma + shape**2 * np.pi**2 / 6) / shape**2 - np.var(data),
                    np.sign(shape) * (3 * np.euler_gamma * shape - 1 - shape**3 * np.pi**2 / 6) / (1 - 2 * shape) - np.sign(np.mean((data - np.mean(data))**3)))

    def fit_gamma_with_moment_equations(data):
        """
        Fit a gamma distribution to the data using moment equations.

        Parameters:
        data (numpy.ndarray): Array of data values.

        Returns:
        tuple: Fitted parameters (shape, loc, scale) of the gamma distribution.
        """
        data = data[~np.isnan(data)]  # Remove NaN values from data
        if len(data) == 0:  # If no data left after removing NaNs, return default values
            return 1, 0, 1
        initial_guess = [0.1, np.mean(data), np.std(data)]
        lower_bounds = [0.001, np.min(data), 0.001]
        upper_bounds = [10, np.max(data), np.std(data) * 10]
        # Ensure that each lower bound is strictly less than the corresponding upper bound
        for i in range(3):
            if lower_bounds[i] >= upper_bounds[i]:
                upper_bounds[i] = lower_bounds[i] + 1e-6
        # Ensure initial guess is within bounds
        initial_guess = np.clip(initial_guess, lower_bounds, upper_bounds)
        bounds = (lower_bounds, upper_bounds)

        def objective(params):
            return moment_equations(data, params)

        try:
            result = optimize.least_squares(objective, initial_guess, bounds=bounds, max_nfev=10000)
            shape, loc, scale = result.x
        except ValueError as e:
            print(f"ValueError: {e}")
            print(f"Initial guess: {initial_guess}")
            print(f"Bounds: {bounds}")
            raise
        # Regularization and constraints
        shape = max(shape, 0.001)  # Ensure shape is positive
        scale = max(scale, 0.001)  # Ensure scale is positive
        return shape, loc, scale

    def fit_generalized_pareto_distribution(data, threshold):
        """
        Fit a Generalized Pareto Distribution (GPD) to the excesses above the threshold.

        Parameters:
        data (numpy.ndarray): Array of data values.
        threshold (float): Threshold value for defining the excesses.

        Returns:
        tuple: Fitted parameters of the GPD.
        """
        excesses = data[data > threshold] - threshold
        if len(excesses) < 10:  # Arbitrary minimum number of points for GPD fitting
            #print("Not enough excesses for GPD fitting. Using default parameters.")
            return (0, 0, 1)  # Return a default GPD with zero shape and unit scale
        params = genpareto.fit(excesses)
        return params

    def cross_validate_gpd(data, threshold, n_splits=5):
        """
        Cross-validate GPD fitting by splitting data into folds.

        Parameters:
        data (numpy.ndarray): Array of data values.
        threshold (float): Threshold value for defining the excesses.
        n_splits (int, optional): Number of cross-validation splits. Default is 5.

        Returns:
        tuple: Averaged parameters of the GPD from cross-validation.
        """
        excesses = data[data > threshold] - threshold
        if len(excesses) < n_splits:
            #print("Not enough excesses for cross-validation. Fitting GPD directly.")
            return fit_generalized_pareto_distribution(data, threshold)  # Fit GPD directly if not enough samples for cross-validation
        kf = KFold(n_splits=n_splits)
        params_list = []
        for train_index, test_index in kf.split(excesses):
            train_data, test_data = excesses[train_index], excesses[test_index]
            params = genpareto.fit(train_data)
            params_list.append(params)
        # Average the parameters from cross-validation
        shape_avg = np.mean([params[0] for params in params_list])
        loc_avg = np.mean([params[1] for params in params_list])
        scale_avg = np.mean([params[2] for params in params_list])
        return shape_avg, loc_avg, scale_avg

    # Combine the values within the spatial window
    imerg_values_flat = imerg_values.flatten()
    cpc_values_flat = cpc_values.flatten()
    if imerg_values_flat.size == 0 or cpc_values_flat.size == 0:
        return np.full(imerg_values.shape, np.nan)

    #print("Fitting gamma distributions to IMERG values...")
    shape1, loc1, scale1 = fit_gamma_with_moment_equations(imerg_values_flat)
    #print(f"IMERG Gamma Params: shape={shape1}, loc={loc1}, scale={scale1}")
    y = gamma.cdf(imerg_values_flat, shape1, loc=loc1, scale=scale1)
    #print(f"Gamma CDF of IMERG values: min={np.nanmin(y)}, max={np.nanmax(y)}")

    #print("Fitting gamma distributions to CPC values...")
    shape2, loc2, scale2 = fit_gamma_with_moment_equations(cpc_values_flat)
    #print(f"CPC Gamma Params: shape={shape2}, loc={loc2}, scale={scale2}")
    cpc_quantiles_flat = gamma.ppf(y, shape2, loc=loc2, scale=scale2)
    #print(f"Gamma PPF of CPC values: min={np.nanmin(cpc_quantiles_flat)}, max={np.nanmax(cpc_quantiles_flat)}")

    # Ensure CPC quantiles are within realistic bounds
    cpc_quantiles_flat = np.maximum(cpc_quantiles_flat, 0)
    #print(f"Adjusted CPC quantiles: min={np.nanmin(cpc_quantiles_flat)}, max={np.nanmax(cpc_quantiles_flat)}")

    #print("Fitting Generalized Pareto Distribution (GPD) to CPC values...")
    try:
        threshold = np.percentile(cpc_values_flat, 95)  # Set threshold at 95th percentile
    except IndexError as e:
        print(f"IndexError: {e}")
        threshold = np.nan
    if np.isnan(threshold):
        print("Threshold is NaN, skipping GPD fitting.")
    else:
        #print(f"GPD threshold: {threshold}")
        cpc_gpd_params = cross_validate_gpd(cpc_values_flat, threshold)
        #print(f"GPD Params: {cpc_gpd_params}")
        # Adjust the tails using GPD if there are enough data points
        extreme_mask = imerg_values_flat > threshold
        if np.any(extreme_mask):
            cpc_quantiles_flat[extreme_mask] = genpareto.ppf(y[extreme_mask], *cpc_gpd_params) + threshold
            #print(f"Adjusted CPC quantiles after GPD: min={np.nanmin(cpc_quantiles_flat)}, max={np.nanmax(cpc_quantiles_flat)}")

    #print("Applying dynamic cap to CPC quantiles...")
    try:
        dynamic_cap = np.percentile(cpc_values_flat, 99.9)  # Cap at the 99.9th percentile
    except IndexError as e:
        #print(f"IndexError: {e}")
        dynamic_cap = np.nan
    print(f"Dynamic cap: {dynamic_cap}")
    if np.isnan(dynamic_cap):
        #print("Dynamic cap is NaN, skipping dynamic capping.")
        return cpc_quantiles_flat.reshape(imerg_values.shape)
    cpc_quantiles_flat = np.minimum(cpc_quantiles_flat, dynamic_cap)

    # Ensure non-negative corrected values
    cpc_quantiles_flat = np.maximum(cpc_quantiles_flat, 0)

    #print("Reshaping CPC quantiles to original shape...")
    return cpc_quantiles_flat.reshape(imerg_values.shape)

def calculate_scale_factor(imerg_precip, cpc_precip):
    """
    Calculate the scale factor using a 5x5 spatial window.

    Parameters:
    imerg_precip (numpy.ndarray): IMERG precipitation values.
    cpc_precip (numpy.ndarray): CPC precipitation values.

    Returns:
    numpy.ndarray: Scale factors for each grid point.
    """
    scale_factors = np.full_like(cpc_precip, np.nan)
    time_steps, rows, cols = cpc_precip.shape
    for t in range(time_steps):
        for i in range(rows):
            for j in range(cols):
                # Define the window boundaries
                row_start = max(i - 2, 0)
                row_end = min(i + 3, rows)
                col_start = max(j - 2, 0)
                col_end = min(j + 3, cols)
                imerg_window = imerg_precip[t, row_start:row_end, col_start:col_end]
                cpc_window = cpc_precip[t, row_start:row_end, col_start:col_end]
                if not np.isnan(cpc_precip[t, i, j]) and np.count_nonzero(~np.isnan(imerg_window)) > 0:
                    regional_mean_imerg = np.nanmean(imerg_window)
                    if regional_mean_imerg != 0:
                        scale_factors[t, i, j] = cpc_precip[t, i, j] / regional_mean_imerg
                    else:
                        scale_factors[t, i, j] = 1.0
                else:
                    scale_factors[t, i, j] = np.nan
    return scale_factors

def apply_linear_scaling(imerg_precip, scale_factors):
    """
    Apply the scale factors to IMERG precipitation data.

    Parameters:
    imerg_precip (numpy.ndarray): IMERG precipitation values.
    scale_factors (numpy.ndarray): Scale factors for each grid point.

    Returns:
    numpy.ndarray: Spatially calibrated IMERG precipitation values.
    """
    return imerg_precip * scale_factors

def lseqm_spatial(imerg_ds, cpc_ds):
    """
    Apply the Linear Scaling and Empirical Quantile Mapping (LSEQM) method with a spatial moving window approach.

    This function performs bias correction on precipitation data by combining Linear Scaling (LS)
    and Empirical Quantile Mapping (EQM). The spatial moving window approach is used to smooth local
    variability, mitigate random errors, and capture larger-scale patterns in the data.

    Parameters:
    imerg_ds (xarray.Dataset): IMERG precipitation dataset with dimensions ('time', 'lat', 'lon').
    cpc_ds (xarray.Dataset): CPC precipitation dataset with dimensions ('time', 'lat', 'lon').

    Returns:
    xarray.Dataset: The bias-corrected precipitation dataset with the same dimensions as the input datasets.
    """
    # Sort the input datasets by the time dimension
    imerg_ds = imerg_ds.sortby('time')
    cpc_ds = cpc_ds.sortby('time')

    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitation']
    cpc_precip = cpc_ds['precip']
    print(f"IMERG precip shape: {imerg_precip.shape}")
    print(f"CPC precip shape: {cpc_precip.shape}")

    # Calculate the scale factors using a 5x5 spatial moving window
    scale_factors = calculate_scale_factor(imerg_precip.values, cpc_precip.values)
    #print("Scale factors calculated. Checking for NaN values...")
    if np.all(np.isnan(scale_factors)):
        #print("All scale factors are NaN. Something went wrong in calculate_scale_factor.")
        return xr.Dataset(data_vars={'precip': (('time', 'lat', 'lon'), np.full(imerg_precip.shape, np.nan))},
                          coords={'time': imerg_ds['time'], 'lat': imerg_ds['lat'], 'lon': imerg_ds['lon']})

    # Apply Linear Scaling (LS) to correct the mean bias
    print("Performing Linear Scaling (LS) to correct the mean bias...")
    ls_corrected_precip = apply_linear_scaling(imerg_precip.values, scale_factors)
    #print("Linear Scaling completed. Checking for NaN values in LS corrected precip...")
    if np.all(np.isnan(ls_corrected_precip)):
        #print("All LS corrected precip values are NaN. Something went wrong in apply_linear_scaling.")
        return xr.Dataset(data_vars={'precip': (('time', 'lat', 'lon'), np.full(imerg_precip.shape, np.nan))},
                          coords={'time': imerg_ds['time'], 'lat': imerg_ds['lat'], 'lon': imerg_ds['lon']})

    # Apply gamma distribution-based quantile mapping to correct the distribution
    print("Applying gamma distribution-based quantile mapping to correct the distribution...")
    corrected_precip = xr.apply_ufunc(gamma_quantile_mapping, ls_corrected_precip, cpc_precip.values,
                                      input_core_dims=[['time', 'lat', 'lon'], ['time', 'lat', 'lon']],
                                      output_core_dims=[['time', 'lat', 'lon']],
                                      output_dtypes=[imerg_precip.dtype],
                                      vectorize=True).data
    #print("Quantile mapping completed. Checking for NaN values in corrected precip...")
    if np.all(np.isnan(corrected_precip)):
        #print("All corrected precip values are NaN. Something went wrong in gamma_quantile_mapping.")
        return xr.Dataset(data_vars={'precip': (('time', 'lat', 'lon'), np.full(imerg_precip.shape, np.nan))},
                          coords={'time': imerg_ds['time'], 'lat': imerg_ds['lat'], 'lon': imerg_ds['lon']})

    # Ensure non-negative precipitation values
    corrected_precip = np.maximum(corrected_precip, 0)

    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']},
                              attrs={'cdm_data_type': 'GRID',
                                     'title': 'Bias Corrected IMERG Late Precipitation L3 1 day 0.1 degree x 0.1 degree',
                                     'summary': 'Precipitation data corrected using Linear Scaling and Empirical Quantile Mapping',
                                     'source': 'IMERG and CPC-UNI',
                                     'history': f'Created on {pd.Timestamp.now()}',
                                     'DOI': '10.5067/GPM/IMERGDF/DAY/07',
                                     'creator_name': 'Benny Istanto',
                                     'creator_role': 'Climate Geographer',
                                     'creator_email': '[email protected]',
                                     'comment': 'This dataset has been bias corrected using a spatial moving window approach'})
    corrected_ds['precipitation'].attrs.update({
        'units': 'mm',
        'long_name': 'Corrected daily mean precipitation rate estimate',
        'standard_name': 'corrected_precipitation'})
    corrected_ds['lat'].attrs.update({
        'units': 'degrees_north',
        'long_name': 'Latitude'
    })
    corrected_ds['lon'].attrs.update({
        'units': 'degrees_east',
        'long_name': 'Longitude'
    })
    print("Bias correction completed for the entire dataset.")
    return corrected_ds

def process_dekad(imerg_ds, cpc_ds, dekad_start, dekad_end, land_sea_mask):
    """
    Process bias correction for a single dekad period.

    Parameters:
    imerg_ds (xarray.Dataset): IMERG precipitation dataset.
    cpc_ds (xarray.Dataset): CPC precipitation dataset.
    dekad_start (pd.Timestamp): Start date of the dekad.
    dekad_end (pd.Timestamp): End date of the dekad.
    land_sea_mask (xarray.DataArray): Land-sea mask.

    Returns:
    str: Path to the saved corrected precipitation file.
    """
    print(f"Processing dekad from {dekad_start} to {dekad_end}...")
    imerg_window = imerg_ds.sel(time=slice(dekad_start, dekad_end))
    cpc_window = cpc_ds.sel(time=slice(dekad_start, dekad_end))
    if not imerg_window.time.size or not cpc_window.time.size:
        print(f"No data available for dekad {dekad_start} to {dekad_end}. Skipping...")
        return None
    cpc_window = cpc_window.reindex_like(imerg_window, method='nearest')

    print("Starting bias correction for dekad...")
    corrected_ds = lseqm_spatial(imerg_window, cpc_window)
    print("Bias correction completed for dekad...")

    corrected_dekad = corrected_ds.sel(time=slice(dekad_start, dekad_end))
    land_sea_mask_interp = land_sea_mask.interp(lat=corrected_dekad.lat, lon=corrected_dekad.lon, method="nearest")
    corrected_dekad_masked = corrected_dekad.where(land_sea_mask_interp == 1, drop=True)

    corrected_output_file = f'{corrected_precip_path}/idn_corrected_sw_imergl_{dekad_start.strftime("%Y%m%d")}.nc'
    if os.path.exists(corrected_output_file):
        set_user_decision()
        if user_choice == 'S':
            print(f"Skipping file {corrected_output_file}")
            return None
        elif user_choice == 'A':
            print("Aborting process.")
            return

    print(f"Saving corrected precipitation for {dekad_start} to {dekad_end} to {corrected_output_file}...")
    cf18 = {'precipitation': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan}}
    corrected_dekad_masked.to_netcdf(corrected_output_file, encoding=cf18, engine='netcdf4')
    print(f"Saved corrected precipitation for {dekad_start} to {dekad_end}.")
    return corrected_output_file

def process_year(year):
    """
    Process bias correction for a single year.

    Parameters:
    year (int): Year to process.
    """
    try:
        imerg_ds = xr.open_dataset(f'{imerg_path}/idn_imergl_{year}.nc4', decode_times=True)
        cpc_ds = xr.open_dataset(f'{cpc_path}/idn_cpc_{year}.nc4', decode_times=True)
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return
    mask_ds = xr.open_dataset(f'{mask_path}/idn_subset.nc')
    land_sea_mask = mask_ds['land']
    for month in range(1, 13):
        _, days_in_month = calendar.monthrange(year, month)
        dekad_dates = [pd.Timestamp(year, month, 1), pd.Timestamp(year, month, 11), pd.Timestamp(year, month, 21)]
        days_in_dekads = [10, 10, days_in_month - 20]
        if month == 2 and calendar.isleap(year):
            days_in_dekads[2] = 9
        for i, dekad_start in enumerate(dekad_dates):
            dekad_end = dekad_start + pd.Timedelta(days=days_in_dekads[i] - 1)
            process_dekad(imerg_ds, cpc_ds, dekad_start, dekad_end, land_sea_mask)

def main(start_year, end_year):
    """
    The main process to calculate:
    - bias correction
    - save corrected data to netcdf

    Returns:
    - Output available in folder output/corrected_precip
    """
    start_time = time.time()
    years = list(range(start_year, end_year + 1))
    with Pool() as pool:
        pool.map(process_year, years)
    print(f"Total processing time: {time.time() - start_time:.2f} seconds.")

if __name__ == '__main__':
    start_year = int(input("Enter the start year: "))
    end_year = int(input("Enter the end year: "))
    main(start_year, end_year)

# Import the library
import os
import numpy as np
import xarray as xr
import pandas as pd
import calendar
from scipy import optimize
from scipy.stats import gamma, genpareto
from sklearn.model_selection import KFold
import time

# Main directory on Google Drive
dir = f'/mnt/d/temp/gfm1609'

# Define the appropriate input and output directory paths
input_dir = f'{dir}/data/bc/input'
output_dir = f'{dir}/data/bc/output'
imerg_path = f'{input_dir}/imergl'
cpc_path = f'{input_dir}/cpcuni'
mask_path = f'{dir}/data/subset/iso3'
corrected_precip_path = f'{output_dir}/temporalwindow'

# List of output directories
output_directories = [output_dir, corrected_precip_path]

# Create the output directories if they don't already exist
for directory in output_directories:
    os.makedirs(directory, exist_ok=True)

# Global variable to store user's choice
user_choice = None

def set_user_decision():
    """Prompt user for decision on existing files and store it globally."""
    global user_choice
    if user_choice is None:
        decision = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
        while decision not in ['R', 'S', 'A']:
            print("Invalid choice. Please choose again.")
            decision = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
        user_choice = decision

# To address the issue of capturing extreme values in satellite data while performing EQM, tail adjustment
# is used to improve the fit for extreme values. Tail adjustment with the Generalized Pareto Distribution (GPD)
# better captures the extreme values by specifically modeling the tails of the distribution, which is
# crucial for accurately representing extreme precipitation events.
def gamma_quantile_mapping(imerg_values, cpc_values):
    """
    Apply gamma distribution-based quantile mapping with improved fitting and tail adjustment to correct the distribution of precipitation data.

    This function fits gamma distributions to the IMERG and CPC precipitation values using regularization and stricter constraints
    to improve the fitting accuracy. It then computes the cumulative distribution function (CDF) of the IMERG values and applies
    the inverse CDF of the CPC values to obtain the corrected precipitation values. Additionally, it adjusts the tails using the
    Generalized Pareto Distribution (GPD) to better capture extreme values.

    Parameters:
    imerg_values (numpy.ndarray): Array of IMERG precipitation values.
    cpc_values (numpy.ndarray): Array of CPC precipitation values.

    Returns:
    numpy.ndarray: Corrected precipitation values after gamma quantile mapping with improved fitting and tail adjustment.
    """
    def moment_equations(data, params):
        """
        Define moment equations for fitting gamma distribution using moments.

        Parameters:
        data (numpy.ndarray): Array of data values.
        params (tuple): Parameters (shape, loc, scale) for the gamma distribution.

        Returns:
        tuple: Differences between the estimated and actual moments.
        """
        shape, loc, scale = params
        if abs(shape) < 1e-8:
            # Special case when shape is close to zero
            return (loc + scale * np.euler_gamma - np.mean(data),
                    scale**2 * np.pi**2 / 6 - np.var(data),
                    0)
        else:
            return (loc + scale * (1 - np.euler_gamma * shape) / shape - np.mean(data),
                    scale**2 * (1 - 2 * shape * np.euler_gamma + shape**2 * np.pi**2 / 6) / shape**2 - np.var(data),
                    np.sign(shape) * (3 * np.euler_gamma * shape - 1 - shape**3 * np.pi**2 / 6) / (1 - 2 * shape) - np.sign(np.mean((data - np.mean(data))**3)))

    def fit_gamma_with_moment_equations(data):
        """
        Fit a gamma distribution to the data using moment equations.

        Parameters:
        data (numpy.ndarray): Array of data values.

        Returns:
        tuple: Fitted parameters (shape, loc, scale) of the gamma distribution.
        """
        data = data[~np.isnan(data)]  # Remove NaN values from data
        if len(data) == 0:  # If no data left after removing NaNs, return default values
            return 1, 0, 1
        initial_guess = [0.1, np.mean(data), np.std(data)]
        lower_bounds = [0.001, np.min(data), 0.001]
        upper_bounds = [10, np.max(data), np.std(data) * 10]
        # Ensure that each lower bound is strictly less than the corresponding upper bound
        for i in range(3):
            if lower_bounds[i] >= upper_bounds[i]:
                upper_bounds[i] = lower_bounds[i] + 1e-6
        # Ensure initial guess is within bounds
        initial_guess = np.clip(initial_guess, lower_bounds, upper_bounds)
        bounds = (lower_bounds, upper_bounds)

        def objective(params):
            return moment_equations(data, params)

        try:
            result = optimize.least_squares(objective, initial_guess, bounds=bounds, max_nfev=10000)
            shape, loc, scale = result.x
        except ValueError as e:
            print(f"ValueError: {e}")
            print(f"Initial guess: {initial_guess}")
            print(f"Bounds: {bounds}")
            raise
        # Regularization and constraints
        shape = max(shape, 0.001)  # Ensure shape is positive
        scale = max(scale, 0.001)  # Ensure scale is positive
        return shape, loc, scale

    def fit_generalized_pareto_distribution(data, threshold):
        """
        Fit a Generalized Pareto Distribution (GPD) to the excesses above the threshold.

        Parameters:
        data (numpy.ndarray): Array of data values.
        threshold (float): Threshold value for defining the excesses.

        Returns:
        tuple: Fitted parameters of the GPD.
        """
        excesses = data[data > threshold] - threshold
        if len(excesses) < 10:  # Arbitrary minimum number of points for GPD fitting
            #print("Not enough excesses for GPD fitting. Using default parameters.")
            return (0, 0, 1)  # Return a default GPD with zero shape and unit scale
        params = genpareto.fit(excesses)
        return params

    def cross_validate_gpd(data, threshold, n_splits=5):
        """
        Cross-validate GPD fitting by splitting data into folds.

        Parameters:
        data (numpy.ndarray): Array of data values.
        threshold (float): Threshold value for defining the excesses.
        n_splits (int, optional): Number of cross-validation splits. Default is 5.

        Returns:
        tuple: Averaged parameters of the GPD from cross-validation.
        """
        excesses = data[data > threshold] - threshold
        if len(excesses) < n_splits:
            #print("Not enough excesses for cross-validation. Fitting GPD directly.")
            return fit_generalized_pareto_distribution(data, threshold)  # Fit GPD directly if not enough samples for cross-validation
        kf = KFold(n_splits=n_splits)
        params_list = []
        for train_index, test_index in kf.split(excesses):
            train_data, test_data = excesses[train_index], excesses[test_index]
            params = genpareto.fit(train_data)
            params_list.append(params)
        # Average the parameters from cross-validation
        shape_avg = np.mean([params[0] for params in params_list])
        loc_avg = np.mean([params[1] for params in params_list])
        scale_avg = np.mean([params[2] for params in params_list])
        return shape_avg, loc_avg, scale_avg

    # Remove NaN values from the input arrays, remembering where the valid
    # IMERG values sit so the corrected values can be placed back later
    original_shape = imerg_values.shape
    valid_mask = ~np.isnan(imerg_values)
    imerg_values = imerg_values[valid_mask]
    cpc_values = cpc_values[~np.isnan(cpc_values)]
    if imerg_values.size == 0 or cpc_values.size == 0:
        return np.full(original_shape, np.nan)

    def restore_shape(values):
        # Scatter corrected values back to their original positions; cells
        # that were NaN on input stay NaN in the output
        result = np.full(original_shape, np.nan).ravel()
        result[valid_mask.ravel()] = values
        return result.reshape(original_shape)

    # Fit gamma distributions to the IMERG and CPC values using moment equations
    #print("Fitting gamma distributions to IMERG values...")
    shape1, loc1, scale1 = fit_gamma_with_moment_equations(imerg_values)
    #print(f"IMERG Gamma Params: shape={shape1}, loc={loc1}, scale={scale1}")
    y = gamma.cdf(imerg_values, shape1, loc=loc1, scale=scale1)
    #print(f"Gamma CDF of IMERG values: min={np.nanmin(y)}, max={np.nanmax(y)}")

    #print("Fitting gamma distributions to CPC values...")
    shape2, loc2, scale2 = fit_gamma_with_moment_equations(cpc_values)
    #print(f"CPC Gamma Params: shape={shape2}, loc={loc2}, scale={scale2}")
    cpc_quantiles = gamma.ppf(y, shape2, loc=loc2, scale=scale2)
    #print(f"Gamma PPF of CPC values: min={np.nanmin(cpc_quantiles)}, max={np.nanmax(cpc_quantiles)}")

    # Ensure CPC quantiles are within realistic bounds
    cpc_quantiles = np.maximum(cpc_quantiles, 0)
    #print(f"Adjusted CPC quantiles: min={np.nanmin(cpc_quantiles)}, max={np.nanmax(cpc_quantiles)}")

    # Fit GPD to the tails of the CPC values with cross-validation
    #print("Fitting Generalized Pareto Distribution (GPD) to CPC values...")
    threshold = np.percentile(cpc_values, 95)  # Set threshold at 95th percentile
    if np.isnan(threshold):
        print("Threshold is NaN, skipping GPD fitting.")
    else:
        #print(f"GPD threshold: {threshold}")
        cpc_gpd_params = cross_validate_gpd(cpc_values, threshold)
        #print(f"GPD Params: {cpc_gpd_params}")
        # Adjust the tails using GPD if there are enough data points
        extreme_mask = imerg_values > threshold
        if np.any(extreme_mask):
            cpc_quantiles[extreme_mask] = genpareto.ppf(y[extreme_mask], *cpc_gpd_params) + threshold
            #print(f"Adjusted CPC quantiles after GPD: min={np.nanmin(cpc_quantiles)}, max={np.nanmax(cpc_quantiles)}")

    # Dynamically determine an upper cap based on the characteristics of the data
    #print("Applying dynamic cap to CPC quantiles...")
    dynamic_cap = np.percentile(cpc_values, 99.9)  # Cap at the 99.9th percentile
    #print(f"Dynamic cap: {dynamic_cap}")
    if np.isnan(dynamic_cap):
        #print("Dynamic cap is NaN, skipping dynamic capping.")
        return restore_shape(cpc_quantiles)
    cpc_quantiles = np.minimum(cpc_quantiles, dynamic_cap)

    # Ensure non-negative corrected values
    cpc_quantiles = np.maximum(cpc_quantiles, 0)

    #print("Reshaping CPC quantiles to original shape...")
    return restore_shape(cpc_quantiles)

def moving_window_mean(arr, window_size):
    """
    Calculate a moving window mean for a given array.

    The moving window approach is used to smooth local variability in precipitation data,
    mitigate random errors, and provide a robust basis for bias correction. Even when
    the IMERG and CPC datasets have different (or the same) spatial resolutions (0.1° or 0.5°),
    the moving window helps to capture larger-scale patterns and reduce noise.

    Parameters:
    arr (numpy.ndarray): The input array to be smoothed, with dimensions (time, lat, lon).
    window_size (int): The size of the moving window.

    Returns:
    numpy.ndarray: The smoothed array after applying the moving window mean.
    """
    padding = (window_size - 1) // 2
    padded_arr = np.pad(arr, ((0, 0), (padding, padding), (padding, padding)), mode='reflect')
    result = np.empty_like(arr)
    for i in range(arr.shape[0]):
        for j in range(arr.shape[1]):
            for k in range(arr.shape[2]):
                # In the padded array, the window centred on original cell (j, k)
                # spans padded indices [j, j + 2 * padding] along each spatial axis
                window = padded_arr[i, j:j + 2 * padding + 1, k:k + 2 * padding + 1]
                if np.all(np.isnan(window)):
                    result[i, j, k] = np.nan
                else:
                    result[i, j, k] = np.nanmean(window)
    return result

def lseqm_moving_window(imerg_ds, cpc_ds, window_size=5):
    """
    Apply the Linear Scaling and Empirical Quantile Mapping (LSEQM) method with a moving window approach.

    This function performs bias correction on precipitation data by combining Linear Scaling (LS)
    and Empirical Quantile Mapping (EQM). The moving window approach is used to smooth local
    variability, mitigate random errors, and capture larger-scale patterns in the data.

    Parameters:
    imerg_ds (xarray.Dataset): IMERG precipitation dataset with dimensions ('time', 'lat', 'lon').
    cpc_ds (xarray.Dataset): CPC precipitation dataset with dimensions ('time', 'lat', 'lon').
    window_size (int, optional): The size of the moving window used for smoothing. Default is 5.

    Returns:
    xarray.Dataset: The bias-corrected precipitation dataset with the same dimensions as the input datasets.

    Notes:
    - The IMERG and CPC datasets are first sorted by time.
    - The Linear Scaling (LS) method corrects the mean bias by applying a scale factor calculated
      from the ratio of moving window means of CPC and IMERG precipitation.
    - The Empirical Quantile Mapping (EQM) method is applied using gamma distribution-based quantile mapping
      to correct the distribution of the precipitation data.
    - The corrected precipitation data is stored in a new xarray.Dataset.

    Example:
    corrected_ds = lseqm_moving_window(imerg_ds, cpc_ds, window_size=5)
    """
    # Sort the input datasets by the time dimension
    imerg_ds = imerg_ds.sortby('time')
    cpc_ds = cpc_ds.sortby('time')

    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitation']
    cpc_precip = cpc_ds['precip']
    print("IMERG precip shape:", imerg_precip.shape)
    print("CPC precip shape:", cpc_precip.shape)

    # Calculate the moving window mean for each dataset
    imerg_moving_mean = moving_window_mean(imerg_precip.values, window_size)
    cpc_moving_mean = moving_window_mean(cpc_precip.values, window_size)

    # Perform Linear Scaling (LS) to correct the mean bias
    print("Perform Linear Scaling (LS) to correct the mean bias...")
    ls_scale_factor = np.divide(cpc_moving_mean, imerg_moving_mean, out=np.ones_like(cpc_moving_mean), where=imerg_moving_mean != 0)
    ls_corrected_precip = imerg_precip * ls_scale_factor

    print("Apply gamma distribution-based quantile mapping to correct the distribution...")
    corrected_precip = xr.apply_ufunc(gamma_quantile_mapping, ls_corrected_precip, cpc_precip,
                                      input_core_dims=[['time'], ['time']],
                                      output_core_dims=[['time']],
                                      output_dtypes=[imerg_precip.dtype],
                                      vectorize=True).data
    corrected_precip = np.transpose(corrected_precip, (2, 0, 1))

    # Ensure non-negative precipitation values
    corrected_precip = np.maximum(corrected_precip, 0)
    print("Corrected precip shape:", corrected_precip.shape)

    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']},
                              attrs={'cdm_data_type': 'GRID',
                                     'title': 'Bias Corrected IMERG Late Precipitation L3 1 day 0.1 degree x 0.1 degree',
                                     'summary': 'Precipitation data corrected using Linear Scaling and Empirical Quantile Mapping',
                                     'source': 'IMERG and CPC-UNI',
                                     'history': f'Created on {pd.Timestamp.now()}',
                                     'DOI': '10.5067/GPM/IMERGDF/DAY/07',
                                     'creator_name': 'Benny Istanto',
                                     'creator_role': 'Climate Geographer',
                                     'creator_email': '[email protected]',
                                     'comment': 'This dataset has been bias corrected using a temporal moving window approach'})
    corrected_ds['precipitation'].attrs.update({
        'units': 'mm',
        'long_name': 'Corrected daily mean precipitation rate estimate',
        'standard_name': 'corrected_precipitation'})
    corrected_ds['lat'].attrs.update({
        'units': 'degrees_north',
        'long_name': 'Latitude'
    })
    corrected_ds['lon'].attrs.update({
        'units': 'degrees_east',
        'long_name': 'Longitude'
    })
    return corrected_ds

def main(start_year, end_year):
    """
    The main process to calculate:
    - bias correction
    - save corrected data to netcdf

    Returns:
    - Output available in folder output/corrected_precip
    """
    global user_choice

    # Start timing the entire process
    start_time = time.time()

    # Load the land-sea mask from the external NetCDF file
    mask_ds = xr.open_dataset(f'{mask_path}/idn_subset.nc')
    # Create a boolean mask from the 'land' variable
    land_sea_mask = mask_ds['land']

    # Encoding for CF 1.8 (adjust based on your precision and range requirements)
    cf18 = {'precipitation': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan}}
    # or if using int16 with scaling
    # cf18 = {'precipitation': {'dtype': 'int16', 'zlib': True, '_FillValue': -9999, 'scale_factor': 0.1}}

    # Get the year range
    for year in range(start_year, end_year + 1):
        print(f"Processing year {year}...")
        year_start_time = time.time()
        try:
            # Open the input data
            imerg_ds = xr.open_dataset(f'{imerg_path}/idn_imergl_{year}.nc4', decode_times=True)
            cpc_ds = xr.open_dataset(f'{cpc_path}/idn_cpc_{year}.nc4', decode_times=True)
        except FileNotFoundError as e:
            print(f"Error: {e}")
            continue

        # Get the month information
        for month in range(1, 13):
            _, days_in_month = calendar.monthrange(year, month)
            # Generate the start dates of each dekad for the current month
            dekad_dates = [pd.Timestamp(year, month, 1), pd.Timestamp(year, month, 11), pd.Timestamp(year, month, 21)]
            days_in_dekads = [10, 10, days_in_month - 20]
            if month == 2 and calendar.isleap(year):
                days_in_dekads[2] = 9

            # Loop through the dekads
            for i, dekad_start in enumerate(dekad_dates):
                dekad_str = dekad_start.strftime("%Y%m%d")
                dekad_end = dekad_start + pd.Timedelta(days=days_in_dekads[i] - 1)

                # Calculate the start and end dates for the moving window
                window_start = dekad_start - pd.Timedelta(days=2)
                window_end = dekad_start + pd.Timedelta(days=days_in_dekads[i] + 1)

                # Slice the data for the moving window
                imerg_window = imerg_ds.sel(time=slice(window_start, window_end))
                cpc_window = cpc_ds.sel(time=slice(window_start, window_end))
                if not imerg_window.time.size or not cpc_window.time.size:
                    print(f"Skipping dekad {dekad_str} due to empty time slice.")
                    continue

                # Reindex CPC dataset to match IMERG dataset
                cpc_window = cpc_window.reindex_like(imerg_window, method='nearest')

                # Time the entire dekad processing
                dekad_start_time = time.time()

                # Perform the bias correction using the moving window
                print("Computing bias correction...")
                corrected_ds = lseqm_moving_window(imerg_window, cpc_window)
                print("Bias correction computation completed...")

                # Slice the corrected data for the current dekad
                corrected_dekad = corrected_ds.sel(time=slice(dekad_start, dekad_end))

                # Apply the land-sea mask to the corrected dataset before saving
                land_sea_mask_interp = land_sea_mask.interp(lat=corrected_dekad.lat, lon=corrected_dekad.lon, method="nearest")
                corrected_dekad_masked = corrected_dekad.where(land_sea_mask_interp == 1, drop=True)

                # Save the corrected dekad data to a NetCDF file
                print(f'Saving corrected precipitation for {dekad_str} to a netCDF...')
                corrected_output_file = f'{corrected_precip_path}/idn_corrected_tw_imergl_{dekad_str}.nc'

                # Check if the file already exists
                if os.path.exists(corrected_output_file):
                    set_user_decision()
                    if user_choice == 'S':
                        print(f"Skipping file {corrected_output_file}")
                        continue
                    elif user_choice == 'A':
                        print("Aborting process.")
                        return

                corrected_dekad_masked.to_netcdf(corrected_output_file, encoding=cf18, engine='netcdf4')
                dekad_processing_time = time.time() - dekad_start_time
                print(f"Processed and saved corrected precipitation for {dekad_str} in {dekad_processing_time:.2f} seconds.")

        print(f"Year {year} processing completed in {time.time() - year_start_time:.2f} seconds.")
    print(f"Total processing time: {time.time() - start_time:.2f} seconds.")

if __name__ == '__main__':
    start_year = int(input("Enter the start year: "))
    end_year = int(input("Enter the end year: "))
    main(start_year, end_year)

To do list

  1. Find out why the temporal moving window takes around 10 minutes per iteration and produces outputs of around 800 KB, while the spatial moving window takes 20 minutes for the whole 22 years of data (792 iterations) and produces outputs of around 400 KB.
  2. Measure performance both for the whole coverage as one and on a grid-cell basis.
  3. Create visual output: Taylor diagram, map.
  4. Bootstrapping for Uncertainty Estimates: Bootstrapping is a statistical technique that involves repeatedly sampling from a dataset with replacement to estimate the distribution of a statistic. It can provide confidence intervals and other measures of uncertainty for your bias-corrected values.

Function for calculate_metrics

def calculate_metrics(ref_ds, corrected_ds, imerg_ds, dekad_start, dekad_end):
    """
    Calculate the following metrics for the reference (CPC), original (IMERG), and corrected (IMERG) data:
    - relative bias
    - Pearson correlation coefficient
    - root mean squared error
    - mean absolute error
    - probability of detection
    - false alarm ratio
    - critical success index

    Parameters:
    - ref_ds (xarray.Dataset): the reference dataset (CPC).
    - corrected_ds (xarray.Dataset): the corrected dataset (IMERG).
    - imerg_ds (xarray.Dataset): the original dataset (IMERG).

    Returns:
    - metrics (pandas.DataFrame): a dataframe containing the metric values.
    """
    # Add the original dataset (IMERG) to the datasets dictionary
    datasets = {'ref': ref_ds['precip'], 'corrected': corrected_ds['precip'], 'original': imerg_ds['precipitationCal']}
    results = []

    for name, data in datasets.items():
        # Calculate the relative bias
        relative_bias = (data.sum(dim='time') / ref_ds['precip'].sum(dim='time')).mean(dim=('lat', 'lon'))

        # Find common time points between the two datasets
        common_time = data.time.to_index().intersection(ref_ds['precip'].time.to_index())
        data_matched = data.sel(time=common_time)
        ref_matched = ref_ds['precip'].sel(time=common_time)

        # Flatten the data
        data_flat = data_matched.values.flatten()
        ref_flat = ref_matched.values.flatten()

        # Remove NaN values from the data
        mask = ~np.isnan(data_flat) & ~np.isnan(ref_flat)
        data_flat = data_flat[mask]
        ref_flat = ref_flat[mask]

        # Calculate the Pearson correlation coefficient
        pearson = np.corrcoef(data_flat, ref_flat)[0, 1]

        # Calculate the root mean squared error
        rmse = (((data - ref_ds['precip'])**2).mean(dim='time')**0.5).mean(dim=('lat', 'lon'))

        # Calculate the mean absolute error
        mae = (np.abs(data - ref_ds['precip'])).mean(dim='time').mean(dim=('lat', 'lon'))

        # Calculate the probability of detection
        pod = ((data > 0) & (ref_ds['precip'] > 0)).sum(dim='time') / (ref_ds['precip'] > 0).sum(dim='time')
        pod_mean = pod.mean(dim=('lat', 'lon'))

        # Calculate the false alarm ratio
        far = ((data > 0) & (ref_ds['precip'] == 0)).sum(dim='time') / (data > 0).sum(dim='time')
        far_mean = far.mean(dim=('lat', 'lon'))

        # Calculate the critical success index
        csi = pod / (pod + far)
        csi_mean = csi.mean(dim=('lat', 'lon'))

        # Calculate the standard deviation for each dataset
        std_dev = data.std(dim='time').mean(dim=('lat', 'lon'))

        # Create a dataframe to store the metric values
        metrics = pd.DataFrame({'dataset': [name],
                                'relative_bias': [relative_bias.values.item()],
                                'pearson': [pearson],
                                'rmse': [rmse.values.item()],
                                'mae': [mae.values.item()],
                                'pod': [pod_mean.values.item()],
                                'far': [far_mean.values.item()],
                                'csi': [csi_mean.values.item()],
                                'std_dev': [std_dev.values.item()]})
        results.append(metrics)

    return pd.concat(results, ignore_index=True)

Function for calculate_grid_based_metrics

def calculate_grid_based_metrics(ref_ds, corrected_ds, imerg_ds, dekad_start, dekad_end):
    """
    Calculate the following metrics for the reference (CPC), original (IMERG), and corrected (IMERG) data:
    - relative bias
    - Pearson correlation coefficient
    - root mean squared error
    - mean absolute error
    - probability of detection
    - false alarm ratio
    - critical success index

    Parameters:
    - ref_ds (xarray.Dataset): the reference dataset (CPC).
    - corrected_ds (xarray.Dataset): the corrected dataset (IMERG).
    - imerg_ds (xarray.Dataset): the original dataset (IMERG).
    - dekad_start (str): the start date of the dekad.
    - dekad_end (str): the end date of the dekad.
    Returns:
    - grid_metrics (xarray.Dataset): a dataset containing the metric values for each grid cell.
    """
    # Define a function that computes the metrics for a single time step
    def compute_metrics_for_time_step(ref_t, corrected_t):
        # Calculate the relative bias
        relative_bias = corrected_t - ref_t

        # Calculate the Pearson correlation coefficient
        pearson_r = xr.corr(corrected_t, ref_t)

        # Calculate the root mean squared error
        rmse = np.sqrt(((corrected_t - ref_t) ** 2))

        # Calculate the mean absolute error
        mae = np.abs(corrected_t - ref_t)

        # Calculate the probability of detection
        pod = ((corrected_t > 0) & (ref_t > 0)).astype(int) / ((ref_t > 0).astype(int))

        # Calculate the false alarm ratio
        far = ((corrected_t > 0) & (ref_t == 0)).astype(int) / ((corrected_t > 0).astype(int))

        # Calculate the critical success index
        csi = pod / (pod + far)

        return xr.Dataset(
            {
                'relative_bias': relative_bias,
                'pearson_r': pearson_r,
                'rmse': rmse,
                'mae': mae,
                'pod': pod,
                'far': far,
                'csi': csi,
            }
        )

    # Compute the metrics for each time step
    metrics_list = []
    days_in_dekad = len(pd.date_range(dekad_start, dekad_end))  # Calculate the number of days in the dekad
    for t in pd.date_range(dekad_start, dekad_end):
        ref_t = ref_ds['precip'].sel(time=t)
        corrected_t = corrected_ds['precip'].sel(time=t)

        metrics_t = compute_metrics_for_time_step(ref_t, corrected_t)
        metrics_list.append(metrics_t)

    # Concatenate the metrics along the time dimension
    grid_metrics = xr.concat(metrics_list, dim=pd.DatetimeIndex(pd.date_range(dekad_start, dekad_end), name='time'))

    return grid_metrics

Function for bootstrap

def bootstrap_bias_correction(imerg_values, cpc_values, n_iterations=1000):
    """
    Perform bootstrapping for uncertainty estimates in the bias correction process.

    Parameters:
    imerg_values (numpy.ndarray): Array of IMERG precipitation values.
    cpc_values (numpy.ndarray): Array of CPC precipitation values.
    n_iterations (int): Number of bootstrap iterations.

    Returns:
    tuple: Lower and upper bounds of the 95% confidence interval for the corrected values.
    """
    bootstrap_means = []
    for _ in range(n_iterations):
        imerg_resample = np.random.choice(imerg_values, size=len(imerg_values), replace=True)
        cpc_resample = np.random.choice(cpc_values, size=len(cpc_values), replace=True)
        corrected_values = gamma_quantile_mapping(imerg_resample, cpc_resample)
        bootstrap_means.append(np.mean(corrected_values))
    lower_ci = np.percentile(bootstrap_means, 2.5)
    upper_ci = np.percentile(bootstrap_means, 97.5)
    return lower_ci, upper_ci

Function for bootstrap and other metrics

import numpy as np
import xarray as xr
from sklearn.metrics import mean_absolute_error, r2_score, precision_score, recall_score

def bootstrap_bias_correction(imerg_values, cpc_values, n_iterations=1000):
    """
    Perform bootstrapping for uncertainty estimates in the bias correction process.

    Parameters:
    imerg_values (numpy.ndarray): Array of IMERG precipitation values.
    cpc_values (numpy.ndarray): Array of CPC precipitation values.
    n_iterations (int): Number of bootstrap iterations.

    Returns:
    tuple: Lower and upper bounds of the 95% confidence interval for the corrected values.
    """
    bootstrap_means = np.empty((n_iterations, imerg_values.shape[0], imerg_values.shape[1]))
    for i in range(n_iterations):
        resample_indices = np.random.choice(len(imerg_values.flatten()), size=len(imerg_values.flatten()), replace=True)
        imerg_resample = imerg_values.flatten()[resample_indices].reshape(imerg_values.shape)
        cpc_resample = cpc_values.flatten()[resample_indices].reshape(cpc_values.shape)
        corrected_values = gamma_quantile_mapping(imerg_resample, cpc_resample)
        bootstrap_means[i, :, :] = corrected_values

    lower_ci = np.percentile(bootstrap_means, 2.5, axis=0)
    upper_ci = np.percentile(bootstrap_means, 97.5, axis=0)
    return lower_ci, upper_ci

def calculate_metrics(imerg_values, cpc_values, corrected_values):
    """
    Calculate grid metrics for the bias-corrected data.

    Parameters:
    imerg_values (numpy.ndarray): Array of IMERG precipitation values.
    cpc_values (numpy.ndarray): Array of CPC precipitation values.
    corrected_values (numpy.ndarray): Array of bias-corrected precipitation values.

    Returns:
    dict: Dictionary containing calculated metrics.
    """
    # Remove NaN cells (e.g. ocean) before computing the scores
    mask = ~np.isnan(cpc_values.flatten()) & ~np.isnan(corrected_values.flatten())
    cpc_flat = cpc_values.flatten()[mask]
    corrected_flat = corrected_values.flatten()[mask]

    mae = mean_absolute_error(cpc_flat, corrected_flat)
    r2 = r2_score(cpc_flat, corrected_flat)
    # Precision and recall are classification metrics, so the continuous
    # precipitation values are binarized into rain / no-rain classes first
    precision = precision_score(cpc_flat > 0, corrected_flat > 0)
    recall = recall_score(cpc_flat > 0, corrected_flat > 0)
    
    metrics = {
        'MAE': mae,
        'R2': r2,
        'Precision': precision,
        'Recall': recall
    }
    return metrics

def process_bootstrap_and_metrics(corrected_file, cpc_file, output_file):
    """
    Process bootstrapping for uncertainty estimates and calculate grid metrics.

    Parameters:
    corrected_file (str): Path to the corrected precipitation file.
    cpc_file (str): Path to the CPC precipitation file.
    output_file (str): Path to the output file for saving bootstrapping results and metrics.
    """
    corrected_ds = xr.open_dataset(corrected_file)
    cpc_ds = xr.open_dataset(cpc_file)

    imerg_values = corrected_ds['precip'].values
    cpc_values = cpc_ds['precip'].values

    lower_ci, upper_ci = bootstrap_bias_correction(imerg_values, cpc_values)

    metrics = calculate_metrics(imerg_values, cpc_values, corrected_ds['precip'].values)

    # Save lower and upper CI to a new NetCDF file
    ds_ci = xr.Dataset(
        {
            'lower_ci': (('lat', 'lon'), lower_ci),
            'upper_ci': (('lat', 'lon'), upper_ci),
            'MAE': ((), metrics['MAE']),
            'R2': ((), metrics['R2']),
            'Precision': ((), metrics['Precision']),
            'Recall': ((), metrics['Recall'])
        },
        coords={'lat': corrected_ds['lat'], 'lon': corrected_ds['lon']}
    )
    ds_ci.to_netcdf(output_file, encoding={'lower_ci': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan},
                                           'upper_ci': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan},
                                           'MAE': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan},
                                           'R2': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan},
                                           'Precision': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan},
                                           'Recall': {'dtype': 'float32', 'zlib': True, '_FillValue': np.nan}})
    print(f'Saved uncertainty estimates and metrics to {output_file}')

# Example usage
dekad_start = pd.Timestamp(2020, 1, 1)
corrected_file = f'/path/to/corrected_precip_lseqm_{dekad_start.strftime("%Y%m%d")}.nc'
cpc_file = f'/path/to/idn_cpc_{dekad_start.year}.nc4'
output_file = f'/path/to/uncertainty_metrics_{dekad_start.strftime("%Y%m%d")}.nc'

process_bootstrap_and_metrics(corrected_file, cpc_file, output_file)