Rough Argo Workflow to load daily PRISM variables, clip to a CA bounding box, and output to a single Zarr store.
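Two snippets follow: a small Python script that merges the per-variable Zarr stores the workflow produces into one combined store, and the Argo Workflow manifest itself.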
import xarray as xr

tmin_url = "~/Downloads/tmin-20220208/annual.zarr"
tmax_url = "~/Downloads/tmax-20220208/annual.zarr"
tmean_url = "~/Downloads/tmean-20220208/annual.zarr"
out_url = "gs://fakebucketname/prism-ca-20220208.zarr"

tmin = xr.open_zarr(tmin_url)
tmax = xr.open_zarr(tmax_url)
tmean = xr.open_zarr(tmean_url)

# Merge the three single-variable stores, renaming each "Band1" variable
# and dropping the per-store "crs" variables so they don't collide.
prism_ca = xr.merge([
    tmin.rename({"Band1": "tmin"}).drop_vars("crs"),
    tmax.rename({"Band1": "tmax"}).drop_vars("crs"),
    tmean.rename({"Band1": "tmean"}).drop_vars("crs"),
])

# Include CRS projection information in Dataset root attrs.
prism_ca.attrs = prism_ca.attrs | tmin["crs"].attrs

# import matplotlib.pyplot as plt
# prism_ca["tmin"].isel(time=slice(0, 3)).plot(col="time"); plt.show()

prism_ca.to_zarr(out_url)
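A quick way to sanity-check the write is to reopen the store; this is a minimal sketch, assuming gcsfs is installed so xarray can read the gs:// URL directly:

import xarray as xr

# Reopen the combined store and confirm all three variables made it in.
check = xr.open_zarr("gs://fakebucketname/prism-ca-20220208.zarr")
assert set(check.data_vars) == {"tmin", "tmax", "tmean"}
print(check)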
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: prism2zarr-
  annotations:
    workflows.argoproj.io/description: >-
      Load daily PRISM variables, clip to CA bounding box, and output to
      single Zarr store.
    workflows.argoproj.io/tags: zarr,prism,util,bil,climate
    workflows.argoproj.io/version: '>= 3.1.0'
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: variable
        value: "tmean"
      - name: firstyear
        value: "1999"
      - name: lastyear
        value: "2015"
  templates:
    - name: main
      parallelism: 1
      inputs:
        parameters:
          - name: variable
          - name: firstyear
          - name: lastyear
      dag:
        tasks:
          - name: get-prism-years
            template: process-prism-year
            arguments:
              parameters:
                - name: variable
                  value: "{{ inputs.parameters.variable }}"
                - name: year
                  value: "{{ item }}"
            withSequence:
              start: "{{ inputs.parameters.firstyear }}"
              end: "{{ inputs.parameters.lastyear }}"
          - name: combine-years
            template: combine-years
            depends: get-prism-years
            arguments:
              artifacts:
                - name: annuals
                  gcs:
                    key: "{{ workflow.uid }}/annual_netcdfs"

    - name: process-prism-year
      inputs:
        parameters:
          - name: variable
          - name: year
      dag:
        tasks:
          - name: download
            template: download-prism
            arguments:
              parameters:
                - name: variable
                  value: "{{ inputs.parameters.variable }}"
                - name: year
                  value: "{{ inputs.parameters.year }}"
          - name: convert
            template: convert
            depends: download
            arguments:
              artifacts:
                - name: rawzips
                  from: "{{ tasks.download.outputs.artifacts.raws }}"
          - name: aggregate-to-annual
            template: aggregate-to-annual
            depends: convert
            arguments:
              parameters:
                - name: year
                  value: "{{ inputs.parameters.year }}"
              artifacts:
                - name: daily-netcdfs
                  from: "{{ tasks.convert.outputs.artifacts.netcdfs }}"
    - name: download-prism
      inputs:
        parameters:
          - name: variable
          - name: year
      outputs:
        artifacts:
          - name: raws
            path: /tmp/prism
            archive:
              none: {}
      container:
        image: curlimages/curl:7.81.0
        command: [sh, -c]
        args: ["curl ftp://ftp.prism.oregonstate.edu/daily/{{ inputs.parameters.variable }}/{{ inputs.parameters.year }}/PRISM_{{ inputs.parameters.variable }}_stable_4kmD2_{{ inputs.parameters.year }}[01-12][01-31]_bil.zip -O --create-dirs --output-dir /tmp/prism"]
        resources:
          requests:
            memory: 0.1Gi
            cpu: "100m"
          limits:
            memory: 0.1Gi
            cpu: "500m"
      activeDeadlineSeconds: 1800
    # Unzip and convert each daily BIL file to daily NetCDFs cropped to the
    # CA bounding box.
    - name: convert
      inputs:
        artifacts:
          - name: rawzips
            path: /tmp/raws
      outputs:
        artifacts:
          - name: netcdfs
            path: /tmp/cropped
      script:
        # image: osgeo/gdal:alpine-normal-3.4.1
        image: osgeo/gdal:ubuntu-full-3.4.1
        command: [bash]
        source: |
          unzip '/tmp/raws/*_bil.zip' -d /tmp/raws
          mkdir -p /tmp/converted
          mkdir -p /tmp/cropped
          for filename in /tmp/raws/*_bil.bil
          do
            bn="$(basename -- "$filename")"
            # Read the ESRI BIL header and rewrite as NetCDF.
            gdal_translate -if "EHdr" -of "netCDF" "${filename}" "/tmp/converted/${bn%%.*}.nc"
            # Crop to the California bounding box (xmin ymin xmax ymax).
            gdalwarp -te -124.5 32.4 -114.2 42.1 "/tmp/converted/${bn%%.*}.nc" "/tmp/cropped/${bn%%.*}.nc"
          done
        resources:
          requests:
            memory: 0.5Gi
            cpu: "500m"
          limits:
            memory: 0.5Gi
            cpu: "1000m"
      activeDeadlineSeconds: 1800
      retryStrategy:
        limit: 2
    # Combine all daily NetCDFs into one annual file, with the time dim
    # parsed from the input NetCDF path names.
    - name: aggregate-to-annual
      inputs:
        parameters:
          - name: year
        artifacts:
          - name: daily-netcdfs
            path: /tmp/daily_ncs
      outputs:
        artifacts:
          - name: annual-nc
            path: /tmp/annual.nc
            archive:
              none: {}
            gcs:
              key: "{{ workflow.uid }}/annual_netcdfs/{{ inputs.parameters.year }}.nc"
      script:
        image: us-central1-docker.pkg.dev/downscalecmip6/private/dodola:0.16.1
        command: [python]
        source: |
          from datetime import datetime
          from pathlib import Path

          import xarray as xr

          INPUT_GLOB = "/tmp/daily_ncs/*.nc"
          OUT_PATH = "/tmp/annual.nc"

          def datetime_from_prism_flname(p):
              """Parse the date stamp from a PRISM file name, e.g. PRISM_tmean_stable_4kmD2_19990101_bil.nc."""
              p = Path(p)
              return datetime.strptime(p.stem.split("_")[-2], "%Y%m%d")

          def preprocess(ds):
              """Preprocess input ds before concatenating."""
              fl_path = ds.encoding["source"]
              # Add a time dimension to the data, based on the input file path.
              ds = ds.expand_dims("time").assign_coords(
                  time=("time", [datetime_from_prism_flname(fl_path)])
              )
              return ds

          ds = xr.open_mfdataset(
              INPUT_GLOB,
              preprocess=preprocess,
              combine="nested",
              concat_dim="time",
              parallel=True,
          )
          print(ds)
          ds.to_netcdf(OUT_PATH)
        resources:
          requests:
            memory: 0.5Gi
            cpu: "500m"
          limits:
            memory: 0.5Gi
            cpu: "1000m"
      activeDeadlineSeconds: 1800
    # Fan-in combining all annual NetCDFs into a single Zarr store.
    - name: combine-years
      inputs:
        artifacts:
          - name: annuals
            path: /tmp/annuals
      outputs:
        artifacts:
          - name: prism-zarr
            path: /tmp/annual.zarr
            archive:
              none: {}
      script:
        image: us-central1-docker.pkg.dev/downscalecmip6/private/dodola:0.16.1
        command: [python]
        source: |
          import xarray as xr

          INPUT_GLOB = "/tmp/annuals/*.nc"
          OUT_PATH = "/tmp/annual.zarr"

          ds = xr.open_mfdataset(
              INPUT_GLOB,
              combine="nested",
              concat_dim="time",
              parallel=True,
          )
          # Pull everything into memory so the Zarr write is a single dump.
          ds = ds.load()
          print(ds)
          ds.to_zarr(OUT_PATH)
        resources:
          requests:
            memory: 8Gi
            cpu: "1000m"
          limits:
            memory: 8Gi
            cpu: "2000m"
      activeDeadlineSeconds: 1800
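To run the workflow, an invocation along these lines should work, assuming the manifest above is saved as prism2zarr.yaml (an illustrative file name) and the cluster has a default GCS artifact repository configured for the fan-in artifacts:

argo submit prism2zarr.yaml \
  -p variable=tmin \
  -p firstyear=1999 \
  -p lastyear=2015 \
  --watch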