Created
December 1, 2023 09:33
-
-
Save anttilipp/29f2cb56d99a054e1aa0fc5bcc1d8622 to your computer and use it in GitHub Desktop.
Example of a custom trajectory model run for aerosol layers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import sys | |
import xarray as xr | |
from datetime import datetime, timedelta | |
from scipy.interpolate import RegularGridInterpolator | |
import os | |
import time | |
import joblib | |
import pandas as pd | |
import dask | |
import warnings | |
from pyproj import CRS, Transformer | |
# | |
# Example of a custom trajectory model run for aerosol layers | |
# Antti Lipponen, [email protected] | |
# version 8 December 2022 | |
# | |
# The input file 'layers/CAL_layerINFO_20190526_20190608.txt' is a CSV file that contains aerosol | |
# layer information (start points of the trajectories) in the following columns: | |
# 'UTC (yyyy-mm-ddTHH:MM:SS)' (time in UTC) | |
# 'latitude' (latitude in degrees) | |
# 'longitude' (longitude in degrees) | |
# 'layer bottom (km)' (aerosol layer bottom in km) | |
# 'layer top (km)' (aerosol layer top in km) | |
df = pd.read_csv('layers/CAL_layerINFO_20190526_20190608.txt', parse_dates=['UTC (yyyy-mm-ddTHH:MM:SS)'], sep=', ', engine='python') | |
df = df.rename(columns={'UTC (yyyy-mm-ddTHH:MM:SS)': 'datetime'}) # rename 'UTC (yyyy-mm-ddTHH:MM:SS)' column to 'datetime' | |
# Parameters | |
deltat = 60 * 5 # s (60 * 5 = 5 minutes) | |
Ntimesteps = int(240 * 3600 / np.abs(deltat)) # simulation total 240 hours | |
EarthRadius = 6371000 # m | |
dask.config.set(**{'array.slicing.split_large_chunks': False}) | |
# Data directory contains the wind field information, | |
# in this case the directory contains the MERRA-2 3D instantaneous assimilated | |
# meteorological fields files for the time period of interest (e.g. MERRA2_400.inst3_3d_asm_Np.20190601.nc4, ...) | |
ds = xr.open_mfdataset('data/*.nc4', chunks={'time': 1}, parallel=True) | |
# Empty results lists | |
t0s = [] | |
lat0s = [] | |
lon0s = [] | |
h0s = [] | |
p0s = [] | |
this_id = 0 | |
ids = [] | |
ensemblemembers = [] | |
idsoffset = 0 # if offset needed for id numbering | |
# In the ensemble approach, add multiple trajectory starting points in a 20km x 20km grid around the starting point. | |
ensembleXoffset, ensembleYoffset = np.meshgrid(np.linspace(-10000, 10000, 5), np.linspace(-10000, 10000, 5)) | |
ensembleXoffset, ensembleYoffset = ensembleXoffset.ravel(), ensembleYoffset.ravel() | |
crsLatLon = CRS.from_proj4('+proj=latlon') | |
newdf = {'datetime': [], 'latitude': [], 'longitude': [], 'layer bottom (km)': [], 'layer top (km)': [], 'id': [], 'ensemblemembers': []} | |
for idx, r in df.iterrows(): | |
thiscrs = CRS.from_proj4("+proj=tmerc +lat_0={lat0:f} +lon_0={lon0:f} +units=m".format(lat0=r['latitude'], lon0=r['longitude'])) | |
transformerTHIStoLATLON = Transformer.from_crs(thiscrs, crsLatLon) | |
layer_bottom = r['layer bottom (km)'] * 1000 | |
layer_top = r['layer top (km)'] * 1000 | |
h0 = np.arange(np.ceil(layer_bottom / 100.) * 100, layer_top, 100) | |
for ii in range(len(ensembleXoffset)): | |
offsetX, offsetY = ensembleXoffset[ii], ensembleYoffset[ii] | |
thislon, thislat = transformerTHIStoLATLON.transform(offsetX, offsetY) | |
newdf['datetime'].append(r['datetime']) | |
newdf['latitude'].append(thislat) | |
newdf['longitude'].append(thislon) | |
newdf['layer bottom (km)'].append(r['layer bottom (km)']) | |
newdf['layer top (km)'].append(r['layer top (km)']) | |
newdf['id'].append(idx + idsoffset) | |
newdf['ensemblemembers'].append(ii + 1) | |
df = pd.DataFrame(newdf) | |
for idx, r in df.iterrows(): | |
t0 = r['datetime'] | |
lat, lon = r['latitude'], r['longitude'] | |
layer_bottom = r['layer bottom (km)'] * 1000 | |
layer_top = r['layer top (km)'] * 1000 | |
h0 = np.arange(np.ceil(layer_bottom / 100.) * 100, layer_top, 100) | |
h0s.append(h0) | |
init_PS = ds.PS.interp(lat=lat, lon=lon, time=t0).values | |
init_H = ds.H.interp(lat=lat, lon=lon, time=t0).values | |
init_H[np.isnan(init_H)] = np.nanmin(init_H) | |
init_PL = ds.lev.values | |
init_parcel_p = np.interp([h0], init_H, init_PL) | |
p0s.append(init_parcel_p.ravel()) | |
for ii in range(len(h0)): | |
ids.append(r['id']) | |
ensemblemembers.append(r['ensemblemembers']) | |
t0s.append(t0) | |
lat0s.append(lat) | |
lon0s.append(lon) | |
this_id += 1 | |
t0s = np.array(t0s) | |
lat0s = np.array(lat0s) | |
lon0s = np.array(lon0s) | |
h0s = np.concatenate(h0s) | |
p0s = np.concatenate(p0s) | |
Ntrajectories = len(t0s) | |
MAXt = datetime.utcfromtimestamp(np.ceil(np.max(t0s).timestamp() / 3600) * 3600) | |
MINt = datetime.utcfromtimestamp(np.ceil(np.min(t0s).timestamp() / 3600) * 3600) + timedelta(seconds=deltat * Ntimesteps) | |
Ntotaltimesteps = int((MAXt - MINt).total_seconds() / np.abs(deltat)) | |
this_t = MAXt | |
print('===========================') | |
print('Ntrajectories', Ntrajectories) | |
print('Ntotaltimesteps', Ntotaltimesteps) | |
print('===========================') | |
lat = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories)) | |
lon = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories)) | |
h = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories)) | |
p = np.nan * np.zeros((2 + Ntotaltimesteps, Ntrajectories)) | |
times = [] | |
ii = 1 | |
while this_t >= MINt: | |
# add new trajectories | |
MASK = np.logical_and.reduce((t0s > this_t, t0s <= this_t - timedelta(seconds=deltat))) | |
if MASK.sum() == 0: | |
this_t += timedelta(seconds=deltat) | |
ii += 1 | |
continue | |
lat[ii - 1, MASK] = lat0s[MASK] | |
lon[ii - 1, MASK] = lon0s[MASK] | |
p[ii - 1, MASK] = p0s[MASK] | |
h[ii - 1, MASK] = h0s[MASK] | |
this_t += timedelta(seconds=deltat) | |
ii += 1 | |
this_t = MAXt | |
ii = 1 | |
warnings.filterwarnings("error") | |
intermediateloaded = False | |
lev_values = None | |
lon_values = None | |
lat_values = None | |
loadedData = None | |
os.makedirs('intermediatedata', exist_ok=True) | |
while this_t >= MINt: | |
timing0 = time.time() | |
if (not intermediateloaded) and (os.path.isfile('intermediatedata/trajectoryintermediate.joblib')): | |
print('Loading intermediate data') | |
intermediatedata = joblib.load('intermediatedata/trajectoryintermediate.joblib') | |
times = intermediatedata['times'] | |
this_t = intermediatedata['this_t'] | |
p = intermediatedata['p'] | |
lat = intermediatedata['lat'] | |
lon = intermediatedata['lon'] | |
h = intermediatedata['h'] | |
ii = intermediatedata['ii'] | |
intermediateloaded = True | |
times.append(this_t) | |
print(' ', ii + 1, '/', 2 + Ntotaltimesteps, ' ', this_t, flush=True) | |
MASK = t0s > this_t | |
if MASK.sum() == 0: | |
this_t += timedelta(seconds=deltat) | |
ii += 1 | |
continue | |
if lev_values is None: | |
lev_values = ds.lev.values | |
lon_values = ds.lon.values | |
lat_values = ds.lat.values | |
this_lat = lat[ii - 1, MASK] | |
this_lon = lon[ii - 1, MASK] | |
this_p = np.clip(p[ii - 1, MASK], a_min=lev_values.min(), a_max=lev_values.max()) | |
this_h = h[ii - 1, MASK] | |
if (loadedData is None) or (loadedData.time[0].values > np.datetime64(this_t) + np.timedelta64(deltat, 's')): | |
loadedData = ds.sel(time=slice(this_t - timedelta(seconds=48 * 3600), this_t + timedelta(seconds=3 * 3600))) | |
loadedU = loadedData.U.load() | |
loadedV = loadedData.V.load() | |
loadedOMEGA = loadedData.OMEGA.load() | |
t0 = time.time() | |
loadedH = loadedData.H.load() | |
this_U_t, this_U_t_plus1 = loadedU.interp(time=[this_t, this_t + timedelta(seconds=deltat)]).values | |
this_V_t, this_V_t_plus1 = loadedV.interp(time=[this_t, this_t + timedelta(seconds=deltat)]).values | |
this_OMEGA_t, this_OMEGA_t_plus1 = loadedOMEGA.interp(time=[this_t, this_t + timedelta(seconds=deltat)]).values / 100.0 | |
interpolator_t = RegularGridInterpolator((lev_values[::-1], lat_values, lon_values), np.stack((this_U_t[::-1, :, :], this_V_t[::-1, :, :], this_OMEGA_t[::-1, :, :]), axis=3)) | |
UVOMEGA_t = interpolator_t((this_p, this_lat, this_lon)) | |
U_t = UVOMEGA_t[:, 0] | |
V_t = UVOMEGA_t[:, 1] | |
OMEGA_t = UVOMEGA_t[:, 2] | |
nanidx = np.where(np.isnan(UVOMEGA_t).sum(axis=1) > 0)[0] | |
pressureleveladjustment = 0.995 | |
itercounter = 1 | |
if len(nanidx) > 0: | |
print(' Adjusting pressure t...', end='', flush=True) | |
while len(nanidx) > 0: | |
UVOMEGA_t[nanidx, :] = interpolator_t((this_p[nanidx] * pressureleveladjustment**itercounter, this_lat[nanidx], this_lon[nanidx])) | |
itercounter += 1 | |
nanidx = np.where(np.isnan(UVOMEGA_t).sum(axis=1) > 0)[0] | |
print('Done!') | |
deltaU_t = deltat * U_t | |
deltaV_t = deltat * V_t | |
deltaOMEGA_t = deltat * OMEGA_t | |
piRh = np.pi * (EarthRadius + this_h) | |
delta_lat_t = deltaV_t * 180 / piRh | |
delta_lon_t = deltaU_t * 180 / (piRh * np.sin(np.pi / 2 - np.deg2rad(this_lat))) | |
this_lat_tplus1 = this_lat + delta_lat_t | |
this_lon_tplus1 = this_lon + delta_lon_t | |
this_lon_tplus1[this_lon_tplus1 < -180] = this_lon_tplus1[this_lon_tplus1 < -180] + 360 | |
this_lon_tplus1[this_lon_tplus1 > 180] = this_lon_tplus1[this_lon_tplus1 > 180] - 360 | |
this_lon_tplus1 = np.clip(this_lon_tplus1, a_min=lon_values.min(), a_max=lon_values.max()) | |
this_lat_tplus1[this_lat_tplus1 < -90] = -180 - this_lat_tplus1[this_lat_tplus1 < -90] | |
this_lat_tplus1[this_lat_tplus1 > 90] = 180 - this_lat_tplus1[this_lat_tplus1 > 90] | |
# now compute delta_lat_tplus1, delta_lon_tplus1, deltaOMEGA_tplus1 | |
interpolator_tplus1 = RegularGridInterpolator((lev_values[::-1], lat_values, lon_values), np.stack((this_U_t_plus1[::-1, :, :], this_V_t_plus1[::-1, :, :], this_OMEGA_t_plus1[::-1, :, :]), axis=3)) | |
try: | |
UVOMEGA_tplus1 = interpolator_tplus1((np.clip(this_p + deltaOMEGA_t, a_min=lev_values.min(), a_max=lev_values.max()), this_lat_tplus1, this_lon_tplus1)) | |
except Exception: | |
sys.exit(1) | |
U_tplus1 = UVOMEGA_tplus1[:, 0] | |
V_tplus1 = UVOMEGA_tplus1[:, 1] | |
OMEGA_tplus1 = UVOMEGA_tplus1[:, 2] | |
nanidx = np.where(np.isnan(UVOMEGA_tplus1).sum(axis=1) > 0)[0] | |
pressureleveladjustment = 0.995 | |
itercounter = 1 | |
if len(nanidx) > 0: | |
print(' Adjusting pressure t+1...', end='', flush=True) | |
while len(nanidx) > 0: | |
try: | |
UVOMEGA_tplus1[nanidx, :] = interpolator_tplus1((np.clip(this_p + deltaOMEGA_t, a_min=lev_values.min(), a_max=lev_values.max())[nanidx] * pressureleveladjustment**itercounter, (this_lat_tplus1)[nanidx], (this_lon_tplus1)[nanidx])) | |
except Exception: | |
sys.exit(2) | |
itercounter += 1 | |
nanidx = np.where(np.isnan(UVOMEGA_tplus1).sum(axis=1) > 0)[0] | |
print('Done!') | |
deltaU_tplus1 = deltat * U_tplus1 | |
deltaV_tplus1 = deltat * V_tplus1 | |
deltaOMEGA_tplus1 = deltat * OMEGA_tplus1 | |
loadedH_tplus = loadedH.interp(time=this_t + timedelta(seconds=deltat)) | |
H = loadedH_tplus.interp(lat=('dimX', this_lat_tplus1), lon=('dimX', this_lon_tplus1)).values | |
jjs = np.where(MASK)[0] | |
this_h_tplus1 = [] | |
for kk in range(H.shape[1]): | |
this_H = H[:, kk] | |
try: | |
this_H[np.isnan(this_H)] = np.nanmin(this_H) | |
except Exception: | |
sys.exit(3) | |
this_h_tplus1.append(np.interp([this_p[kk] + deltaOMEGA_t[kk]][0], init_PL[::-1], this_H[::-1])) | |
piRh = np.pi * (EarthRadius + np.array(this_h_tplus1)) | |
delta_lat_tplus1 = deltaV_tplus1 * 180 / piRh | |
delta_lon_tplus1 = deltaU_tplus1 * 180 / (piRh * np.sin(np.pi / 2 - np.deg2rad(this_lat_tplus1))) | |
delta_lat = 0.5 * (delta_lat_t + delta_lat_tplus1) | |
delta_lon = 0.5 * (delta_lon_t + delta_lon_tplus1) | |
deltaOMEGA = 0.5 * (deltaOMEGA_t + deltaOMEGA_tplus1) | |
lat[ii, MASK] = this_lat + delta_lat | |
lon[ii, MASK] = this_lon + delta_lon | |
lon[ii, lon[ii, :] < -180] = lon[ii, lon[ii, :] < -180] + 360 | |
lon[ii, lon[ii, :] > 180] = lon[ii, lon[ii, :] > 180] - 360 | |
lon[ii, :] = np.clip(lon[ii, :], a_min=lon_values.min(), a_max=lon_values.max()) | |
lat[ii, lat[ii, :] < -90] = -180 - lat[ii, lat[ii, :] < -90] | |
lat[ii, lat[ii, :] > 90] = 180 - lat[ii, lat[ii, :] > 90] | |
p[ii, MASK] = p[ii - 1, MASK] + deltaOMEGA | |
H = loadedH_tplus.interp(lat=('dimX', lat[ii, MASK]), lon=('dimX', lon[ii, MASK])).values | |
jjs = np.where(MASK)[0] | |
for kk in range(H.shape[1]): | |
this_H = H[:, kk] | |
try: | |
this_H[np.isnan(this_H)] = np.nanmin(this_H) | |
except Exception: | |
sys.exit(4) | |
h[ii, jjs[kk]] = np.interp([p[ii, jjs[kk]]][0], init_PL[::-1], this_H[::-1]) | |
if np.isnan(h[ii, MASK]).sum() > 0: | |
sys.exit(5) | |
this_t += timedelta(seconds=deltat) | |
ii += 1 | |
if (ii % 2000) == 0: | |
# Save intermediate data every 2000th time step | |
print('Saving intermediate data') | |
intermediatedata = {'lat': lat, 'lon': lon, 'p': p, 'h': h, 'times': times, 'this_t': this_t, 'ii': ii} | |
joblib.dump(intermediatedata, 'intermediatedata/trajectoryintermediate.joblib') | |
intermediatedata = None | |
timing1 = time.time() | |
print(' Time elapsed: {:.1f}s'.format(timing1 - timing0)) | |
# Save output data to a Joblib dump file | |
data = {'lat': lat, 'lon': lon, 'p': p, 'h': h, 'times': times, 'ids': ids, 'ensemblemembers': ensemblemembers} | |
joblib.dump(data, 'trajectorydata.joblib') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment