Skip to content

Instantly share code, notes, and snippets.

@anttilipp
Created December 1, 2023 09:33
Show Gist options
  • Save anttilipp/29f2cb56d99a054e1aa0fc5bcc1d8622 to your computer and use it in GitHub Desktop.
Save anttilipp/29f2cb56d99a054e1aa0fc5bcc1d8622 to your computer and use it in GitHub Desktop.
Example of a custom trajectory model run for aerosol layers
import numpy as np
import sys
import xarray as xr
from datetime import datetime, timedelta
from scipy.interpolate import RegularGridInterpolator
import os
import time
import joblib
import pandas as pd
import dask
import warnings
from pyproj import CRS, Transformer
#
# Example of a custom trajectory model run for aerosol layers
# Antti Lipponen, [email protected]
# version 8 December 2022
#
# The input file 'layers/CAL_layerINFO_20190526_20190608.txt' is a CSV file that contains aerosol
# layer information (start points of the trajectories) in the following columns:
# 'UTC (yyyy-mm-ddTHH:MM:SS)' (time in UTC)
# 'latitude' (latitude in degrees)
# 'longitude' (longitude in degrees)
# 'layer bottom (km)' (aerosol layer bottom in km)
# 'layer top (km)' (aerosol layer top in km)
df = pd.read_csv('layers/CAL_layerINFO_20190526_20190608.txt', parse_dates=['UTC (yyyy-mm-ddTHH:MM:SS)'], sep=', ', engine='python')
df = df.rename(columns={'UTC (yyyy-mm-ddTHH:MM:SS)': 'datetime'}) # rename 'UTC (yyyy-mm-ddTHH:MM:SS)' column to 'datetime'
# Parameters
deltat = 60 * 5 # s (60 * 5 = 5 minutes)
Ntimesteps = int(240 * 3600 / np.abs(deltat)) # simulation total 240 hours
EarthRadius = 6371000 # m
dask.config.set(**{'array.slicing.split_large_chunks': False})
# Data directory contains the wind field information,
# in this case the directory contains the MERRA-2 3D instantaneous assimilated
# meteorological fields files for the time period of interest (e.g. MERRA2_400.inst3_3d_asm_Np.20190601.nc4, ...)
ds = xr.open_mfdataset('data/*.nc4', chunks={'time': 1}, parallel=True)
# Empty results lists
t0s = []
lat0s = []
lon0s = []
h0s = []
p0s = []
this_id = 0
ids = []
ensemblemembers = []
idsoffset = 0 # if offset needed for id numbering
# In the ensemble approach, add multiple trajectory starting points in a 20km x 20km grid around the starting point.
ensembleXoffset, ensembleYoffset = np.meshgrid(np.linspace(-10000, 10000, 5), np.linspace(-10000, 10000, 5))
ensembleXoffset, ensembleYoffset = ensembleXoffset.ravel(), ensembleYoffset.ravel()
crsLatLon = CRS.from_proj4('+proj=latlon')
newdf = {'datetime': [], 'latitude': [], 'longitude': [], 'layer bottom (km)': [], 'layer top (km)': [], 'id': [], 'ensemblemembers': []}
for idx, r in df.iterrows():
thiscrs = CRS.from_proj4("+proj=tmerc +lat_0={lat0:f} +lon_0={lon0:f} +units=m".format(lat0=r['latitude'], lon0=r['longitude']))
transformerTHIStoLATLON = Transformer.from_crs(thiscrs, crsLatLon)
layer_bottom = r['layer bottom (km)'] * 1000
layer_top = r['layer top (km)'] * 1000
h0 = np.arange(np.ceil(layer_bottom / 100.) * 100, layer_top, 100)
for ii in range(len(ensembleXoffset)):
offsetX, offsetY = ensembleXoffset[ii], ensembleYoffset[ii]
thislon, thislat = transformerTHIStoLATLON.transform(offsetX, offsetY)
newdf['datetime'].append(r['datetime'])
newdf['latitude'].append(thislat)
newdf['longitude'].append(thislon)
newdf['layer bottom (km)'].append(r['layer bottom (km)'])
newdf['layer top (km)'].append(r['layer top (km)'])
newdf['id'].append(idx + idsoffset)
newdf['ensemblemembers'].append(ii + 1)
df = pd.DataFrame(newdf)
for idx, r in df.iterrows():
t0 = r['datetime']
lat, lon = r['latitude'], r['longitude']
layer_bottom = r['layer bottom (km)'] * 1000
layer_top = r['layer top (km)'] * 1000
h0 = np.arange(np.ceil(layer_bottom / 100.) * 100, layer_top, 100)
h0s.append(h0)
init_PS = ds.PS.interp(lat=lat, lon=lon, time=t0).values
init_H = ds.H.interp(lat=lat, lon=lon, time=t0).values
init_H[np.isnan(init_H)] = np.nanmin(init_H)
init_PL = ds.lev.values
init_parcel_p = np.interp([h0], init_H, init_PL)
p0s.append(init_parcel_p.ravel())
for ii in range(len(h0)):
ids.append(r['id'])
ensemblemembers.append(r['ensemblemembers'])
t0s.append(t0)
lat0s.append(lat)
lon0s.append(lon)
this_id += 1
t0s = np.array(t0s)
lat0s = np.array(lat0s)
lon0s = np.array(lon0s)
h0s = np.concatenate(h0s)
p0s = np.concatenate(p0s)
Ntrajectories = len(t0s)
MAXt = datetime.utcfromtimestamp(np.ceil(np.max(t0s).timestamp() / 3600) * 3600)
MINt = datetime.utcfromtimestamp(np.ceil(np.min(t0s).timestamp() / 3600) * 3600) + timedelta(seconds=deltat * Ntimesteps)
Ntotaltimesteps = int((MAXt - MINt).total_seconds() / np.abs(deltat))
this_t = MAXt
print('===========================')
print('Ntrajectories', Ntrajectories)
print('Ntotaltimesteps', Ntotaltimesteps)
print('===========================')
lat = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories))
lon = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories))
h = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories))
p = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories))
times = []
ii = 1
while this_t >= MINt:
# add new trajectories
MASK = np.logical_and.reduce((t0s > this_t, t0s <= this_t - timedelta(seconds=deltat)))
if MASK.sum() == 0:
this_t += timedelta(seconds=deltat)
ii += 1
continue
lat[ii - 1, MASK] = lat0s[MASK]
lon[ii - 1, MASK] = lon0s[MASK]
p[ii - 1, MASK] = p0s[MASK]
h[ii - 1, MASK] = h0s[MASK]
this_t += timedelta(seconds=deltat)
ii += 1
this_t = MAXt
ii = 1
warnings.filterwarnings("error")
intermediateloaded = False
lev_values = None
lon_values = None
lat_values = None
loadedData = None
os.makedirs('intermediatedata', exist_ok=True)
while this_t >= MINt:
timing0 = time.time()
if (not intermediateloaded) and (os.path.isfile('intermediatedata/trajectoryintermediate.joblib')):
print('Loading intermediate data')
intermediatedata = joblib.load('intermediatedata/trajectoryintermediate.joblib')
times = intermediatedata['times']
this_t = intermediatedata['this_t']
p = intermediatedata['p']
lat = intermediatedata['lat']
lon = intermediatedata['lon']
h = intermediatedata['h']
ii = intermediatedata['ii']
intermediateloaded = True
times.append(this_t)
print(' ', ii + 1, '/', 2 + Ntotaltimesteps, ' ', this_t, flush=True)
MASK = t0s > this_t
if MASK.sum() == 0:
this_t += timedelta(seconds=deltat)
ii += 1
continue
if lev_values is None:
lev_values = ds.lev.values
lon_values = ds.lon.values
lat_values = ds.lat.values
this_lat = lat[ii - 1, MASK]
this_lon = lon[ii - 1, MASK]
this_p = np.clip(p[ii - 1, MASK], a_min=lev_values.min(), a_max=lev_values.max())
this_h = h[ii - 1, MASK]
if (loadedData is None) or (loadedData.time[0].values > np.datetime64(this_t) + np.timedelta64(deltat, 's')):
loadedData = ds.sel(time=slice(this_t - timedelta(seconds=48 * 3600), this_t + timedelta(seconds=3 * 3600)))
loadedU = loadedData.U.load()
loadedV = loadedData.V.load()
loadedOMEGA = loadedData.OMEGA.load()
t0 = time.time()
loadedH = loadedData.H.load()
this_U_t, this_U_t_plus1 = loadedU.interp(time=[this_t, this_t + timedelta(seconds=deltat)]).values
this_V_t, this_V_t_plus1 = loadedV.interp(time=[this_t, this_t + timedelta(seconds=deltat)]).values
this_OMEGA_t, this_OMEGA_t_plus1 = loadedOMEGA.interp(time=[this_t, this_t + timedelta(seconds=deltat)]).values / 100.0
interpolator_t = RegularGridInterpolator((lev_values[::-1], lat_values, lon_values), np.stack((this_U_t[::-1, :, :], this_V_t[::-1, :, :], this_OMEGA_t[::-1, :, :]), axis=3))
UVOMEGA_t = interpolator_t((this_p, this_lat, this_lon))
U_t = UVOMEGA_t[:, 0]
V_t = UVOMEGA_t[:, 1]
OMEGA_t = UVOMEGA_t[:, 2]
nanidx = np.where(np.isnan(UVOMEGA_t).sum(axis=1) > 0)[0]
pressureleveladjustment = 0.995
itercounter = 1
if len(nanidx) > 0:
print(' Adjusting pressure t...', end='', flush=True)
while len(nanidx) > 0:
UVOMEGA_t[nanidx, :] = interpolator_t((this_p[nanidx] * pressureleveladjustment**itercounter, this_lat[nanidx], this_lon[nanidx]))
itercounter += 1
nanidx = np.where(np.isnan(UVOMEGA_t).sum(axis=1) > 0)[0]
print('Done!')
deltaU_t = deltat * U_t
deltaV_t = deltat * V_t
deltaOMEGA_t = deltat * OMEGA_t
piRh = np.pi * (EarthRadius + this_h)
delta_lat_t = deltaV_t * 180 / piRh
delta_lon_t = deltaU_t * 180 / (piRh * np.sin(np.pi / 2 - np.deg2rad(this_lat)))
this_lat_tplus1 = this_lat + delta_lat_t
this_lon_tplus1 = this_lon + delta_lon_t
this_lon_tplus1[this_lon_tplus1 < -180] = this_lon_tplus1[this_lon_tplus1 < -180] + 360
this_lon_tplus1[this_lon_tplus1 > 180] = this_lon_tplus1[this_lon_tplus1 > 180] - 360
this_lon_tplus1 = np.clip(this_lon_tplus1, a_min=lon_values.min(), a_max=lon_values.max())
this_lat_tplus1[this_lat_tplus1 < -90] = -180 - this_lat_tplus1[this_lat_tplus1 < -90]
this_lat_tplus1[this_lat_tplus1 > 90] = 180 - this_lat_tplus1[this_lat_tplus1 > 90]
# now compute delta_lat_tplus1, delta_lon_tplus1, deltaOMEGA_tplus1
interpolator_tplus1 = RegularGridInterpolator((lev_values[::-1], lat_values, lon_values), np.stack((this_U_t_plus1[::-1, :, :], this_V_t_plus1[::-1, :, :], this_OMEGA_t_plus1[::-1, :, :]), axis=3))
try:
UVOMEGA_tplus1 = interpolator_tplus1((np.clip(this_p + deltaOMEGA_t, a_min=lev_values.min(), a_max=lev_values.max()), this_lat_tplus1, this_lon_tplus1))
except Exception:
sys.exit(1)
U_tplus1 = UVOMEGA_tplus1[:, 0]
V_tplus1 = UVOMEGA_tplus1[:, 1]
OMEGA_tplus1 = UVOMEGA_tplus1[:, 2]
nanidx = np.where(np.isnan(UVOMEGA_tplus1).sum(axis=1) > 0)[0]
pressureleveladjustment = 0.995
itercounter = 1
if len(nanidx) > 0:
print(' Adjusting pressure t+1...', end='', flush=True)
while len(nanidx) > 0:
try:
UVOMEGA_tplus1[nanidx, :] = interpolator_tplus1((np.clip(this_p + deltaOMEGA_t, a_min=lev_values.min(), a_max=lev_values.max())[nanidx] * pressureleveladjustment**itercounter, (this_lat_tplus1)[nanidx], (this_lon_tplus1)[nanidx]))
except Exception:
sys.exit(2)
itercounter += 1
nanidx = np.where(np.isnan(UVOMEGA_tplus1).sum(axis=1) > 0)[0]
print('Done!')
deltaU_tplus1 = deltat * U_tplus1
deltaV_tplus1 = deltat * V_tplus1
deltaOMEGA_tplus1 = deltat * OMEGA_tplus1
loadedH_tplus = loadedH.interp(time=this_t + timedelta(seconds=deltat))
H = loadedH_tplus.interp(lat=('dimX', this_lat_tplus1), lon=('dimX', this_lon_tplus1)).values
jjs = np.where(MASK)[0]
this_h_tplus1 = []
for kk in range(H.shape[1]):
this_H = H[:, kk]
try:
this_H[np.isnan(this_H)] = np.nanmin(this_H)
except Exception:
sys.exit(3)
this_h_tplus1.append(np.interp([this_p[kk] + deltaOMEGA_t[kk]][0], init_PL[::-1], this_H[::-1]))
piRh = np.pi * (EarthRadius + np.array(this_h_tplus1))
delta_lat_tplus1 = deltaV_tplus1 * 180 / piRh
delta_lon_tplus1 = deltaU_tplus1 * 180 / (piRh * np.sin(np.pi / 2 - np.deg2rad(this_lat_tplus1)))
delta_lat = 0.5 * (delta_lat_t + delta_lat_tplus1)
delta_lon = 0.5 * (delta_lon_t + delta_lon_tplus1)
deltaOMEGA = 0.5 * (deltaOMEGA_t + deltaOMEGA_tplus1)
lat[ii, MASK] = this_lat + delta_lat
lon[ii, MASK] = this_lon + delta_lon
lon[ii, lon[ii, :] < -180] = lon[ii, lon[ii, :] < -180] + 360
lon[ii, lon[ii, :] > 180] = lon[ii, lon[ii, :] > 180] - 360
lon[ii, :] = np.clip(lon[ii, :], a_min=lon_values.min(), a_max=lon_values.max())
lat[ii, lat[ii, :] < -90] = -180 - lat[ii, lat[ii, :] < -90]
lat[ii, lat[ii, :] > 90] = 180 - lat[ii, lat[ii, :] > 90]
p[ii, MASK] = p[ii - 1, MASK] + deltaOMEGA
H = loadedH_tplus.interp(lat=('dimX', lat[ii, MASK]), lon=('dimX', lon[ii, MASK])).values
jjs = np.where(MASK)[0]
for kk in range(H.shape[1]):
this_H = H[:, kk]
try:
this_H[np.isnan(this_H)] = np.nanmin(this_H)
except Exception:
sys.exit(4)
h[ii, jjs[kk]] = np.interp([p[ii, jjs[kk]]][0], init_PL[::-1], this_H[::-1])
if np.isnan(h[ii, MASK]).sum() > 0:
sys.exit(5)
this_t += timedelta(seconds=deltat)
ii += 1
if (ii % 2000) == 0:
# Save intermediate data every 2000th time step
print('Saving intermediate data')
intermediatedata = {'lat': lat, 'lon': lon, 'p': p, 'h': h, 'times': times, 'this_t': this_t, 'ii': ii}
joblib.dump(intermediatedata, 'intermediatedata/trajectoryintermediate.joblib')
intermediatedata = None
timing1 = time.time()
print(' Time elapsed: {:.1f}s'.format(timing1 - timing0))
# Save output data to a Joblib dump file
data = {'lat': lat, 'lon': lon, 'p': p, 'h': h, 'times': times, 'ids': ids, 'ensemblemembers': ensemblemembers}
joblib.dump(data, 'trajectorydata.joblib')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment