Created
March 26, 2021 10:47
-
-
Save spencerkclark/c1555533f8c74bb9722a70486b372256 to your computer and use it in GitHub Desktop.
Code for computing regressions of variables with an arbitrary number of dimensions against a 1D index
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dask.array as darray | |
import numpy as np | |
import xarray as xr | |
from collections import defaultdict | |
def _matmul(arr1, arr2): | |
if isinstance(arr1, darray.core.Array): | |
return darray.matmul(arr1, arr2) | |
else: | |
return np.matmul(arr1, arr2) | |
def regress(ds, index, sampling_dim): | |
"""Regress a variable against a given index along a sampling dimension. | |
Assumes the dimension of the index is the sampling dimension. | |
Parameters | |
---------- | |
ds : xr.Dataset | |
Input Dataset | |
index : xr.DataArray | |
Index DataArray; must be 1D | |
sampling_dim : xr.DataArray | |
Dimension name | |
Returns | |
------- | |
xr.Dataset or xr.DataArray | |
""" | |
if isinstance(ds, xr.DataArray): | |
ds = ds.to_dataset() | |
structure_dims = defaultdict(list) | |
for var in ds.data_vars: | |
sdims = set(ds[var].dims) - set([sampling_dim]) | |
sdims = tuple(sorted(tuple(sdims))) | |
structure_dims[sdims].append(var) | |
datasets = [] | |
for sdims, variables in structure_dims.items(): | |
structure = ds[variables] | |
structure = structure.stack(structure=tuple(sdims)) | |
structure = structure.transpose('structure', sampling_dim) | |
# Chunk to preserve block size | |
structure_chunk_sizes = [ds.chunks[dim][0] for dim in sdims] | |
structure_chunk_size = np.product(structure_chunk_sizes) | |
structure = structure.chunk({'structure': structure_chunk_size}) | |
result = xr.apply_ufunc( | |
_matmul, structure.fillna(0.0), index, | |
input_core_dims=[('structure', sampling_dim), (sampling_dim, )], | |
output_core_dims=[('structure', )], dask='allowed' | |
) | |
samples = xr.apply_ufunc( | |
np.isfinite, structure, dask='parallelized', | |
output_dtypes=[np.float]).sum(sampling_dim) | |
result = result / samples | |
datasets.append(result.unstack('structure')) | |
return xr.merge(datasets) | |
def regress_(da, index, sampling_dim): | |
"""Regress a variable against a given index along a sampling dimension. | |
Assumes the dimension of the index is the sampling dimension. | |
Parameters | |
---------- | |
da : xr.DataArray | |
Input DataArray | |
index : xr.DataArray | |
Index DataArray; must be 1D | |
sampling_dim : xr.DataArray | |
Dimension name | |
Returns | |
------- | |
xr.DataArray | |
""" | |
structure_dims = set(da.dims) - set([sampling_dim]) | |
structure = da.stack(structure=tuple(structure_dims)) | |
structure = structure.transpose('structure', sampling_dim) | |
result = xr.apply_ufunc( | |
_matmul, structure.fillna(0.0), index, | |
input_core_dims=[('structure', sampling_dim), (sampling_dim, )], | |
output_core_dims=[('structure', )], dask='allowed' | |
) | |
samples = xr.apply_ufunc( | |
np.isfinite, structure, dask='parallelized', | |
output_dtypes=[np.float]).sum(sampling_dim) | |
result = result / samples | |
return result.unstack('structure') | |
def lag_regress(da, index, sampling_dim, lag): | |
"""Lag regress a variable against a given index along a sampling dimension | |
Parameters | |
---------- | |
da : xr.DataArray | |
Input DataArray | |
index : xr.DataArray | |
Index DataArray; must be 1D | |
sampling_dim : xr.DataArray | |
Dimension name | |
lag : int | |
Integer lag | |
Returns | |
------- | |
xr.DataArray | |
""" | |
shifted = da.shift(**{sampling_dim: -lag}) | |
shifted, index = xr.align(shifted, index) | |
return regress(shifted, index, sampling_dim) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment