import numpy as np |
import matplotlib.pyplot as plt |
from scipy.stats import gamma, genpareto |
from sklearn.model_selection import KFold |
# Set the figure DPI to 300 |
plt.rcParams['figure.dpi'] = 300 |
def calculate_l_moments(data): |
""" |
Calculate L-moments for the given data. |
L-moments are analogous to conventional moments but can be more robust to outliers. |
Args: |
data (np.array): Input data array |
Returns: |
tuple: L-moments (l1, l2, l3, l4) and L-moment ratios (t2, t3, t4) |
""" |
sorted_data = np.sort(data) |
n = len(data) |
# Calculate probability weighted moments |
b0 = np.mean(sorted_data) |
b1 = np.sum((np.arange(1, n) * sorted_data[1:]) / (n * (n - 1))) |
b2 = np.sum((np.arange(1, n-1) * (np.arange(2, n) * sorted_data[2:])) / (n * (n - 1) * (n - 2))) |
b3 = np.sum((np.arange(1, n-2) * (np.arange(2, n-1) * (np.arange(3, n) * sorted_data[3:]))) / (n * (n - 1) * (n - 2) * (n - 3))) |
# Calculate L-moments |
l1 = b0 # L1 is the mean (measure of location) |
l2 = 2 * b1 - b0 # L2 is a measure of scale (analogous to standard deviation) |
l3 = 6 * b2 - 6 * b1 + b0 # L3 is a measure of skewness |
l4 = 20 * b3 - 30 * b2 + 12 * b1 - b0 # L4 is a measure of kurtosis |
# Calculate L-moment ratios |
t2 = l2 / l1 if l1 != 0 else np.nan # L-CV (coefficient of L-variation) |
t3 = l3 / l2 if l2 != 0 else np.nan # L-skewness |
t4 = l4 / l2 if l2 != 0 else np.nan # L-kurtosis |
return l1, l2, l3, l4, t2, t3, t4 |
def fit_gamma_with_l_moments(data): |
""" |
Fit a Gamma distribution to the data using L-moments method. |
Args: |
data (np.array): Input data array |
Returns: |
tuple: Fitted Gamma distribution parameters (shape, loc, scale) |
""" |
data = data[~np.isnan(data)] |
if len(data) == 0: |
return 1, 0, 1 # Default values: shape=1, loc=0, scale=1 |
l1, l2, _, _, t2, _, _ = calculate_l_moments(data) |
shape = (2 / t2) if t2 != 0 else 0.001 |
scale = l2 / shape if shape != 0 else l2 |
loc = 0 |
return shape, loc, scale |
def fit_generalized_pareto_distribution(data, threshold): |
""" |
Fit a Generalized Pareto Distribution (GPD) to the excesses above the threshold. |
Args: |
data (np.array): Input data array |
threshold (float): Threshold for defining excesses |
Returns: |
tuple: Fitted GPD parameters (shape, loc, scale) |
""" |
excesses = data[data > threshold] - threshold |
if len(excesses) < 10: # Arbitrary minimum number of points for GPD fitting |
return (0, 0, 1) # Return a default GPD with zero shape, zero location, and unit scale |
params = genpareto.fit(excesses) |
return params |
def cross_validate_gpd(data, threshold, n_splits=5): |
""" |
Perform cross-validation for GPD fitting. |
Args: |
data (np.array): Input data array |
threshold (float): Threshold for defining excesses |
n_splits (int): Number of splits for cross-validation |
Returns: |
tuple: Average GPD parameters from cross-validation (shape, loc, scale) |
""" |
excesses = data[data > threshold] - threshold |
if len(excesses) < n_splits: |
return fit_generalized_pareto_distribution(data, threshold) |
kf = KFold(n_splits=n_splits) |
params_list = [] |
for train_index, test_index in kf.split(excesses): |
train_data, test_data = excesses[train_index], excesses[test_index] |
params = genpareto.fit(train_data) |
params_list.append(params) |
shape_avg = np.mean([params[0] for params in params_list]) |
loc_avg = np.mean([params[1] for params in params_list]) |
scale_avg = np.mean([params[2] for params in params_list]) |
return shape_avg, loc_avg, scale_avg |
# Data generation |
np.random.seed(42) |
days = 30 |
# Generate base rainfall data |
base_rainfall = np.random.gamma(2, 10, days) |
# Create observed rainfall by adding random fluctuations |
observed_rainfall = base_rainfall + np.random.normal(0, 5, days) |
# Create satellite data with its own set of errors and biases |
satellite_rainfall = base_rainfall * np.random.uniform(0.8, 1.2, days) + np.random.normal(0, 7, days) |
# Add extreme values for 7 random days |
extreme_days = np.random.choice(days, 7, replace=False) |
extreme_values = np.random.uniform(50, 100, 7) |
observed_rainfall[extreme_days] += extreme_values |
satellite_rainfall[extreme_days] += extreme_values * np.random.uniform(0.9, 1.1, 7) |
# Ensure non-negative values |
observed_rainfall = np.maximum(observed_rainfall, 0) |
satellite_rainfall = np.maximum(satellite_rainfall, 0) |
# Fit distributions |
obs_gamma_params = fit_gamma_with_l_moments(observed_rainfall) |
sat_gamma_params = fit_gamma_with_l_moments(satellite_rainfall) |
threshold = np.percentile(observed_rainfall, 90) # Set threshold at 90th percentile |
obs_gpd_params = fit_generalized_pareto_distribution(observed_rainfall, threshold) |
sat_gpd_params = fit_generalized_pareto_distribution(satellite_rainfall, threshold) |
obs_gpd_cv_params = cross_validate_gpd(observed_rainfall, threshold) |
sat_gpd_cv_params = cross_validate_gpd(satellite_rainfall, threshold) |
# Print basic statistics and fitted parameters |
print("Observed rainfall stats:") |
print(f"Mean: {np.mean(observed_rainfall):.2f}, Std: {np.std(observed_rainfall):.2f}") |
print("Satellite rainfall stats:") |
print(f"Mean: {np.mean(satellite_rainfall):.2f}, Std: {np.std(satellite_rainfall):.2f}") |
print("\nFitted parameters:") |
print("Observed Gamma params:", obs_gamma_params) |
print("Satellite Gamma params:", sat_gamma_params) |
print("Observed GPD params:", obs_gpd_params) |
print("Satellite GPD params:", sat_gpd_params) |
print("Observed GPD CV params:", obs_gpd_cv_params) |
print("Satellite GPD CV params:", sat_gpd_cv_params) |
# Plotting in XKCD style |
plt.xkcd() |
# Create figure and adjust subplots |
fig, axs = plt.subplots(3, 2, figsize=(16, 20)) # Increased height to accommodate title |
fig.suptitle("Rainfall Distribution Analysis: Observed vs Satellite", fontsize=24, y=0.99) |
# Plot 1: Time series of rainfall |
plt.subplot(3, 2, 1) |
plt.plot(observed_rainfall, label='Observed', marker='o') |
plt.plot(satellite_rainfall, label='Satellite', marker='s') |
plt.title('Daily Rainfall Time Series') |
plt.xlabel('Day') |
plt.ylabel('Rainfall (mm)') |
plt.legend() |
# Plot 2: Histogram and fitted Gamma distribution |
plt.subplot(3, 2, 2) |
bins = np.linspace(0, max(observed_rainfall.max(), satellite_rainfall.max()), 30) |
plt.hist(observed_rainfall, bins=bins, alpha=0.5, density=True, label='Observed') |
plt.hist(satellite_rainfall, bins=bins, alpha=0.5, density=True, label='Satellite') |
x = np.linspace(0, max(observed_rainfall.max(), satellite_rainfall.max()), 100) |
plt.plot(x, gamma.pdf(x, *obs_gamma_params), 'r-', lw=2, label='Observed Gamma Fit') |
plt.plot(x, gamma.pdf(x, *sat_gamma_params), 'g-', lw=2, label='Satellite Gamma Fit') |
plt.title('Histogram and Gamma Distribution Fit') |
plt.xlabel('Rainfall (mm)') |
plt.ylabel('Density') |
plt.legend() |
# Plot 3: Q-Q plot for GPD |
plt.subplot(3, 2, 3) |
obs_excesses = observed_rainfall[observed_rainfall > threshold] - threshold |
sat_excesses = satellite_rainfall[satellite_rainfall > threshold] - threshold |
obs_theoretical_quantiles = genpareto.ppf(np.linspace(0.01, 0.99, len(obs_excesses)), *obs_gpd_params) |
sat_theoretical_quantiles = genpareto.ppf(np.linspace(0.01, 0.99, len(sat_excesses)), *sat_gpd_params) |
plt.scatter(np.sort(obs_excesses), obs_theoretical_quantiles, label='Observed') |
plt.scatter(np.sort(sat_excesses), sat_theoretical_quantiles, label='Satellite') |
plt.plot([0, max(obs_excesses.max(), sat_excesses.max())], [0, max(obs_excesses.max(), sat_excesses.max())], 'r--') |
plt.title('Q-Q Plot for GPD Fit') |
plt.xlabel('Empirical Quantiles') |
plt.ylabel('Theoretical Quantiles') |
plt.legend() |
# Plot 4: Comparison of GPD fits |
plt.subplot(3, 2, 4) |
x = np.linspace(0, max(obs_excesses.max(), sat_excesses.max()), 100) |
plt.plot(x, genpareto.pdf(x, *obs_gpd_params), 'r-', lw=2, label='Observed GPD') |
plt.plot(x, genpareto.pdf(x, *sat_gpd_params), 'g-', lw=2, label='Satellite GPD') |
plt.plot(x, genpareto.pdf(x, *obs_gpd_cv_params), 'r--', lw=2, label='Observed GPD (CV)') |
plt.plot(x, genpareto.pdf(x, *sat_gpd_cv_params), 'g--', lw=2, label='Satellite GPD (CV)') |
plt.title('GPD Fits Comparison') |
plt.xlabel('Excess Rainfall (mm)') |
plt.ylabel('Density') |
plt.legend() |
# Plot 5: Observed vs Satellite Rainfall |
plt.subplot(3, 2, 5) |
plt.scatter(observed_rainfall, satellite_rainfall) |
plt.plot([0, max(observed_rainfall.max(), satellite_rainfall.max())], |
[0, max(observed_rainfall.max(), satellite_rainfall.max())], 'r--') |
plt.title('Observed vs Satellite Rainfall') |
plt.xlabel('Observed Rainfall (mm)') |
plt.ylabel('Satellite Rainfall (mm)') |
# Plot 6: CDF of data and fitted Gamma distributions |
plt.subplot(3, 2, 6) |
x = np.linspace(0, max(observed_rainfall.max(), satellite_rainfall.max()), 100) |
plt.plot(x, gamma.cdf(x, *obs_gamma_params), 'r-', lw=2, label='Observed Gamma CDF') |
plt.plot(x, gamma.cdf(x, *sat_gamma_params), 'g-', lw=2, label='Satellite Gamma CDF') |
plt.step(np.sort(observed_rainfall), np.linspace(0, 1, len(observed_rainfall)), 'r:', label='Observed Empirical CDF') |
plt.step(np.sort(satellite_rainfall), np.linspace(0, 1, len(satellite_rainfall)), 'g:', label='Satellite Empirical CDF') |
plt.title('CDF Comparison') |
plt.xlabel('Rainfall (mm)') |
plt.ylabel('Cumulative Probability') |
plt.legend() |
plt.tight_layout() |
plt.subplots_adjust(top=0.93) # Adjust the top of the subplots to make room for the title |
plt.show() |