Skip to content

Instantly share code, notes, and snippets.

@bennyistanto
Created June 8, 2024 06:59
Show Gist options
  • Save bennyistanto/ae336fb861093192a593c6e842550fef to your computer and use it in GitHub Desktop.
Generating IMERG 2-day, 3-day, 4-day, and 5-day rolling accumulations
import xarray as xr
import numpy as np
import os
import glob
from tqdm import tqdm
import pandas as pd
# Rolling-accumulation windows (in days) to generate.
rolling_days = [2, 3, 4, 5]

# Source directory holding the daily IMERG files; combined per-year
# datasets are cached in a sibling directory.
input_dir = "/mnt/d/temp/imerg/data/fdd_1day"
combined_ds_dir = "/mnt/d/temp/imerg/data/combined_ds"

# Abort early if the input data is missing -- nothing downstream can run.
if not os.path.exists(input_dir):
    print(f"Input folder does not exist: {input_dir}")
    # `exit()` is injected by the `site` module and may be absent (e.g. when
    # run with -S or frozen); raising SystemExit is the reliable equivalent.
    raise SystemExit(1)

# Create the combined dataset directory if it doesn't exist.
os.makedirs(combined_ds_dir, exist_ok=True)

# The user's Replace/Skip/Abort decision for existing output files;
# None until set_user_decision() has prompted once.
user_choice = None
def set_user_decision():
    """Ask the user how to handle existing output files and cache the answer.

    Stores 'R' (replace), 'S' (skip) or 'A' (abort) in the module-level
    ``user_choice``. Prompts only the first time; once a choice has been
    recorded, subsequent calls return immediately.
    """
    global user_choice
    if user_choice is not None:
        return  # already decided earlier in this run
    answer = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
    while answer not in ('R', 'S', 'A'):
        print("Invalid choice. Please choose again.")
        answer = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
    user_choice = answer
# Collect the daily input files, sorted so dates are in chronological order.
# (The original pattern had a stray f-string prefix with no placeholders.)
file_list = sorted(glob.glob(os.path.join(input_dir, "wld_cli_imerg_fdd_1day_*.nc4")))

# Group file paths by the YYYY of the YYYYMMDD stamp in the filename.
files_by_year = {}
for file_path in file_list:
    # Parse from the basename so underscores in parent directories can
    # never shift the split; the date follows the last '_' in the name.
    filename = os.path.basename(file_path)
    date_str = filename.split('_')[-1].split('.')[0]  # 'YYYYMMDD'
    year = date_str[:4]
    files_by_year.setdefault(year, []).append(file_path)
# Build one combined dataset per year (plus a few spillover days from the
# following year so year-end rolling windows can close), skipping years
# whose combined file is already cached on disk.
with tqdm(total=len(files_by_year), desc="Load the input data and process by years") as year_pbar:
    for year, year_files in files_by_year.items():
        combined_ds_path = os.path.join(combined_ds_dir, f"combined_{year}.nc4")
        if os.path.exists(combined_ds_path):
            print(f"Combined dataset already exists for {year}. Skipping combination step.")
            continue
        with tqdm(total=len(year_files), desc=f"Combining datasets for {year}", leave=False) as combine_pbar:
            open_handles = [xr.open_dataset(path) for path in year_files]
            combined_year_ds = xr.concat(open_handles, dim='time')
            combine_pbar.update(len(year_files))
        # Append the first max(rolling_days) days of the next year (when
        # available) so the largest window spans the year boundary.
        next_year = str(int(year) + 1)
        if next_year in files_by_year:
            spillover_paths = files_by_year[next_year][:max(rolling_days)]
            spillover_handles = [xr.open_dataset(path) for path in spillover_paths]
            spillover_ds = xr.concat(spillover_handles, dim='time')
            combined_year_ds = xr.concat([combined_year_ds, spillover_ds], dim='time')
            for handle in spillover_handles:
                handle.close()
        combined_year_ds.to_netcdf(combined_ds_path)
        combined_year_ds.close()
        for handle in open_handles:
            handle.close()
        print(f"Combined dataset saved to {combined_ds_path}")
        year_pbar.update(1)
# Load each year's combined dataset and write per-date rolling accumulations.
with tqdm(total=len(files_by_year), desc="Calculating rolling sums and saving files") as year_pbar:
    for year in files_by_year:
        combined_ds_path = os.path.join(combined_ds_dir, f"combined_{year}.nc4")
        print(f"Loading combined dataset for {year}")
        # Context manager guarantees the netCDF handle is released even if a
        # save step raises (previously the dataset was never closed).
        with xr.open_dataset(combined_ds_path) as combined_ds:
            for days in rolling_days:
                output_dir = f"/mnt/d/temp/imerg/data/fdd_{days}day"
                os.makedirs(output_dir, exist_ok=True)
                print(f"Calculating rolling sum for {days}-day period for {year}")
                # Right-labelled rolling accumulation; the first `days - 1`
                # timesteps are NaN because the window is incomplete.
                rolling_sum = combined_ds.rolling(time=days).sum()
                # Save each date's accumulation as its own netCDF file.
                with tqdm(total=len(rolling_sum.time.values), desc=f"Saving {days}-day files for {year}", leave=False) as save_pbar:
                    for date in rolling_sum.time.values:
                        date_str = pd.Timestamp(date).strftime('%Y%m%d')
                        output_filename = f"wld_cli_imerg_fdd_{days}day_{date_str}.nc4"
                        output_filepath = os.path.join(output_dir, output_filename)
                        if os.path.exists(output_filepath):
                            if user_choice is None:
                                set_user_decision()
                            if user_choice == 'S':
                                print(f"Skipping existing file: {output_filepath}")
                                save_pbar.update(1)  # count skips so the bar can complete
                                continue  # Skip to the next file
                            elif user_choice == 'A':
                                print("Aborting process.")
                                # `exit()` needs the `site` module; SystemExit(0)
                                # is the reliable clean-exit equivalent.
                                raise SystemExit(0)
                            # user_choice == 'R': fall through and overwrite.
                        try:
                            ds = rolling_sum.sel(time=date)
                            # Re-attach a length-1 time dimension so each
                            # output file carries its own timestamp coordinate.
                            ds = ds.expand_dims(time=[date])
                            ds['time'] = [pd.Timestamp(date).to_numpy()]
                            # Ensure CF conventions and zlib compression for
                            # every data variable.
                            encoding = {var: {'zlib': True, 'complevel': 5} for var in ds.data_vars}
                            ds.attrs['Conventions'] = 'CF-1.8'
                            ds.to_netcdf(output_filepath, encoding=encoding)
                            save_pbar.update(1)
                        except Exception as e:
                            # Best-effort: report and continue with the next date.
                            print(f"Error saving file {output_filepath}: {e}")
                print(f"{days}-day accumulation processing complete for {year}.")
        year_pbar.update(1)
print("All rolling accumulations processed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment