Skip to content

Instantly share code, notes, and snippets.

@ougx
Created February 26, 2025 02:13
Show Gist options
  • Save ougx/f9adaa6dcf0bd702c0cc51d25475e4a7 to your computer and use it in GitHub Desktop.
Save ougx/f9adaa6dcf0bd702c0cc51d25475e4a7 to your computer and use it in GitHub Desktop.
Compute the raw empirical variogram and store results in a Pandas DataFrame
import sys
import numpy as np
import pandas as pd
def raw_variogram(coords, vals, cutoff=np.inf, times=None, t_cutoff=np.inf,
calc_angle=False, maxobs=None, seed=None):
"""
Compute the raw empirical variogram and store results in a Pandas DataFrame.
Args:
coords (array-like): Spatial coordinates (n x d) [2D or 3D].
vals (array-like): Corresponding values (n,).
cutoff (float, optional): Maximum spatial lag distance. Defaults to np.inf.
times (array-like, optional): Time values for each point. Defaults to None.
t_cutoff (float, optional): Maximum time lag distance. Defaults to np.inf.
return_index (bool, optional): Whether to return index pairs. Defaults to False.
calc_angle (bool, optional): Whether to compute angles between pairs. Defaults to False.
maxobs (int, optional): Maximum number of observations to use. Defaults to None.
seed (int, optional): Random seed for reproducibility. Defaults to None.
Returns:
pd.DataFrame: DataFrame with distance, semivariance, and optionally angles/time/indexes.
"""
coords, vals = np.asarray(coords), np.asarray(vals).reshape(-1)
n, dim = coords.shape
assert len(vals) == n, 'Lengths of coords and vals must match'
if dim not in [1, 2, 3]:
raise ValueError("Coordinates must be 1D, 2D or 3D.")
# Select a subset of observations if maxobs is set
if maxobs is not None and maxobs < n:
if seed is not None:
np.random.seed(seed)
selected_indices = np.sort(np.random.choice(n, maxobs, replace=False))
coords = coords[selected_indices]
vals = vals[selected_indices]
if times is not None:
times = np.array(times)[selected_indices]
n = maxobs # Update n to match new subset size
else:
selected_indices = np.arange(n)
hasTime = times is not None
i1s = []
i2s = []
dhs = []
dh_hori = []
dh_total = []
dts = []
angle_hs = []
angle_vs = []
for i1 in range(n - 1):
dh = coords[i1+1:, :] - coords[i1:i1+1, :]
hlag = np.linalg.norm(dh, axis=1)
mask = hlag<=cutoff
if hasTime:
tlag = np.abs(times[i1] - times[i1+1:])
mask = mask & (tlag<=t_cutoff)
dts.append(tlag)
i1s.append(np.full(mask.sum(), i1))
i2s.append(np.arange(i1+1, n)[mask])
dh_total.append(hlag[mask])
dhs.append(dh[mask])
if calc_angle and dim>=2:
angle_h_radians = np.arctan2(dh[mask,1], dh[mask,0])
angle_hs.append(np.degrees(angle_h_radians) % 360) # Ensure 0-360 range
horizontal_dist = np.linalg.norm(dh[mask,:2], axis=1)
dh_hori.append(horizontal_dist)
if dim==3:
angle_v_radians = np.arctan2(dh[mask,2], horizontal_dist)
angle_vs.append(np.degrees(angle_v_radians))
if i1 % max(n // 10, 1) == 0:
sys.stdout.write(f'\rProgress: {int((i1 / (n - 1)) * 100)}%')
sys.stdout.flush()
print('\n')
results = {
"index0": selected_indices[np.concatenate(i1s)],
"index2": selected_indices[np.concatenate(i2s)],
"slag" : np.concatenate(dh_total),
}
if dim>1:
dhs = np.concatenate(dhs, axis=0)
for k in range(dim):
results["d"+str(k)] = dhs[:,k]
if dts:
results["tlag"] = np.concatenate(dts)
if angle_hs:
results["angle_h"] = np.concatenate(angle_hs)
if angle_vs:
results["angle_v"] = np.concatenate(angle_vs)
return pd.DataFrame(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment