Created
July 30, 2021 08:06
-
-
Save kingjr/bed8d15f76857c8582d0813e72f85ade to your computer and use it in GitHub Desktop.
banded_ridge.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from numpy.linalg import multi_dot | |
from scipy.linalg import svd | |
from sklearn.linear_model import RidgeCV | |
from sklearn.model_selection import KFold | |
def get_from_indices(X, indices): | |
out = np.zeros_like(X[0]) | |
for idx in np.unique(indices): | |
sel = np.where(indices == idx) | |
out[sel] = X[idx][sel] | |
return out | |
class BandedRidgeCV(RidgeCV): | |
def __init__( | |
self, | |
alphas, | |
ratios, | |
groups, | |
cv=4, | |
fit_intercept=True, | |
alpha_per_target=True, | |
): | |
self.alphas = np.asarray(alphas) | |
self.ratios = np.asarray(ratios) | |
self.groups = np.asarray(groups) | |
if not np.array_equal(np.unique(groups), [0, 1]): | |
raise NotImplementedError("groups must be composed of 0s and 1s") | |
if fit_intercept: | |
raise NotImplementedError # TODO, use StandardScaler pipeline itmt | |
self.alpha_per_target = alpha_per_target | |
self.cv = cv | |
def _prep_ratio(self, ratio): | |
"""Prepare banded SVD""" | |
angle = np.arctan(ratio) | |
lambda1 = np.cos(angle) / np.cos(np.pi / 4) | |
lambda2 = np.sin(angle) / np.cos(np.pi / 4) | |
bands = np.r_[ | |
np.ones(sum(self.groups == 0)) * lambda1, | |
np.ones(sum(self.groups == 1)) * lambda2, | |
] | |
Cinv = np.diag(bands ** -1) | |
return Cinv, bands | |
def _find_best_alpha(self, X, Y, train, test, ratio): | |
Cinv, bands = self._prep_ratio(ratio) | |
U, S, VT = svd(X / bands[None, :], full_matrices=False) | |
V = VT.T | |
UTY = U[train].T @ Y[train] | |
# Loop across alphas | |
errors = np.ones((len(self.alphas), Y.shape[1])) * np.inf | |
Ws = np.zeros((len(self.alphas), Y.shape[1], X.shape[1])) | |
for idx, alpha in enumerate(self.alphas): | |
# I didn't manage to apply the LOO trick to banded ridge | |
# | |
# D = 1. / (S2 + alpha) - alpha ** -1 | |
# K = U @ np.diag(D) @ UTY + alpha ** -1 * Y | |
# G_diag = (D * U ** 2).sum(axis=-1) + alpha ** -1 | |
# error = K / G_diag[:, None] | |
# W = K.T @ X | |
# Solve | |
W = multi_dot([Cinv, V, np.diag(S / (S ** 2 + alpha)), UTY]) | |
err = ((X[test] @ W - Y[test]) ** 2).mean(0) | |
errors[idx] = err | |
Ws[idx] = W.T | |
best = np.argmin(errors, axis=0) | |
if self.alpha_per_target: | |
W = get_from_indices(Ws, best) | |
errors = get_from_indices(errors, best) | |
alphas = self.alphas[best] | |
else: | |
counts = np.bincount(best) | |
best = np.argmax(counts) | |
W = Ws[best] | |
errors = errors[best] | |
alphas = self.alphas[best] | |
return W, errors, alphas | |
def _find_best_ratio(self, X, Y, train, test): | |
W = np.zeros((len(self.ratios), Y.shape[1], X.shape[1])) | |
alphas = np.zeros((len(self.ratios), Y.shape[1])) | |
errors = np.ones((len(self.ratios), Y.shape[1])) * np.inf | |
for idx, ratio in enumerate(self.ratios): | |
# Compute error with alpha hyperparam optim | |
ratio_W, ratio_err, ratio_alpha = self._find_best_alpha( | |
X, Y, train, test, ratio | |
) | |
alphas[idx] = ratio_alpha | |
errors[idx] = ratio_err | |
W[idx] = ratio_W | |
best = np.argmin(errors, axis=0) | |
if self.alpha_per_target: | |
W = get_from_indices(W, best) | |
errors = get_from_indices(errors, best) | |
alphas = get_from_indices(alphas, best) | |
ratios = self.ratios[best] | |
else: | |
counts = np.bincount(best) | |
best = np.argmax(counts) | |
W = W[best] | |
errors = errors[best] | |
alphas = alphas[best] | |
ratios = self.ratios[best] | |
return W, errors, alphas, ratios | |
def fit(self, X, Y): | |
if Y.ndim == 1: | |
Y = Y[:, None] | |
cv = KFold(self.cv) if isinstance(self.cv, int) else self.cv | |
W = np.zeros((cv.n_splits, Y.shape[1], X.shape[1])) | |
errors = np.zeros((cv.n_splits, Y.shape[1])) | |
alphas = np.zeros((cv.n_splits, Y.shape[1])) | |
ratios = np.zeros((cv.n_splits, Y.shape[1])) | |
for split, (train, test) in enumerate(cv.split(X, Y)): | |
( | |
W[split], | |
errors[split], | |
alphas[split], | |
ratios[split], | |
) = self._find_best_ratio(X, Y, train, test) | |
self.errors_ = np.mean(errors, axis=0) | |
self.alphas_ = 10 ** np.mean(np.log10(alphas), axis=0) | |
self.ratios_ = 10 ** np.mean(np.log10(ratios), axis=0) | |
self.coef_ = W.mean(0) # TODO retrain? | |
self.intercept_ = 0.0 | |
return self | |
def predict(self, X): | |
return X @ self.coef_.T | |
if __name__ == "__main__": | |
from sklearn.pipeline import make_pipeline | |
from sklearn.preprocessing import StandardScaler | |
def make_data(): | |
np.random.seed(0) | |
dy = 3 # e.g. 3 voxels | |
dx1 = 5 # e.g. 5 dims of wordembedding | |
dx2 = 5 # 5 dims of gpt2 emb | |
dx = dx1 + dx2 | |
d = dx + dy | |
cov = np.random.randn(d, d) | |
cov = cov.T @ cov | |
X = np.random.multivariate_normal(mean=np.zeros(d), cov=cov, size=500) | |
X, y = X[:, :dx], X[:, dx:] | |
groups = [0] * dx1 + [1] * dx2 | |
assert len(groups) == X.shape[1] | |
return X, y, groups | |
X, y, groups = make_data() | |
# ridge regression | |
alphas = np.logspace(-2, 5, 8) | |
ridge = RidgeCV(alphas, fit_intercept=False) | |
# banded ridge regression | |
# regularization ratio between group1 and group2 | |
ratios = 10 ** np.linspace(-1.5, 1.5, 5) | |
banded_ridge = BandedRidgeCV(alphas, ratios, groups, fit_intercept=False) | |
# compute score | |
models = dict(ridge=ridge, banded_ridge=banded_ridge) | |
for name, model in models.items(): | |
pipe = make_pipeline(StandardScaler(), model) | |
pipe.fit(X, y) | |
print(name, pipe.score(X, y)) |
Author
kingjr
commented
Jul 30, 2021
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment