Skip to content

Instantly share code, notes, and snippets.

@ntfargo
Created August 12, 2025 12:51
Show Gist options
  • Save ntfargo/44a3a146d5acc12bc7adfe63ef50a86e to your computer and use it in GitHub Desktop.
Save ntfargo/44a3a146d5acc12bc7adfe63ef50a86e to your computer and use it in GitHub Desktop.
Causal supervised deep dynamic pca.
import numpy as np
def intervene(data, causal_graph, node_idx):
# Dummy causal intervention: robust to 1D segments where parent indices don't align
if causal_graph is None:
return data
parents = np.where(causal_graph[:, node_idx])[0]
# If data is a 1D time segment for a single node, parents can't index into it.
if data.ndim == 1:
if parents.size == 0 or data.size == 0:
return data
# Simple, safe placeholder: shift by the segment mean if parents exist
shift = float(np.mean(data))
return data + shift
# If data has node axis, keep original idea but guard index range
in_range = parents[(parents >= 0) & (parents < data.shape[0])]
if in_range.size > 0:
return data + np.mean(data[in_range], axis=0)
return data
def train_temporal_dnn(target, data):
# Placeholder: linear regression as DNN proxy
from sklearn.linear_model import LinearRegression
model = LinearRegression().fit(data.reshape(-1, 1), target)
return model.coef_[0] # Return 'theta'
def temporal_dnn(data, theta):
return theta * np.mean(data) # Reconstructed predictor
def top_eigenvectors(sigma, k):
eigvals, eigvecs = np.linalg.eigh(sigma) # Use eigh for symmetric cov
idx = np.argsort(eigvals)[::-1][:k]
return eigvecs[:, idx]
def apply_causal_penalty(B, causal_graph, reg_lambda):
# Regularize B by penalizing non-causal loadings (L1 on invalid edges)
if causal_graph is None:
return B
# Assume causal_graph (N x N); mask non-edges
mask = (causal_graph == 0).astype(float) # Penalize where no edge
penalty = reg_lambda * mask[:, :B.shape[1]] * np.abs(B) # Shape-adjusted
return B - np.sign(B) * penalty # Proximal step for L1
def forecast_dnn(g_lagged, y_past):
# Shapes-agnostic: combine summary statistics to avoid dot shape mismatches
g_mean = float(np.mean(g_lagged)) if np.size(g_lagged) else 0.0
if len(y_past) > 0:
y_mean = float(np.mean(y_past))
return 0.5 * (g_mean + y_mean)
return g_mean
def c_sddp(X, y, q0, h, causal_graph, reg_lambda=0.1, K_star=2):
"""
Causal-SDDP: Supervised dynamic dimension reduction with causal regularization.
Args:
- X: np.array (N x T) predictors
- y: np.array (T,) target
- q0: int, lag window
- h: int, forecast horizon
- causal_graph: np.array (N x N) adjacency matrix or None
- reg_lambda: float, regularization strength
- K_star: int, number of factors
Returns:
- y_hat: float, forecasted value
"""
N, T = X.shape
t = T - 1 # Focus on last timestep for simplicity
x_star = np.zeros((N, T))
# Stage 1: Causal target-aware predictors
for i in range(N):
lagged_idx = slice(max(0, t - q0 + 1), t + 1)
lagged_data = X[i, lagged_idx]
intervened_data = intervene(lagged_data, causal_graph, i)
target = y[min(t + h, T - 1)] # Handle index overflow
theta_i = train_temporal_dnn(np.repeat(target, len(intervened_data)), intervened_data)
x_star[i, t] = temporal_dnn(intervened_data, theta_i)
# Stage 2: PCA on x_star with causal regularization
Sigma = np.cov(x_star)
B_raw = top_eigenvectors(Sigma, K_star)
B_star = apply_causal_penalty(B_raw, causal_graph, reg_lambda)
# Extract factors (adjust for vector at t)
g_star = (1 / N) * B_star.T @ x_star[:, t]
# Forecasting (simplified lagged g_star)
g_lagged = g_star # Extend to lags in production
y_past = y[max(0, t - q0 + 1):t + 1]
y_hat = forecast_dnn(g_lagged, y_past)
return y_hat
N, T = 10, 50
X = np.random.randn(N, T)
y = np.sin(np.arange(T)) + 0.1 * np.random.randn(T)
causal_graph = np.random.randint(0, 2, (N, N)) # Random DAG
y_hat = c_sddp(X, y, q0=5, h=1, causal_graph=causal_graph)
print(f"Forecasted y_{T+1}: {y_hat}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment