@brews
Created February 8, 2022 21:25
Rough Argo Workflow to load daily PRISM variables, clip to CA bounding box, and output to single Zarr store.
import xarray as xr

tmin_url = "~/Downloads/tmin-20220208/annual.zarr"
tmax_url = "~/Downloads/tmax-20220208/annual.zarr"
tmean_url = "~/Downloads/tmean-20220208/annual.zarr"
out_url = "gs://fakebucketname/prism-ca-20220208.zarr"

# Open each per-variable store once, rename the generic GDAL "Band1"
# variable, and drop the per-store "crs" variable before merging.
tmin = xr.open_zarr(tmin_url).rename({"Band1": "tmin"}).drop_vars("crs")
tmax = xr.open_zarr(tmax_url).rename({"Band1": "tmax"}).drop_vars("crs")
tmean = xr.open_zarr(tmean_url).rename({"Band1": "tmean"}).drop_vars("crs")
prism_ca = xr.merge([tmin, tmax, tmean])

# Include CRS projection information in Dataset root attrs.
prism_ca.attrs = prism_ca.attrs | xr.open_zarr(tmin_url)["crs"].attrs

# import matplotlib.pyplot as plt
# prism_ca["tmin"].isel(time=slice(0, 3)).plot(col="time"); plt.show()

prism_ca.to_zarr(out_url)
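As a quick sanity check, something like the minimal sketch below can re-open the merged store (the bucket path is the placeholder from above; the exact attribute names depend on what GDAL wrote into the original "crs" variable):

import xarray as xr

# Re-open the merged store written above (placeholder bucket path).
merged = xr.open_zarr("gs://fakebucketname/prism-ca-20220208.zarr")
print(merged.data_vars)  # expect tmin, tmax, tmean
print(merged.attrs)      # CRS metadata copied from the per-variable stores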
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: prism2zarr-
  annotations:
    workflows.argoproj.io/description: >-
      Load daily PRISM variables, clip to CA bounding box, and output to
      single Zarr store.
    workflows.argoproj.io/tags: zarr,prism,util,bil,climate
    workflows.argoproj.io/version: '>= 3.1.0'
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: variable
        value: "tmean"
      # Parameter values must be strings, so quote the years.
      - name: firstyear
        value: "1999"
      - name: lastyear
        value: "2015"
  templates:
    - name: main
      parallelism: 1
      inputs:
        parameters:
          - name: variable
          - name: firstyear
          - name: lastyear
      dag:
        tasks:
          - name: get-prism-years
            template: process-prism-year
            arguments:
              parameters:
                - name: variable
                  value: "{{ inputs.parameters.variable }}"
                - name: year
                  value: "{{ item }}"
            withSequence:
              start: "{{ inputs.parameters.firstyear }}"
              end: "{{ inputs.parameters.lastyear }}"
          - name: combine-years
            template: combine-years
            depends: get-prism-years
            arguments:
              artifacts:
                - name: annuals
                  gcs:
                    key: "{{ workflow.uid }}/annual_netcdfs"
    - name: process-prism-year
      inputs:
        parameters:
          - name: variable
          - name: year
      dag:
        tasks:
          - name: download
            template: download-prism
            arguments:
              parameters:
                - name: variable
                  value: "{{ inputs.parameters.variable }}"
                - name: year
                  value: "{{ inputs.parameters.year }}"
          - name: convert
            template: convert
            depends: download
            arguments:
              artifacts:
                - name: rawzips
                  from: "{{ tasks.download.outputs.artifacts.raws }}"
          - name: aggregate-to-annual
            template: aggregate-to-annual
            depends: convert
            arguments:
              parameters:
                - name: year
                  value: "{{ inputs.parameters.year }}"
              artifacts:
                - name: daily-netcdfs
                  from: "{{ tasks.convert.outputs.artifacts.netcdfs }}"
    - name: download-prism
      inputs:
        parameters:
          - name: variable
          - name: year
      outputs:
        artifacts:
          - name: raws
            path: /tmp/prism
            archive:
              none: {}
      container:
        image: curlimages/curl:7.81.0
        command: [sh, -c]
        args: ["curl ftp://ftp.prism.oregonstate.edu/daily/{{ inputs.parameters.variable }}/{{ inputs.parameters.year }}/PRISM_{{ inputs.parameters.variable }}_stable_4kmD2_{{ inputs.parameters.year }}[01-12][01-31]_bil.zip -O --create-dirs --output-dir /tmp/prism"]
        resources:
          requests:
            memory: 0.1Gi
            cpu: "100m"
          limits:
            memory: 0.1Gi
            cpu: "500m"
      activeDeadlineSeconds: 1800
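    # curl's URL globbing expands [01-12][01-31] into every month/day pair for
    # the year, e.g. (hypothetical expansion for variable=tmean, year=1999):
    #   ftp://ftp.prism.oregonstate.edu/daily/tmean/1999/PRISM_tmean_stable_4kmD2_19990101_bil.zip
    #   ftp://ftp.prism.oregonstate.edu/daily/tmean/1999/PRISM_tmean_stable_4kmD2_19990102_bil.zip
    #   ...
    # Impossible dates (e.g. 19990230) fail individually; curl continues with
    # the remaining URLs in the sequence.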
    # Unzip and convert each daily BIL file to daily NetCDFs cropped to the CA
    # bounding box.
    - name: convert
      inputs:
        artifacts:
          - name: rawzips
            path: /tmp/raws
      outputs:
        artifacts:
          - name: netcdfs
            path: /tmp/cropped
      script:
        # image: osgeo/gdal:alpine-normal-3.4.1
        image: osgeo/gdal:ubuntu-full-3.4.1
        command: [bash]
        source: |
          unzip '/tmp/raws/*_bil.zip' -d /tmp/raws
          mkdir -p /tmp/converted
          mkdir -p /tmp/cropped
          for filename in /tmp/raws/*_bil.bil
          do
            bn="$(basename -- "${filename}")"
            # BIL (ESRI EHdr) -> NetCDF, then crop to the CA bounding box
            # (-te xmin ymin xmax ymax, in degrees).
            gdal_translate -if "EHdr" -of "netCDF" "${filename}" "/tmp/converted/${bn%%.*}.nc"
            gdalwarp -te -124.5 32.4 -114.2 42.1 "/tmp/converted/${bn%%.*}.nc" "/tmp/cropped/${bn%%.*}.nc"
          done
        resources:
          requests:
            memory: 0.5Gi
            cpu: "500m"
          limits:
            memory: 0.5Gi
            cpu: "1000m"
      activeDeadlineSeconds: 1800
      retryStrategy:
        limit: 2
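    # Each daily zip unpacks to one BIL that becomes a cropped NetCDF, e.g.
    # (hypothetical file name):
    #   /tmp/raws/PRISM_tmean_stable_4kmD2_19990115_bil.zip
    #     -> /tmp/converted/PRISM_tmean_stable_4kmD2_19990115_bil.nc  (gdal_translate)
    #     -> /tmp/cropped/PRISM_tmean_stable_4kmD2_19990115_bil.nc    (gdalwarp, CA bbox)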
    # Combine all daily NetCDFs into one annual file, with the time dim parsed
    # from the input NetCDF path names.
    - name: aggregate-to-annual
      inputs:
        parameters:
          - name: year
        artifacts:
          - name: daily-netcdfs
            path: /tmp/daily_ncs
      outputs:
        artifacts:
          - name: annual-nc
            path: /tmp/annual.nc
            archive:
              none: {}
            gcs:
              key: "{{ workflow.uid }}/annual_netcdfs/{{ inputs.parameters.year }}.nc"
      script:
        image: us-central1-docker.pkg.dev/downscalecmip6/private/dodola:0.16.1
        command: [python]
        source: |
          from datetime import datetime
          from pathlib import Path

          import xarray as xr

          INPUT_GLOB = "/tmp/daily_ncs/*.nc"
          OUT_PATH = "/tmp/annual.nc"

          def datetime_from_prism_flname(p):
              """Parse the YYYYMMDD stamp from a PRISM file name."""
              p = Path(p)
              return datetime.strptime(p.stem.split("_")[-2], "%Y%m%d")

          def preprocess(ds):
              """Preprocess input ds before concatenating."""
              fl_path = ds.encoding["source"]
              # Add time dimension to data, based on input file path.
              ds = ds.expand_dims("time").assign_coords(
                  time=("time", [datetime_from_prism_flname(fl_path)])
              )
              return ds

          ppt = xr.open_mfdataset(
              INPUT_GLOB,
              preprocess=preprocess,
              combine="nested",  # required when passing concat_dim explicitly
              concat_dim="time",
              parallel=True,
          )
          print(ppt)
          ppt.to_netcdf(OUT_PATH)
        resources:
          requests:
            memory: 0.5Gi
            cpu: "500m"
          limits:
            memory: 0.5Gi
            cpu: "1000m"
      activeDeadlineSeconds: 1800
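    # The preprocess hook above derives each time stamp from the file name,
    # e.g. (hypothetical path):
    #   datetime_from_prism_flname("/tmp/daily_ncs/PRISM_tmean_stable_4kmD2_19990115_bil.nc")
    #   -> datetime.datetime(1999, 1, 15, 0, 0)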
    # Fan-in combining all annual NetCDFs into a single Zarr store.
    - name: combine-years
      inputs:
        artifacts:
          - name: annuals
            path: /tmp/annuals
      outputs:
        artifacts:
          - name: prism-zarr
            path: /tmp/annual.zarr
            archive:
              none: {}
      script:
        image: us-central1-docker.pkg.dev/downscalecmip6/private/dodola:0.16.1
        command: [python]
        source: |
          import xarray as xr

          INPUT_GLOB = "/tmp/annuals/*.nc"
          OUT_PATH = "/tmp/annual.zarr"

          ppt = xr.open_mfdataset(
              INPUT_GLOB,
              combine="nested",  # required when passing concat_dim explicitly
              concat_dim="time",
              parallel=True,
          )
          # Load into memory, then write out a single Zarr store.
          ppt.load()
          print(ppt)
          ppt.to_zarr(OUT_PATH)
        resources:
          requests:
            memory: 8Gi
            cpu: "1000m"
          limits:
            memory: 8Gi
            cpu: "2000m"
      activeDeadlineSeconds: 1800
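After running the workflow once per variable and staging the prism-zarr artifacts locally (the ~/Downloads paths in the Python snippet at the top), a rough consistency check like the sketch below can confirm the crop. It assumes the GDAL-written coordinate names are lon and lat; adjust if the stores differ.

import xarray as xr

ds = xr.open_zarr("~/Downloads/tmean-20220208/annual.zarr")
# gdalwarp -te -124.5 32.4 -114.2 42.1 should bound the lon/lat coordinates.
assert float(ds["lon"].min()) >= -124.5 and float(ds["lon"].max()) <= -114.2
assert float(ds["lat"].min()) >= 32.4 and float(ds["lat"].max()) <= 42.1
print(ds["Band1"].sizes)  # daily time steps for the requested years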