Generating IMERG 2-day, 3-day, 4-day, and 5-day rolling accumulations
import os
import glob
import pandas as pd
import xarray as xr
from tqdm import tqdm
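# Workflow:
#   1. Group the daily files (wld_cli_imerg_fdd_1day_YYYYMMDD.nc4) by year.
#   2. Concatenate each year's dailies (plus the first few days of the next
#      year) into one combined dataset and cache it to disk.
#   3. For each window length in rolling_days, compute trailing rolling sums
#      and write one compressed, CF-tagged NetCDF file per date.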
# Rolling windows (in days) and input/working directories
rolling_days = [2, 3, 4, 5]
input_dir = "/mnt/d/temp/imerg/data/fdd_1day"
combined_ds_dir = "/mnt/d/temp/imerg/data/combined_ds"

# Check if input folder exists
if not os.path.exists(input_dir):
    print(f"Input folder does not exist: {input_dir}")
    exit(1)

# Create the combined dataset directory if it doesn't exist
os.makedirs(combined_ds_dir, exist_ok=True)
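# Per-year combined files are cached in combined_ds_dir so that re-running the
# script does not have to repeat the daily-file concatenation step.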
# Global variable to store user's choice
user_choice = None

def set_user_decision():
    """Prompt user for decision on existing files and store it globally."""
    global user_choice
    if user_choice is None:
        decision = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
        while decision not in ['R', 'S', 'A']:
            print("Invalid choice. Please choose again.")
            decision = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
        user_choice = decision
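# The decision is requested at most once; whatever the user chooses is then
# applied to every existing output file encountered for the rest of the run.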
# Create a list of the daily 1-day accumulation files
file_list = sorted(glob.glob(os.path.join(input_dir, "wld_cli_imerg_fdd_1day_*.nc4")))

# Group files by year (filenames are expected to end in _YYYYMMDD.nc4)
files_by_year = {}
for file_path in file_list:
    date_str = file_path.split('_')[-1].split('.')[0]
    year = date_str[:4]
    if year not in files_by_year:
        files_by_year[year] = []
    files_by_year[year].append(file_path)
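# files_by_year now maps a year string (e.g. '2001') to the sorted list of that
# year's daily files, so each year can be processed independently below.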
# Process each year separately and create a combined dataset for each year
with tqdm(total=len(files_by_year), desc="Load the input data and process by years") as year_pbar:
    for year, year_files in files_by_year.items():
        combined_ds_path = os.path.join(combined_ds_dir, f"combined_{year}.nc4")
        if os.path.exists(combined_ds_path):
            print(f"Combined dataset already exists for {year}. Skipping combination step.")
            year_pbar.update(1)
            continue
        else:
            with tqdm(total=len(year_files), desc=f"Combining datasets for {year}", leave=False) as combine_pbar:
                datasets = [xr.open_dataset(f) for f in year_files]
                combined_year_ds = xr.concat(datasets, dim='time')
                combine_pbar.update(len(year_files))

                # Add extra days from the next year so the rolling windows can
                # span the year boundary
                next_year = str(int(year) + 1)
                if next_year in files_by_year:
                    next_year_files = files_by_year[next_year]
                    extra_days_files = next_year_files[:max(rolling_days)]
                    extra_datasets = [xr.open_dataset(f) for f in extra_days_files]
                    extra_combined_ds = xr.concat(extra_datasets, dim='time')
                    combined_year_ds = xr.concat([combined_year_ds, extra_combined_ds], dim='time')
                    for ds in extra_datasets:
                        ds.close()

                combined_year_ds.to_netcdf(combined_ds_path)
                combined_year_ds.close()
                for ds in datasets:
                    ds.close()
                print(f"Combined dataset saved to {combined_ds_path}")
        year_pbar.update(1)
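# At this point every year has a cached combined_{year}.nc4 holding that year's
# daily fields plus (where available) the first max(rolling_days) days of the
# following year, so the rolling sums can be evaluated across the year boundary.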
# Load and process each combined dataset by year
with tqdm(total=len(files_by_year), desc="Calculating rolling sums and saving files") as year_pbar:
    for year in files_by_year.keys():
        combined_ds_path = os.path.join(combined_ds_dir, f"combined_{year}.nc4")
        print(f"Loading combined dataset for {year}")
        combined_ds = xr.open_dataset(combined_ds_path)

        for days in rolling_days:
            output_dir = f"/mnt/d/temp/imerg/data/fdd_{days}day"
            os.makedirs(output_dir, exist_ok=True)

            print(f"Calculating rolling sum for {days}-day period for {year}")
            # Calculate the trailing rolling accumulation; the first (days - 1)
            # time steps of each combined dataset are NaN because the window is
            # not yet full
            rolling_sum = combined_ds.rolling(time=days).sum()

            # Save each rolling accumulation as a separate file for each date
            with tqdm(total=len(rolling_sum.time.values), desc=f"Saving {days}-day files for {year}", leave=False) as save_pbar:
                for date in rolling_sum.time.values:
                    date_str = pd.Timestamp(date).strftime('%Y%m%d')
                    output_filename = f"wld_cli_imerg_fdd_{days}day_{date_str}.nc4"
                    output_filepath = os.path.join(output_dir, output_filename)

                    if os.path.exists(output_filepath):
                        if user_choice is None:
                            set_user_decision()
                        if user_choice == 'S':
                            print(f"Skipping existing file: {output_filepath}")
                            save_pbar.update(1)
                            continue  # Skip to the next file
                        elif user_choice == 'A':
                            print("Aborting process.")
                            exit(0)  # Exit the script
                        elif user_choice == 'R':
                            pass  # Replace the file

                    try:
                        # Select a single date while keeping 'time' as a length-1 dimension
                        ds = rolling_sum.sel(time=[date])
                        # Ensure CF conventions and compression
                        encoding = {var: {'zlib': True, 'complevel': 5} for var in ds.data_vars}
                        ds.attrs['Conventions'] = 'CF-1.8'
                        ds.to_netcdf(output_filepath, encoding=encoding)
                        save_pbar.update(1)
                    except Exception as e:
                        print(f"Error saving file {output_filepath}: {e}")

            print(f"{days}-day accumulation processing complete for {year}.")

        combined_ds.close()
        year_pbar.update(1)

print("All rolling accumulations processed.")