Causal supervised deep dynamic PCA (C-SDDP): a prototype of supervised dynamic dimension reduction with causal regularization, using simple placeholder models in place of the DNN components.
import numpy as np


def intervene(data, causal_graph, node_idx):
    # Dummy causal intervention: robust to 1D segments where parent indices don't align
    if causal_graph is None:
        return data
    parents = np.where(causal_graph[:, node_idx])[0]
    # If data is a 1D time segment for a single node, parents can't index into it.
    if data.ndim == 1:
        if parents.size == 0 or data.size == 0:
            return data
        # Simple, safe placeholder: shift by the segment mean if parents exist
        shift = float(np.mean(data))
        return data + shift
    # If data has a node axis, keep the original idea but guard the index range
    in_range = parents[(parents >= 0) & (parents < data.shape[0])]
    if in_range.size > 0:
        return data + np.mean(data[in_range], axis=0)
    return data


def train_temporal_dnn(target, data):
    # Placeholder: linear regression as a DNN proxy
    from sklearn.linear_model import LinearRegression
    model = LinearRegression().fit(data.reshape(-1, 1), target)
    return model.coef_[0]  # Return 'theta'


def temporal_dnn(data, theta):
    return theta * np.mean(data)  # Reconstructed predictor


def top_eigenvectors(sigma, k):
    eigvals, eigvecs = np.linalg.eigh(sigma)  # Use eigh for a symmetric covariance
    idx = np.argsort(eigvals)[::-1][:k]
    return eigvecs[:, idx]


def apply_causal_penalty(B, causal_graph, reg_lambda):
    # Regularize B by penalizing non-causal loadings (L1 on invalid edges)
    if causal_graph is None:
        return B
    # Assume causal_graph is (N x N); mask entries with no edge
    mask = (causal_graph == 0).astype(float)  # Penalize where there is no edge
    penalty = reg_lambda * mask[:, :B.shape[1]] * np.abs(B)  # Crude shape adjustment: use the first K columns of the mask
    return B - np.sign(B) * penalty  # Proximal step for the L1 penalty


def forecast_dnn(g_lagged, y_past):
    # Shape-agnostic: combine summary statistics to avoid dot-product shape mismatches
    g_mean = float(np.mean(g_lagged)) if np.size(g_lagged) else 0.0
    if len(y_past) > 0:
        y_mean = float(np.mean(y_past))
        return 0.5 * (g_mean + y_mean)
    return g_mean


def c_sddp(X, y, q0, h, causal_graph, reg_lambda=0.1, K_star=2):
    """
    Causal-SDDP: Supervised dynamic dimension reduction with causal regularization.

    Args:
        X: np.array (N x T) predictors
        y: np.array (T,) target
        q0: int, lag window
        h: int, forecast horizon
        causal_graph: np.array (N x N) adjacency matrix, or None
        reg_lambda: float, regularization strength
        K_star: int, number of factors

    Returns:
        y_hat: float, forecasted value
    """
    N, T = X.shape
    t = T - 1  # Focus on the last timestep for simplicity
    x_star = np.zeros((N, T))
    # Stage 1: Causal target-aware predictors
    for i in range(N):
        lagged_idx = slice(max(0, t - q0 + 1), t + 1)
        lagged_data = X[i, lagged_idx]
        intervened_data = intervene(lagged_data, causal_graph, i)
        # Align each lagged observation with its h-step-ahead target (clipped at T - 1);
        # regressing on a single repeated target would always give theta = 0.
        target_idx = np.minimum(np.arange(lagged_idx.start, lagged_idx.stop) + h, T - 1)
        theta_i = train_temporal_dnn(y[target_idx], intervened_data)
        x_star[i, t] = temporal_dnn(intervened_data, theta_i)
    # Stage 2: PCA on x_star with causal regularization
    Sigma = np.cov(x_star)  # N x N sample covariance of the target-aware predictors
    B_raw = top_eigenvectors(Sigma, K_star)
    B_star = apply_causal_penalty(B_raw, causal_graph, reg_lambda)
    # Extract factors (adjusted for a single vector at time t)
    g_star = (1 / N) * B_star.T @ x_star[:, t]
    # Forecasting (simplified: current factors stand in for lagged factors)
    g_lagged = g_star  # Extend to true lags in production
    y_past = y[max(0, t - q0 + 1):t + 1]
    y_hat = forecast_dnn(g_lagged, y_past)
    return y_hat


# Demo
N, T = 10, 50
X = np.random.randn(N, T)
y = np.sin(np.arange(T)) + 0.1 * np.random.randn(T)
causal_graph = np.random.randint(0, 2, (N, N))  # Random 0/1 adjacency matrix (not necessarily a DAG)
y_hat = c_sddp(X, y, q0=5, h=1, causal_graph=causal_graph)
print(f"Forecasted y_{T+1}: {y_hat}")