Bias correction using various methods; the inputs come from IMERG and the CPC gauge analysis.
# -*- coding: utf-8 -*-
"""
NAME
imerg_cpc_biascorrection.py
DESCRIPTION
Bias correction using various methods:
i. Scale,
ii. Distribution,
iii. Delta,
iv. Least Squares Composite differencing (LSC),
v. Linear Scaling and quantile-mapping CDF matching (LSCDF),
vi. Replacement-based Cumulative Distribution Function (RCDF) Mapping,
vii. Multiple Linear Regression,
viii. Artificial Neural Network,
ix. Kalman filtering,
x. Bias Correction and Spatial Disaggregation (BCSD),
xi. Bias Correction and Spatially Disaggregated Mapping (BCSDM),
xii. Quantile-quantile mapping,
xiii. Empirical Quantile Mapping (eQM),
xiv. Adjusted Quantile Mapping (aQM),
xv. Gamma Distribution Quantile Mapping (gQM),
xvi. Gamma and Generalized Pareto Distribution Quantile Mapping (gpdQM)
REQUIREMENT
Requires the os, calendar, numpy, xarray, pandas, scipy, and dask modules,
so it should run in any environment where those are installed.
Please also check the module imports inside each function.
HOW-TO USE
python imerg_cpc_biascorrection.py
NOTES
Input data for this script are GPM IMERG daily data compiled as one netCDF file
per year, and the CPC Global Unified Gauge-Based Analysis of Daily Precipitation
(GUGBADP). The script is designed to work with global daily IMERG and GUGBADP data
compiled as one netCDF file per year following the GUGBADP structure. To do this,
you can use the Climate Data Operators (CDO) to manipulate the IMERG data: modules
such as mergetime and remapbil are useful to merge daily data into annual files and
regrid them to the GUGBADP spatial resolution. After these steps are done, you can
start the correction.
The precipitation variables in IMERG and GUGBADP are named "precipitationCal" and
"precip" respectively, so some adjustments are required: parsing the filename,
directory, variable name, etc.
WORKING DIRECTORY
/input/imerg - put your IMERG data here
/input/cpc - put your GUGBADP data here
/output/{method}/corrected - location for corrected precipitation output
/output/{method}/factors - location for correction multiplying-factor output
/output/{method}/metrics - location for correction statistical-metrics output
DATA
IMERG:
- Early: https://gpm1.gesdisc.eosdis.nasa.gov/data/GPM_L3/GPM_3IMERGDE.06/
- Late: https://gpm1.gesdisc.eosdis.nasa.gov/data/GPM_L3/GPM_3IMERGDL.06/
- Final: https://gpm1.gesdisc.eosdis.nasa.gov/data/GPM_L3/GPM_3IMERGDF.06/
GUGBADP: https://psl.noaa.gov/data/gridded/data.cpc.globalprecip.html
CONTACT
Benny Istanto
Climate Geographer
GOST/DECAT/DEC Data Group, The World Bank
LICENSE
This script is in the public domain, free from copyrights or restrictions.
VERSION
$Id$
TODO
xx
"""
import os
import calendar
import xarray as xr
import numpy as np
import pandas as pd
def bias_correction(imerg_ds, cpc_ds, method='rcdfm'):
    """
    Correct the bias in the IMERG data using the Replacement-based CDF Mapping method or other methods.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    - method (str): the method to use for correction: 'rcdfm', 'scale', 'distribution', or another of the methods listed below.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    """
    if method == 'scale':
        corrected_ds = scale(imerg_ds, cpc_ds, method='mean')
    elif method == 'distribution':
        corrected_ds = distribution(imerg_ds, cpc_ds)
    elif method == 'delta':
        corrected_ds = delta(imerg_ds, cpc_ds)
    elif method == 'lsc':
        corrected_ds = lsc(imerg_ds, cpc_ds)
    elif method == 'lscdf':
        corrected_ds = lscdf(imerg_ds, cpc_ds)
    elif method == 'rcdfm':
        corrected_ds = rcdfm(imerg_ds, cpc_ds)
    elif method == 'mlr':
        corrected_ds = mlr(imerg_ds, cpc_ds)
    elif method == 'ann':
        corrected_ds = ann(imerg_ds, cpc_ds)
    elif method == 'kalman':
        corrected_ds = kalman(imerg_ds, cpc_ds)
    elif method == 'bcsd':
        corrected_ds = bcsd(imerg_ds, cpc_ds, method='linear')
    elif method == 'bcsdm':
        corrected_ds = bcsdm(imerg_ds, cpc_ds, method='linear')
    elif method == 'qq_mapping':
        corrected_ds = qq_mapping(imerg_ds, cpc_ds)
    elif method == 'eQM':
        corrected_ds = eQM(cpc_ds, imerg_ds)
    elif method == 'aQM':
        corrected_ds = aQM(cpc_ds, imerg_ds, alpha=0.5)
    elif method == 'gQM':
        corrected_ds = gQM(cpc_ds, imerg_ds)
    elif method == 'gpdQM':
        corrected_ds = gpdQM(cpc_ds, imerg_ds)
    # add more elif statements for other correction methods
    else:
        raise ValueError("Invalid method. Choose one of 'scale', 'distribution', 'delta', 'lsc', 'lscdf', "
                         "'rcdfm', 'mlr', 'ann', 'kalman', 'bcsd', 'bcsdm', 'qq_mapping', 'eQM', 'aQM', 'gQM', 'gpdQM'.")
    return corrected_ds
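Schematically, the long `if`/`elif` chain above routes a method name to the matching function. The same pattern can be sketched as a dictionary dispatch over plain NumPy arrays; the `scale_toy` and `delta_toy` callables below are illustrative stand-ins, not the real corrections:

```python
import numpy as np

# Toy stand-ins for two of the correction methods (illustrative only)
def scale_toy(sim, ref):
    # Multiply so the corrected mean matches the reference mean
    return sim * (ref.mean() / sim.mean())

def delta_toy(sim, ref):
    # Shift by the mean difference between reference and simulation
    return sim + (ref.mean() - sim.mean())

METHODS = {'scale': scale_toy, 'delta': delta_toy}

def correct(sim, ref, method='scale'):
    # Dictionary dispatch: unknown names fail with the list of valid choices
    if method not in METHODS:
        raise ValueError(f"Invalid method. Choose one of {sorted(METHODS)}.")
    return METHODS[method](sim, ref)

sim = np.array([2.0, 4.0, 6.0])
ref = np.array([1.0, 3.0, 5.0])
corrected = correct(sim, ref, 'scale')
```

A dictionary keeps the method table in one place, so the set of valid names and the error message cannot drift apart the way a long `elif` chain can.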
def scale(imerg_ds, cpc_ds, method='mean'):
    """
    Scale the IMERG data by a constant factor, determined by the ratio of the mean or median
    of the CPC data to the mean or median of the IMERG data.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    - method (str): the method to use to calculate the scaling factor, either 'mean' or 'median'.
    Returns:
    - corrected_ds (xarray.Dataset): the scaled IMERG data.
    """
    def custom_scaling_factor(imerg_precip, cpc_precip, method='mean'):
        """
        Calculate the scaling factor for correcting the IMERG data using either the mean or median of the IMERG and CPC data.
        Parameters:
        - imerg_precip (xarray.DataArray): the IMERG precipitation data.
        - cpc_precip (xarray.DataArray): the CPC precipitation data.
        - method (str): the method to use to calculate the scaling factor, either 'mean' or 'median'.
        Returns:
        - scaling_factor (xarray.DataArray): the scaling factor for correcting the IMERG data.
        """
        if method == 'mean':
            imerg_stat = imerg_precip.mean()
            cpc_stat = cpc_precip.mean()
        elif method == 'median':
            imerg_stat = imerg_precip.median()
            cpc_stat = cpc_precip.median()
        else:
            raise ValueError("Invalid method. Choose either 'mean' or 'median'.")
        scaling_factor = cpc_stat / imerg_stat
        return scaling_factor

    scaling_factor = custom_scaling_factor(imerg_ds['precipitationCal'], cpc_ds['precip'], method)
    # Apply the constant factor to the whole IMERG field
    corrected_precip = imerg_ds['precipitationCal'] * scaling_factor
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
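As a quick sanity check of the mean-ratio scaling above, the factor and its effect can be reproduced on toy numbers (illustrative values, not real IMERG/CPC data):

```python
import numpy as np

imerg = np.array([2.0, 4.0, 6.0, 8.0])  # toy satellite series, mean = 5.0
cpc = np.array([1.0, 3.0, 5.0, 7.0])    # toy gauge series, mean = 4.0

# Mean-based scaling factor: reference mean over satellite mean
factor = cpc.mean() / imerg.mean()      # 4.0 / 5.0 = 0.8
corrected = imerg * factor

# After scaling, the corrected series has the same mean as the gauge series
```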
def distribution(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using a distribution-based method: the distribution
    of the IMERG data is matched to that of the CPC data through their empirical CDFs,
    grid cell by grid cell.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    """
    def cdf_matching(imerg_precip, cpc_precip):
        # Empirical CDF position of each IMERG value within its own sample
        imerg_sorted = np.sort(imerg_precip)
        imerg_cdf = np.interp(imerg_precip, imerg_sorted,
                              np.arange(1, imerg_sorted.size + 1) / imerg_sorted.size)
        # Invert through the CPC empirical CDF to get the matched values
        cpc_sorted = np.sort(cpc_precip)
        cpc_quantiles = np.arange(1, cpc_sorted.size + 1) / cpc_sorted.size
        corrected_precip = np.interp(imerg_cdf, cpc_quantiles, cpc_sorted)
        return corrected_precip
    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitationCal']
    cpc_precip = cpc_ds['precip']
    # Perform the correction, one grid cell at a time
    corrected_precip = xr.apply_ufunc(cdf_matching, imerg_precip, cpc_precip,
                                      input_core_dims=[['time'], ['time']],
                                      output_core_dims=[['time']],
                                      vectorize=True,
                                      dask='parallelized', output_dtypes=[float])
    corrected_precip = corrected_precip.transpose('time', 'lat', 'lon')
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def delta(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using the delta method: shift the IMERG series by
    the long-term mean difference between CPC and IMERG, grid cell by grid cell.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    """
    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitationCal']
    cpc_precip = cpc_ds['precip']
    # Define the bias correction function: add the mean difference (the "delta") to IMERG
    def delta_precip(imerg, cpc):
        return imerg + (np.mean(cpc) - np.mean(imerg))
    # Apply the bias correction function using xarray's apply_ufunc
    corrected_precip = xr.apply_ufunc(delta_precip, imerg_precip, cpc_precip,
                                      input_core_dims=[['time'], ['time']],
                                      output_core_dims=[['time']],
                                      vectorize=True,
                                      dask='parallelized', output_dtypes=[float])
    corrected_precip = corrected_precip.transpose('time', 'lat', 'lon')
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
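The delta shift amounts to adding the long-term mean difference between the gauge and satellite series; a minimal numeric sketch (toy values):

```python
import numpy as np

imerg = np.array([2.0, 4.0, 6.0])  # toy satellite series, mean = 4.0
cpc = np.array([3.0, 5.0, 7.0])    # toy gauge series, mean = 5.0

# The "delta" is the additive bias between the two means
delta_c = cpc.mean() - imerg.mean()  # 1.0
corrected = imerg + delta_c

print(corrected)  # [3. 5. 7.]
```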
def lsc(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using the Least Squares Composite differencing (LSC) method.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    """
    from scipy.optimize import minimize
    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitationCal']
    cpc_precip = cpc_ds['precip']
    # Define the objective function that will be minimized
    def objective(c, imerg_precip, cpc_precip):
        """
        The objective function to be minimized in the LSC method.
        Parameters:
        - c (float): the correction factor.
        - imerg_precip (numpy.ndarray): the IMERG precipitation series for one grid cell.
        - cpc_precip (numpy.ndarray): the CPC precipitation series for one grid cell.
        Returns:
        - objective_val (float): the value of the objective function for the given correction factor.
        """
        objective_val = np.sum((imerg_precip + c - cpc_precip)**2)
        return objective_val
    # Define the function to compute the LSC correction
    def compute_lsc(imerg_precip, cpc_precip):
        """
        Compute the correction factor for the LSC method.
        Parameters:
        - imerg_precip (numpy.ndarray): the IMERG precipitation series for one grid cell.
        - cpc_precip (numpy.ndarray): the CPC precipitation series for one grid cell.
        Returns:
        - correction (float): the correction factor.
        """
        # Initialize the correction factor
        c0 = 0.0
        # Perform the optimization
        res = minimize(objective, c0, args=(imerg_precip, cpc_precip))
        correction = res.x[0]
        return correction
    # Perform the correction, one grid cell at a time
    correction = xr.apply_ufunc(compute_lsc, imerg_precip, cpc_precip,
                                input_core_dims=[['time'], ['time']],
                                output_core_dims=[[]],
                                vectorize=True,
                                dask='parallelized', output_dtypes=[float])
    # Apply the correction to the IMERG data
    corrected_precip = imerg_precip + correction
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
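For a constant additive offset, the least-squares optimum has a closed form (the mean difference), which makes a handy check on the `minimize` call used above (toy values):

```python
import numpy as np
from scipy.optimize import minimize

imerg = np.array([1.0, 2.0, 3.0, 4.0])  # toy satellite series
cpc = np.array([2.5, 3.5, 4.5, 5.5])    # toy gauge series, offset by 1.5

def objective(c):
    # Sum of squared differences after shifting the satellite series by c
    return np.sum((imerg + c - cpc) ** 2)

res = minimize(objective, 0.0)
# The fitted constant should equal the mean difference, here 1.5
```

Because this objective is quadratic in `c`, `minimize` is overkill for the toy case, but the same call generalizes once the correction model gains more parameters.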
def lscdf(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using the Linear Scaling (LS) and quantile-mapping
    Cumulative Distribution Function (CDF) matching approaches (LSCDF).
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    """
    from scipy.stats import rankdata, linregress
    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitationCal']
    cpc_precip = cpc_ds['precip']
    # Define the per-cell correction: scale IMERG by the slope of the CPC-IMERG rank relation
    def lscdf_precip(imerg, cpc):
        slope = linregress(rankdata(cpc), rankdata(imerg))[0]
        return imerg + slope * (rankdata(imerg) - 0.5)
    # Compute the bias-corrected data using the LSCDF method, one grid cell at a time
    corrected_precip = xr.apply_ufunc(lscdf_precip, imerg_precip, cpc_precip,
                                      input_core_dims=[['time'], ['time']],
                                      output_core_dims=[['time']],
                                      vectorize=True,
                                      dask='parallelized', output_dtypes=[imerg_precip.dtype])
    corrected_precip = corrected_precip.transpose('time', 'lat', 'lon')
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def rcdfm(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using the Replacement-based CDF Mapping method.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    """
    from scipy.stats import rankdata
    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitationCal']
    cpc_precip = cpc_ds['precip']
    # Define the per-cell correction: replace each non-zero IMERG value with the
    # CPC value at the same empirical quantile, keeping dry days at zero
    def rcdf_map(imerg, cpc):
        imerg_cdf = rankdata(imerg) / imerg.size
        cpc_sorted = np.sort(cpc)
        cpc_quantiles = np.arange(1, cpc.size + 1) / cpc.size
        corrected = np.interp(imerg_cdf, cpc_quantiles, cpc_sorted)
        return np.where(imerg != 0, corrected, 0.0)
    # Perform the correction, one grid cell at a time
    corrected_precip = xr.apply_ufunc(rcdf_map, imerg_precip, cpc_precip,
                                      input_core_dims=[['time'], ['time']],
                                      output_core_dims=[['time']],
                                      vectorize=True,
                                      dask='parallelized', output_dtypes=[float])
    corrected_precip = corrected_precip.transpose('time', 'lat', 'lon')
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def mlr(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using multiple linear regression.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    This function uses LinearRegression from the scikit-learn library. It flattens the
    data to 2D arrays, broadcasts lat and lon so they can serve as additional predictors
    alongside imerg_precip, trains the multiple linear regression model using the stacked
    predictors as input and cpc_precip as the target, and then applies the trained model
    to predict the corrected precipitation. The corrected precipitation is reshaped back
    to the original shape before returning the corrected dataset.
    """
    from sklearn.linear_model import LinearRegression
    # Get the precipitation data from the input datasets
    imerg_precip = imerg_ds['precipitationCal'].values
    cpc_precip = cpc_ds['precip'].values
    ntime, nlat, nlon = imerg_precip.shape
    # Broadcast the lats and lons to every (time, lat, lon) sample so the predictor columns line up
    lats = np.broadcast_to(imerg_ds['lat'].values[None, :, None], imerg_precip.shape).ravel()
    lons = np.broadcast_to(imerg_ds['lon'].values[None, None, :], imerg_precip.shape).ravel()
    # Stack the predictors
    predictors = np.column_stack((imerg_precip.ravel(), lats, lons))
    # Train the multiple linear regression model
    mlr_model = LinearRegression()
    mlr_model.fit(predictors, cpc_precip.ravel())
    # Use the trained model to predict the corrected precipitation and restore the original shape
    corrected_precip = mlr_model.predict(predictors).reshape(ntime, nlat, nlon)
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def ann(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using an artificial neural network.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    This function uses MLPRegressor from the scikit-learn library as an example of an ANN.
    It flattens the data to 2D arrays, trains the ANN model using imerg_precip as input
    and cpc_precip as the target, then applies the trained model to predict the corrected
    precipitation. The corrected precipitation is reshaped back to the original shape
    before returning the corrected dataset.
    """
    from sklearn.neural_network import MLPRegressor
    imerg_shape = imerg_ds['precipitationCal'].shape
    # Get the precipitation data from the input datasets and flatten to a 2D feature array
    imerg_precip = imerg_ds['precipitationCal'].values.reshape(-1, 1)
    cpc_precip = cpc_ds['precip'].values.ravel()
    # Train the ANN model
    ann_model = MLPRegressor(hidden_layer_sizes=(50, 50), max_iter=1000)
    ann_model.fit(imerg_precip, cpc_precip)
    # Use the trained ANN model to predict the corrected precipitation and restore the original shape
    corrected_precip = ann_model.predict(imerg_precip).reshape(imerg_shape)
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def kalman(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using Kalman filtering.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    This function uses KalmanFilter from the pykalman library. It flattens the data to
    2D arrays, uses the Expectation-Maximization (EM) algorithm to estimate the
    parameters of the Kalman filter from the CPC reference, and then filters the IMERG
    series with the fitted filter to obtain the corrected precipitation. The corrected
    precipitation is reshaped back to the original shape before returning the
    corrected dataset.
    """
    from pykalman import KalmanFilter
    imerg_shape = imerg_ds['precipitationCal'].shape
    # Get the precipitation data from the input datasets and flatten to 2D arrays
    imerg_precip = imerg_ds['precipitationCal'].values.reshape(-1, 1)
    cpc_precip = cpc_ds['precip'].values.reshape(-1, 1)
    # Estimate the filter parameters from the CPC reference with the EM algorithm
    kf = KalmanFilter(initial_state_mean=imerg_precip[0], n_dim_obs=1)
    kf = kf.em(cpc_precip, n_iter=5)
    # Filter the IMERG series with the fitted filter to obtain the corrected precipitation
    corrected_precip, _ = kf.filter(imerg_precip)
    corrected_precip = corrected_precip.reshape(imerg_shape)
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def bcsd(imerg_ds, cpc_ds, method='linear'):
    """
    Correct the bias in the IMERG data using the Bias Correction and Spatial Disaggregation (BCSD) method.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    - method (str): method of interpolation, 'linear' by default.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    This function uses griddata from the scipy.interpolate library. It first builds the
    full lat/lon grid of the CPC data, then uses griddata to interpolate each CPC time
    step to the grid of the IMERG data. The interpolation method is linear by default,
    but you can set it to another method supported by griddata.
    """
    from scipy.interpolate import griddata
    # Get the precipitation data from the input dataset
    cpc_precip = cpc_ds['precip'].values
    # Build the full lat/lon grids of the CPC (source) and IMERG (target) data
    src_lat, src_lon = np.meshgrid(cpc_ds['lat'].values, cpc_ds['lon'].values, indexing='ij')
    src_points = np.column_stack((src_lat.ravel(), src_lon.ravel()))
    dst_lat, dst_lon = np.meshgrid(imerg_ds['lat'].values, imerg_ds['lon'].values, indexing='ij')
    # Interpolate each CPC time step to the IMERG grid
    corrected_precip = np.stack([griddata(src_points, cpc_precip[t].ravel(),
                                          (dst_lat, dst_lon), method=method)
                                 for t in range(cpc_precip.shape[0])])
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def bcsdm(imerg_ds, cpc_ds, method='linear'):
    """
    Correct the bias in the IMERG data using the Bias Correction and Spatially Disaggregated Mapping (BCSDM) method.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    - method (str): method of interpolation, 'linear' by default.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    """
    from scipy.interpolate import griddata
    # Get the precipitation data from the input dataset
    cpc_precip = cpc_ds['precip'].values
    # Build the full lat/lon grids of the CPC (source) and IMERG (target) data
    src_lat, src_lon = np.meshgrid(cpc_ds['lat'].values, cpc_ds['lon'].values, indexing='ij')
    src_points = np.column_stack((src_lat.ravel(), src_lon.ravel()))
    dst_lat, dst_lon = np.meshgrid(imerg_ds['lat'].values, imerg_ds['lon'].values, indexing='ij')
    # Interpolate each CPC time step to the IMERG grid (the spatial step shared with bcsd)
    corrected_precip = np.stack([griddata(src_points, cpc_precip[t].ravel(),
                                          (dst_lat, dst_lon), method=method)
                                 for t in range(cpc_precip.shape[0])])
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def qq_mapping(imerg_ds, cpc_ds):
    """
    Correct the bias in the IMERG data using the quantile-quantile mapping (QQ-mapping) method.
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    This function uses rankdata from the scipy.stats library. It flattens the data to 1D
    arrays, computes the empirical cumulative distribution function (CDF) of the IMERG
    data, then uses np.interp to look up the CPC value at the same quantile, which gives
    the corrected precipitation values. The corrected precipitation is then reshaped back
    to the original shape before returning the corrected dataset.
    """
    from scipy.stats import rankdata
    imerg_shape = imerg_ds['precipitationCal'].shape
    # Get the precipitation data from the input datasets and flatten to 1D arrays
    imerg_precip = imerg_ds['precipitationCal'].values.flatten()
    cpc_precip = cpc_ds['precip'].values.flatten()
    # Compute the empirical CDF position of each IMERG value
    imerg_cdf = rankdata(imerg_precip) / len(imerg_precip)
    # np.interp needs an ascending xp, so use the sorted CPC values and their quantiles
    cpc_sorted = np.sort(cpc_precip)
    cpc_quantiles = np.arange(1, len(cpc_precip) + 1) / len(cpc_precip)
    # Look up the CPC value at each IMERG quantile and restore the original shape
    corrected_precip = np.interp(imerg_cdf, cpc_quantiles, cpc_sorted).reshape(imerg_shape)
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
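The quantile-mapping step itself is easy to exercise on synthetic samples: take the empirical CDF position of each "satellite" value and read off the "gauge" value at the same quantile (toy gamma-distributed data; the shape and scale numbers are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(42)
imerg = rng.gamma(2.0, 3.0, size=1000)  # toy "satellite" sample (wetter scale)
cpc = rng.gamma(2.0, 2.0, size=1000)    # toy "gauge" sample (drier scale)

# Empirical CDF position of each satellite value within its own sample
ranks = np.argsort(np.argsort(imerg)) + 1
imerg_cdf = ranks / (len(imerg) + 1)

# Invert through the gauge sample's empirical CDF: same quantile, gauge distribution
cpc_sorted = np.sort(cpc)
cpc_quantiles = np.arange(1, len(cpc) + 1) / (len(cpc) + 1)
corrected = np.interp(imerg_cdf, cpc_quantiles, cpc_sorted)

# The corrected sample's median now tracks the gauge sample's median
```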
def eQM(cpc_ds, imerg_ds):
    """
    Correct the bias in the simulated data using the Empirical Quantile Mapping (eQM) method.
    Parameters:
    - cpc_ds (xarray.Dataset): the observed data to use as a reference for correction.
    - imerg_ds (xarray.Dataset): the simulated data to be corrected.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected simulated data.
    This function uses scipy.stats.percentileofscore() to compute the percentiles of the
    observed and simulated datasets and numpy.interp() to map the percentiles of the
    simulated data onto the observed values, which gives the corrected precipitation
    values. The corrected precipitation is then reshaped back to the original shape
    before returning the corrected dataset.
    """
    from scipy.stats import percentileofscore
    imerg_shape = imerg_ds['precipitationCal'].shape
    # Get the precipitation data from the input datasets and flatten to 1D arrays
    cpc_precip = cpc_ds['precip'].values.flatten()
    imerg_precip = imerg_ds['precipitationCal'].values.flatten()
    # Compute the percentiles of the observed and simulated data
    cpc_percentiles = np.array([percentileofscore(cpc_precip, val, kind='rank') for val in cpc_precip])
    imerg_percentiles = np.array([percentileofscore(imerg_precip, val, kind='rank') for val in imerg_precip])
    # np.interp needs an ascending xp: sort the observed percentiles and values together
    order = np.argsort(cpc_percentiles)
    corrected_precip = np.interp(imerg_percentiles, cpc_percentiles[order], cpc_precip[order]).reshape(imerg_shape)
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def aQM(cpc_ds, imerg_ds, alpha=0.5):
    """
    Correct the bias in the IMERG data using the Adjusted Quantile Mapping (aQM) method.
    Parameters:
    - cpc_ds (xarray.Dataset): the CPC data to use as a reference for correction.
    - imerg_ds (xarray.Dataset): the IMERG data to be corrected.
    - alpha (float): the adjustment parameter, 0.5 by default.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    The cpc_ds is passed as the first argument and is used as the reference dataset; the
    imerg_ds is passed as the second argument and is the dataset to be corrected. The
    adjustment parameter alpha is optional (default 0.5) and pulls the reference
    percentiles toward the median. The corrected precipitation is then reshaped back to
    the original shape before returning the corrected dataset.
    """
    from scipy.stats import percentileofscore
    imerg_shape = imerg_ds['precipitationCal'].shape
    # Get the precipitation data from the input datasets and flatten to 1D arrays
    cpc_precip = cpc_ds['precip'].values.flatten()
    imerg_precip = imerg_ds['precipitationCal'].values.flatten()
    # Compute the percentiles of the observed and simulated data
    cpc_percentiles = np.array([percentileofscore(cpc_precip, val, kind='rank') for val in cpc_precip])
    imerg_percentiles = np.array([percentileofscore(imerg_precip, val, kind='rank') for val in imerg_precip])
    # Pull the reference percentiles toward the median by the adjustment parameter alpha
    adjusted_percentiles = (1 - alpha) * cpc_percentiles + alpha * 50
    # np.interp needs an ascending xp: sort the adjusted percentiles and CPC values together
    order = np.argsort(adjusted_percentiles)
    corrected_precip = np.interp(imerg_percentiles, adjusted_percentiles[order], cpc_precip[order]).reshape(imerg_shape)
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def gQM(cpc_ds, imerg_ds):
    """
    Correct the bias in the simulated data using the Gamma Distribution Quantile Mapping (gQM) method.
    Parameters:
    - cpc_ds (xarray.Dataset): the observed data to use as a reference for correction.
    - imerg_ds (xarray.Dataset): the simulated data to be corrected.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected simulated data.
    In this example, cpc_ds is passed as the first argument and is used as the reference
    dataset, and imerg_ds is passed as the second argument as the dataset to be corrected.
    The scipy.stats.gamma.fit() function fits a gamma distribution to the observed data,
    and scipy.stats.gamma.ppf() performs the bias correction by transforming the
    percentiles of the simulated data to the corresponding quantiles of the fitted
    observed distribution before returning the corrected dataset.
    """
    import scipy.stats as stats
    # Get the precipitation data from the input datasets
    cpc_precip = cpc_ds['precip'].values
    imerg_precip = imerg_ds['precipitationCal'].values
    # Fit a gamma distribution to the observed data
    cpc_shape, cpc_loc, cpc_scale = stats.gamma.fit(cpc_precip)
    # Define the bias correction function
    def gQM_precip(imerg_precip, cpc_shape, cpc_loc, cpc_scale):
        # Divide the ranks by n + 1 so the largest value maps to a finite quantile
        corrected = stats.gamma.ppf(stats.rankdata(imerg_precip) / (len(imerg_precip) + 1),
                                    cpc_shape, loc=cpc_loc, scale=cpc_scale)
        return corrected
    corrected_precip = xr.apply_ufunc(gQM_precip, imerg_precip, cpc_shape, cpc_loc, cpc_scale, dask='parallelized')
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
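A minimal, self-contained check of the gamma quantile-mapping step on synthetic samples (the gamma parameters and seed are made up for illustration; `floc=0` is an assumption that stabilises the fit for non-negative rainfall):

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
obs = rng.gamma(2.0, 5.0, 2000)   # reference sample (mean ~10 mm)
sim = rng.gamma(2.0, 8.0, 2000)   # wet-biased sample (mean ~16 mm)

# Fit a gamma distribution to the reference data
shape, loc, scale = stats.gamma.fit(obs, floc=0)
# Empirical CDF of the simulated values; n + 1 keeps the top rank below 1
ranks = stats.rankdata(sim) / (sim.size + 1)
# Map each simulated value to the matching quantile of the fitted gamma
corrected = stats.gamma.ppf(ranks, shape, loc=loc, scale=scale)

print(f"sim mean {sim.mean():.1f} -> corrected mean {corrected.mean():.1f} (obs {obs.mean():.1f})")
```

Dividing the ranks by `n + 1` rather than `n` matters: with `n` the largest value has rank 1.0 and `gamma.ppf(1.0)` returns infinity.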
def gpdQM(cpc_ds, imerg_ds):
    """
    Correct the bias in the simulated data using a combination of Gamma and Generalized Pareto Distribution Quantile Mapping (gpdQM) method.
    Parameters:
    - cpc_ds (xarray.Dataset): the observed data to use as a reference for correction.
    - imerg_ds (xarray.Dataset): the simulated data to be corrected.
    Returns:
    - corrected_ds (xarray.Dataset): the bias-corrected simulated data.
    The function first fits a gamma distribution to the observed data using scipy.stats.gamma.fit(),
    and a Generalized Pareto Distribution (GPD) to the tail of the observed data using scipy.stats.genpareto.fit().
    Then, it applies the bias correction using the percentile-percentile method, where percentiles of simulated
    precipitation are transformed to percentiles of observed precipitation using the scipy.stats.genpareto.ppf()
    and scipy.stats.gamma.ppf() functions.
    """
    import scipy.stats as stats
    # Get the precipitation data from the input datasets
    cpc_precip = cpc_ds['precip'].values
    imerg_precip = imerg_ds['precipitationCal'].values
    # Fit a gamma distribution to the bulk of the observed data
    cpc_shape, cpc_loc, cpc_scale = stats.gamma.fit(cpc_precip)
    # Define the tail as everything above the fitted gamma's 99.9th percentile
    # and fit a Generalized Pareto Distribution to the observed exceedances
    threshold = stats.gamma.ppf(1 - 1e-3, cpc_shape, loc=cpc_loc, scale=cpc_scale)
    cpc_gpd_shape, cpc_gpd_loc, cpc_gpd_scale = stats.genpareto.fit(cpc_precip[cpc_precip > threshold], floc=threshold)
    # Define the bias correction function
    def gpdQM_precip(imerg_precip, cpc_shape, cpc_loc, cpc_scale, cpc_gpd_shape, cpc_gpd_loc, cpc_gpd_scale, threshold):
        # Use the gamma distribution for the body of the data and the GPD for the
        # tail, writing the corrected values back into their original positions
        tail_mask = imerg_precip > threshold
        corrected = np.empty_like(imerg_precip, dtype=float)
        imerg_tail = imerg_precip[tail_mask]
        imerg_body = imerg_precip[~tail_mask]
        # Divide the ranks by n + 1 so the largest value maps to a finite quantile
        corrected[tail_mask] = stats.genpareto.ppf(stats.rankdata(imerg_tail) / (len(imerg_tail) + 1), cpc_gpd_shape, loc=cpc_gpd_loc, scale=cpc_gpd_scale)
        corrected[~tail_mask] = stats.gamma.ppf(stats.rankdata(imerg_body) / (len(imerg_body) + 1), cpc_shape, loc=cpc_loc, scale=cpc_scale)
        return corrected
    corrected_precip = xr.apply_ufunc(gpdQM_precip, imerg_precip, cpc_shape, cpc_loc, cpc_scale, cpc_gpd_shape, cpc_gpd_loc, cpc_gpd_scale, threshold, dask='parallelized')
    # Create a new xarray.Dataset with the bias-corrected data
    corrected_ds = xr.Dataset(data_vars={'precipitation': (('time', 'lat', 'lon'), corrected_precip)},
                              coords={'time': imerg_ds['time'],
                                      'lat': imerg_ds['lat'],
                                      'lon': imerg_ds['lon']})
    return corrected_ds
def create_multiplying_factors(corrected_ds, num_dekads=36):
    """
    Create multiplying factors for correcting the IMERG data in the future.
    Parameters:
    - corrected_ds (xarray.Dataset): the bias-corrected IMERG data.
    - num_dekads (int): the number of dekads (10-day periods) to create multiplying factors for.
    Returns:
    - dekad_factors (list of xarray.DataArray): the multiplying factors for each dekad.
    """
    # Compute the number of days in each dekad
    days_per_dekad = corrected_ds.time.size // num_dekads
    # Initialize the list to store the multiplying factors
    dekad_factors = []
    # Loop through each dekad
    for i in range(num_dekads):
        # Get the start and end indices for this dekad
        start_idx = i * days_per_dekad
        end_idx = start_idx + days_per_dekad - 1
        # Get the data for this dekad
        dekad_data = corrected_ds['precipitation'].isel(time=slice(start_idx, end_idx + 1))
        # Compute the mean of the data for this dekad
        dekad_mean = dekad_data.mean(dim='time')
        # Get the number of days in this dekad
        start_date = corrected_ds['time'][start_idx].values
        end_date = corrected_ds['time'][end_idx].values + np.timedelta64(1, 'D')
        num_days = (end_date - start_date).astype('timedelta64[D]') / np.timedelta64(1, 'D')
        # Divide the mean by the number of days in this dekad, accounting for leap years
        dekad_factor = dekad_mean / num_days
        # Add the multiplying factor for this dekad to the list
        dekad_factors.append(dekad_factor)
    return dekad_factors
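The dekad slicing can be checked by hand with plain NumPy. This sketch mirrors the fixed-length slices used above on a made-up constant series (note that with 365 days and 36 dekads of 10 days, the trailing days beyond day 360 are not covered by any dekad):

```python
import numpy as np

daily = np.full(365, 3.0)                   # one year at 3 mm/day
num_dekads = 36
days_per_dekad = daily.size // num_dekads   # 10 days per dekad

factors = []
for i in range(num_dekads):
    dekad = daily[i * days_per_dekad:(i + 1) * days_per_dekad]
    # Mean daily rate divided by the dekad length, as in the function above
    factors.append(dekad.mean() / dekad.size)

print(len(factors), factors[0])  # prints: 36 0.3
```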
def calculate_metrics(imerg_ds, cpc_ds):
    """
    Calculate the following metrics for the IMERG and CPC data:
    - relative bias
    - Pearson correlation coefficient
    - root mean squared error
    - mean absolute error
    - probability of detection
    - false alarm ratio
    - critical success index
    Parameters:
    - imerg_ds (xarray.Dataset): the IMERG data.
    - cpc_ds (xarray.Dataset): the CPC data.
    Returns:
    - metrics (pandas.DataFrame): a one-row dataframe containing the metric values.
    """
    # Get the precipitation data from the input datasets
    imerg_corrected = imerg_ds['precipitation']
    cpc_precip = cpc_ds['precip']
    # Calculate the relative bias
    relative_bias = (imerg_corrected.sum(dim='time') / cpc_precip.sum(dim='time')).mean(dim=('lat', 'lon'))
    # Calculate the Pearson correlation coefficient
    pearson = xr.corr(imerg_corrected, cpc_precip, dim='time').mean(dim=('lat', 'lon'))
    # Calculate the root mean squared error
    rmse = (((imerg_corrected - cpc_precip) ** 2).mean(dim='time') ** 0.5).mean(dim=('lat', 'lon'))
    # Calculate the mean absolute error
    mae = (np.abs(imerg_corrected - cpc_precip)).mean(dim='time').mean(dim=('lat', 'lon'))
    # Build the rain/no-rain contingency counts per grid cell
    hits = ((imerg_corrected > 0) & (cpc_precip > 0)).sum(dim='time')
    misses = ((imerg_corrected == 0) & (cpc_precip > 0)).sum(dim='time')
    false_alarms = ((imerg_corrected > 0) & (cpc_precip == 0)).sum(dim='time')
    # Calculate the probability of detection
    pod = (hits / (hits + misses)).mean(dim=('lat', 'lon'))
    # Calculate the false alarm ratio
    far = (false_alarms / (hits + false_alarms)).mean(dim=('lat', 'lon'))
    # Calculate the critical success index
    csi = (hits / (hits + misses + false_alarms)).mean(dim=('lat', 'lon'))
    # Create a one-row dataframe to store the metric values
    metrics = pd.DataFrame({'relative_bias': [float(relative_bias)],
                            'pearson': [float(pearson)],
                            'rmse': [float(rmse)],
                            'mae': [float(mae)],
                            'pod': [float(pod)],
                            'far': [float(far)],
                            'csi': [float(csi)]})
    return metrics
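The categorical scores can be verified by hand on a toy rain/no-rain series (the six-day arrays are made up): POD = hits / (hits + misses), FAR = false alarms / (hits + false alarms), and CSI = hits / (hits + misses + false alarms).

```python
import numpy as np

sat = np.array([1.0, 0.0, 2.0, 0.0, 5.0, 0.3])   # satellite rain (mm/day)
obs = np.array([0.5, 0.0, 0.0, 1.0, 4.0, 0.2])   # gauge rain (mm/day)

rain_sat, rain_obs = sat > 0, obs > 0
hits = int(np.sum(rain_sat & rain_obs))           # both report rain
misses = int(np.sum(~rain_sat & rain_obs))        # gauge rain the satellite missed
false_alarms = int(np.sum(rain_sat & ~rain_obs))  # satellite rain the gauge did not see

pod = hits / (hits + misses)
far = false_alarms / (hits + false_alarms)
csi = hits / (hits + misses + false_alarms)
print(pod, far, csi)  # prints: 0.75 0.25 0.6
```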
def main(run_all_methods=False, method=None):
    """
    The main process to calculate:
    - bias correction
    - save corrected data to netcdf
    - metrics
    - save metrics to csv
    - multiplying factors
    - save multiplying factors as netcdf
    Returns:
    - Output available in folder output/{method}/ corrected, metrics and factors
    """
    import itertools
    import dask
    from dask import delayed
    # Get the correction method to use when running a single method
    if not run_all_methods and method is None:
        method = bias_correction.__defaults__[0]
    # Define the appropriate input and output directory paths
    input_dir = 'input'
    imerg_path = f'{input_dir}/imerg'
    cpc_path = f'{input_dir}/cpc'
    output_dir = 'output'
    # Create the output directory if it doesn't already exist
    os.makedirs(output_dir, exist_ok=True)
    # Load the IMERG data
    imerg_ds = xr.open_mfdataset(f'{imerg_path}/*.nc')
    # Load the CPC data
    cpc_ds = xr.open_mfdataset(f'{cpc_path}/*.nc')
    # Initialize an empty dataframe to store the metric values
    metrics = pd.DataFrame(columns=['method', 'year', 'relative_bias', 'pearson',
                                    'rmse', 'mae', 'pod', 'far', 'csi'])
    # Use dask's delayed function to schedule the computation for each method and year in parallel
    corrected_ds_list = []
    if run_all_methods:
        methods = ['scale', 'distribution', 'delta', 'lsc', 'lscdf', 'rcdfm', 'mlr',
                   'ann', 'kalman', 'bcsd', 'bcsdm', 'qq_mapping', 'eQM', 'aQM', 'gQM', 'gpdQM']
    else:
        methods = [method]
    years = list(range(2001, 2023))
    for method in methods:
        os.makedirs(f'output/{method}/corrected', exist_ok=True)
        os.makedirs(f'output/{method}/factors', exist_ok=True)
        os.makedirs(f'output/{method}/metrics', exist_ok=True)
        for year in years:
            # Get the data for this year
            imerg_year_ds = imerg_ds.sel(time=imerg_ds['time.year'] == year)
            cpc_year_ds = cpc_ds.sel(time=cpc_ds['time.year'] == year)
            # Schedule the calculation of the corrected data for this year
            corrected_ds = delayed(bias_correction)(imerg_year_ds, cpc_year_ds, method=method)
            corrected_ds_list.append(corrected_ds)
    # Compute the corrected data for all method/year pairs in parallel
    corrected_ds_list = dask.compute(*corrected_ds_list)
    # Encoding for CF 1.8
    cf18 = {'precipitation': {'dtype': 'float32', 'scale_factor': 0.1, 'zlib': True,
                              '_FillValue': -9999, 'add_offset': 0, 'least_significant_digit': 1}}
    # corrected_ds_list is ordered method-major (all years of the first method, then
    # all years of the next), so itertools.product over the same loop order as above
    # pairs each result with its (method, year)
    for (method, year), corrected_ds in zip(itertools.product(methods, years), corrected_ds_list):
        # Save the corrected data to a NetCDF file
        corrected_ds.to_netcdf(f'output/{method}/corrected/corrected_{method}_{year}.nc', encoding=cf18)
        dekad_factors = create_multiplying_factors(corrected_ds)
        for i, factor in enumerate(dekad_factors):
            factor.to_netcdf(f'output/{method}/factors/multiplying_factor_{method}_{year}_{i + 1}.nc', encoding=cf18)
        # Calculate the metrics for the corrected data against the gauge data for the same year
        cpc_year_ds = cpc_ds.sel(time=cpc_ds['time.year'] == year)
        year_metrics = calculate_metrics(corrected_ds, cpc_year_ds)
        year_metrics['method'] = method
        year_metrics['year'] = year
        # Add the metric values for this method and year to the dataframe
        metrics = pd.concat([metrics, year_metrics], ignore_index=True)
    # Save one metrics CSV per method
    for method in methods:
        metrics[metrics['method'] == method].to_csv(f'output/{method}/metrics/{method}_metrics.csv', index=False)

if __name__ == '__main__':
    main(run_all_methods=True)  # run all methods
    # main(method='rcdfm')  # run the rcdfm method only