""" | |
Script to implement selective inference after cross-validation | |
""" | |
import numpy as np | |
from scipy.stats import norm as ndist | |
from statsmodels.distributions import ECDF | |
import matplotlib.pyplot as plt | |
from selection.algorithms.lasso import instance as data_instance | |
from selection.algorithms import sqrt_lasso | |
from selection.constraints.affine import (constraints, | |
sample_from_constraints) | |
from selection.distributions.discrete_family import discrete_family | |

def solve_grid(Y,
               X,
               L,
               mults,
               post_estimator=False,
               solve_args={'min_its':10, 'max_its':20}):
    r"""
    Solve the square-root LASSO over a grid of values.

    .. math::

        \text{minimize}_{\beta} \|Y-X\beta\|_2 + m \cdot L \|\beta\|_1

    for each $m$ in `mults`.

    Parameters
    ----------
    Y : np.float(n)
        Response vector.
    X : np.float((n,p))
        Design matrix.
    L : float
        Value of $\lambda$ in the square-root LASSO optimization
        problem.
    mults : [float]
        Sequence of multipliers over which to solve the square-root LASSO.
    post_estimator : bool
        Should we return the square-root LASSO estimate or the
        OLS estimate of the selected model (the post square-root
        LASSO estimator)?
    solve_args : {}
        Keyword arguments passed to `solve_sqrt_lasso`.

    Returns
    -------
    results : [(m, beta_m)]
        Coefficient estimates for each `m` in `mults`.
    """
    n, p = X.shape
    results = []
    for i, m in enumerate(mults):
        if i == 0:
            results.append(
                (m, sqrt_lasso.solve_sqrt_lasso(X,
                                                Y,
                                                m * L * np.ones(p),
                                                **solve_args)))
        else:
            # warm start from the previous solution on the grid
            results.append(
                (m, sqrt_lasso.solve_sqrt_lasso(X,
                                                Y,
                                                m * L * np.ones(p),
                                                initial=results[-1][1],
                                                **solve_args)))
        if post_estimator:
            active = np.nonzero(results[-1][1])[0]
            coef = np.zeros(p)
            if active.shape[0] > 0:
                X_E = X[:,active]
                coef[active] = np.dot(np.linalg.pinv(X_E), Y)
            results[-1] = (m, coef)
    return results
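
# Illustrative usage sketch (not in the original gist): solve the
# square-root LASSO over a decreasing grid of multipliers, warm-starting
# each solve from the previous one. Uses the simulated-data helper
# `data_instance` imported above with the same keywords used later
# in this script.
def _demo_solve_grid():
    X, Y, beta, active, sigma = data_instance(s=7, rho=0.3, snr=5, n=500)
    L = sqrt_lasso.choose_lambda(X)
    mults = np.linspace(1.5, 0.5, 11)
    for m, coef in solve_grid(Y, X, L, mults):
        print m, (coef != 0).sum()   # multiplier and active-set size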

def split_and_validate(Y,
                       X,
                       L,
                       mults,
                       test_frac,
                       shift_size=0):
    r"""
    Choose which multiplier of $\lambda$ minimizes prediction
    error over a random split.

    Parameters
    ----------
    Y : np.float(n)
        Response vector.
    X : np.float((n,p))
        Design matrix.
    L : float
        Value of $\lambda$ in the square-root LASSO optimization
        problem.
    mults : [float]
        Sequence of multipliers over which to solve the square-root LASSO.
    test_frac : float
        Fraction of observations assigned to the training split
        (despite the name); the remaining observations form the test set.
    shift_size : int
        Return the minimizer plus a uniform
        positive or negative shift in the index
        of `mults` of a given size.
        Affects the size of the window of
        minimizers to be accepted by the later sampling scheme.

    Returns
    -------
    window : [float]
        Values of the multiplier that will be accepted
        in the sampling routine.
    """
    n, p = X.shape
    training = np.zeros(n, np.bool)
    training[np.random.choice(np.arange(n), size=int(test_frac*n), replace=False)] = 1
    test = ~training
    results = solve_grid(Y[training], X[training], L, mults=mults)
    error = []
    for m, coef in results:
        error.append((np.linalg.norm(Y[test] - np.dot(X[test], coef))**2, m))
    m_min = min(error)[1]
    idx_min = list(mults).index(m_min)

    # this shift randomizes the returned value of \lambda;
    # we have not really used it much.
    if shift_size > 0:
        random_shift = np.random.random_integers(low=-shift_size,
                                                 high=shift_size)
        idx_min += random_shift
        idx_min = max(idx_min, 0)
    return [mults[idx_min + j] for j in range(-shift_size, shift_size+1, 1)
            if idx_min + j >= 0 and idx_min + j < len(mults)]
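
# Illustrative sketch: `split_and_validate` returns a window of multipliers
# around the prediction-error minimizer; with shift_size=1 the window can
# contain up to three adjacent values of `mults`.
def _demo_split_and_validate():
    X, Y, beta, active, sigma = data_instance(s=7, rho=0.3, snr=5, n=500)
    L = sqrt_lasso.choose_lambda(X)
    mults = np.linspace(1.5, 0.5, 11)
    print split_and_validate(Y, X, L, mults, test_frac=0.9, shift_size=1)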

def kfold_CV(Y,
             X,
             L,
             mults,
             K=10,
             shift_size=0,
             shuffle=True,
             random_state=None):
    r"""
    Choose which multiplier of $\lambda$ minimizes prediction
    error using K-fold cross-validation.

    Parameters
    ----------
    Y : np.float(n)
        Response vector.
    X : np.float((n,p))
        Design matrix.
    L : float
        Value of $\lambda$ in the square-root LASSO optimization
        problem.
    mults : [float]
        Sequence of multipliers over which to solve the square-root LASSO.
    K : int
        Number of folds (defaults to 10).
    shift_size : int
        Return the minimizer plus a uniform
        positive or negative shift in the index
        of `mults` of a given size.
        Affects the size of the window of
        minimizers to be accepted by the later sampling scheme.
    shuffle : bool
        Argument to `sklearn.cross_validation.KFold`.
    random_state : None, int or RandomState
        Argument to `sklearn.cross_validation.KFold`.

    Returns
    -------
    window : [float]
        Values of the multiplier that will be accepted
        in the sampling routine.
    """
    n, p = X.shape
    kfold = cross_validation.KFold(n=n,
                                   n_folds=K,
                                   shuffle=shuffle,
                                   random_state=random_state)
    error = {}
    for train_index, test_index in kfold:
        results = solve_grid(Y[train_index], X[train_index], L, mults=mults)
        for m, coef in results:
            error.setdefault(m, []).append(
                np.linalg.norm(Y[test_index] - np.dot(X[test_index], coef))**2)
    for m in mults:
        error[m] = (np.mean(error[m]), np.std(error[m]))
    m_min = min([(error[k], k) for k in error])[1]
    idx_min = list(mults).index(m_min)
    if shift_size > 0:
        random_shift = np.random.random_integers(low=-shift_size,
                                                 high=shift_size)
        idx_min += random_shift
        idx_min = max(idx_min, 0)
    return [mults[idx_min + j] for j in range(-shift_size, shift_size+1, 1)
            if idx_min + j >= 0 and idx_min + j < len(mults)]
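
# Illustrative sketch: the K-fold analogue of the random split above.
# Assumes scikit-learn's (pre-0.18) `cross_validation` module, imported
# at the top of this script.
def _demo_kfold_CV():
    X, Y, beta, active, sigma = data_instance(s=7, rho=0.3, snr=5, n=500)
    L = sqrt_lasso.choose_lambda(X)
    mults = np.linspace(1.5, 0.5, 11)
    print kfold_CV(Y, X, L, mults, K=5, shift_size=1)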

def select_vars_signs(Y,
                      X,
                      L,
                      solve_args={'min_its':150}):
    r"""
    Return the active set and signs for the solution
    of the square-root LASSO.

    Parameters
    ----------
    Y : np.float(n)
        Response vector.
    X : np.float((n,p))
        Design matrix.
    L : float
        Value of $\lambda$ in the square-root LASSO optimization
        problem.
    solve_args : {}
        Keyword arguments passed to `solve_sqrt_lasso`.

    Returns
    -------
    active : [int]
        Active set.
    signs : [-1,1]
        Signs of the variables in the active set.
    sqlasso : `selection.algorithms.sqrt_lasso.sqrt_lasso`
        Instance whose signs and active set we return.
    """
    n, p = X.shape
    SL = sqrt_lasso.sqrt_lasso(Y, X, L * np.ones(p))
    SL.fit(**solve_args)
    return SL.active, SL.z_E, SL
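
# Illustrative sketch: active set and signs at a fixed value of lambda.
def _demo_select_vars_signs():
    X, Y, beta, active_true, sigma = data_instance(s=7, rho=0.3, snr=5, n=500)
    L = sqrt_lasso.choose_lambda(X)
    active, signs, SL = select_vars_signs(Y, X, L)
    print active, signs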

class sqrt_lasso_tuned(object):

    r"""
    Selective inference after choosing $\lambda$
    in the square-root LASSO.

    Uses the selected model on randomized data
    after having chosen $\lambda$.

    When $\sigma^2_E$ is unknown
    we estimate $\sigma^2_E$.
    """

    CV_period = 50 # how often to try to update Y_valid

    def __init__(self,
                 Y,
                 X,
                 mults=np.linspace(1.5, 0.5, 11),
                 target_R2=0.5,
                 test_frac=0.9,
                 sigma=None,
                 sd_inter=np.sqrt(0.2),
                 sd_select=np.sqrt(0.1),
                 sd_valid=np.sqrt(0.1),
                 shift_size=1
                 ):
        """
        Parameters
        ----------
        Y : np.float(n)
            Response vector.
        X : np.float((n,p))
            Design matrix.
        mults : [float]
            Sequence of multipliers over which to solve the square-root LASSO.
        target_R2 : float
            Rough guess at the population $R^2$. Used
            to find a rough estimate of the noise standard
            deviation if it is not known.
        test_frac : float
            Fraction of observations used as training data
            in `split_and_validate`.
        sigma : float
            Noise standard deviation, if known.
        sd_inter : float
            Standard deviation (as a multiple of
            `self.rough_sigma`) of the randomization noise
            added to form Y_inter.
        sd_select : float
            Standard deviation (as a multiple of
            `self.rough_sigma`) of the randomization noise
            added to form Y_select.
        sd_valid : float
            Standard deviation (as a multiple of
            `self.rough_sigma`) of the randomization noise
            added to form Y_valid.
        shift_size : int
            Return the minimizer plus a uniform
            positive or negative shift in the index
            of `mults` of a given size.
            Affects the size of the window of
            minimizers to be accepted by the later sampling scheme.
        """
        n, p = X.shape
        (self.Y,
         self.X,
         self.test_frac,
         self.mults) = (Y,
                        X,
                        test_frac,
                        mults)
        self.L = sqrt_lasso.choose_lambda(X)

        self.sd_inter = sd_inter
        self.sd_select = sd_select
        self.sd_valid = sd_valid

        # get a rough estimate of sigma
        # based on a rough population
        # guess of R^2
        if sigma is None:
            self.rough_sigma = np.sqrt((1 - target_R2) * np.linalg.norm(Y)**2 / n)
        else:
            self.rough_sigma = sigma

        # randomize our response
        self.randomize()

        # now find which CV values to accept
        self.accept_values = self.choose_lambda(self.Y_valid,
                                                shift_size=shift_size)
        # TODO: there is a boundary issue above
        # if we actually randomize lambda --
        # no issue when shift_size=0
        self.selected_value = np.median(self.accept_values)

        self.choose_variables()
        self.null_sample = {}

        # estimate sigma if needed
        if sigma is not None:
            self.sigma_resid = sigma
        else:
            resid_current = (Y - np.dot(self.X[:,self.active_set],
                                        np.dot(self.SQ._XEinv, Y)))
            n = Y.shape[0]
            self.sigma_resid = np.linalg.norm(resid_current) / np.sqrt(n - self.active_set.shape[0])

        # find a response independent of (Y_inter, Y_valid, Y_select)
        ratio = self.sigma_resid**2 / (self.sd_inter * self.rough_sigma)**2
        self.Y_indep = Y - ratio * (self.Y_inter - Y)
        self.betahat_indep = np.dot(np.linalg.pinv(self.X[:,self.active_set]), self.Y_indep)
        cov_indep = np.linalg.pinv(np.dot(self.X[:,self.active_set].T, self.X[:,self.active_set])) * self.sigma_resid**2 * (1 + ratio)
        T_indep = np.fabs(self.betahat_indep / np.sqrt(np.diag(cov_indep)))
        self.pval_indep = 2 * (1 - ndist.cdf(T_indep))

    def randomize(self):
        """
        Carry out the randomization used later
        for choosing lambda and for selecting
        variables and signs.

        Initializes the attributes [Y_inter, Y_valid, Y_select].
        """
        n = self.Y.shape[0]
        self.Y_sample = self.Y.copy()

        # intermediate between
        # CV and model selection
        # and the actual data
        self.Y_inter = self.Y_sample + (self.rough_sigma *
                                        np.random.standard_normal(n) *
                                        self.sd_inter)

        # used for choosing CV
        self.Y_valid = self.Y_inter + (self.rough_sigma *
                                       np.random.standard_normal(n) *
                                       self.sd_valid)

        # used for choosing variables and signs
        self.Y_select = self.Y_inter + (self.rough_sigma *
                                        np.random.standard_normal(n) *
                                        self.sd_select)

    def choose_lambda(self, Y, shift_size=0):
        """
        Select a window of lambda multipliers using the
        response `Y` (typically `self.Y_valid`); the caller
        stores the result in the attribute `accept_values`.

        Any resampling of Y_valid whose minimizer falls within
        these values has a chance to be accepted.

        Parameters
        ----------
        Y : np.float(n)
            Response vector.
        shift_size : int
            Return the minimizer plus a uniform
            positive or negative shift in the index
            of `mults` of a given size.
            Affects the size of the window of
            minimizers to be accepted by the later sampling scheme.
        """
        return split_and_validate(Y,
                                  self.X,
                                  self.L,
                                  self.mults,
                                  self.test_frac,
                                  shift_size=shift_size)

    def choose_variables(self):
        """
        Select variables and signs using `self.Y_select`.

        Stores the results in the attributes `(active_set, active_signs)`.
        Also initializes some attributes used in sampling Y_select.
        """
        # now, select a model
        (self.active_set,
         self.active_signs,
         self.SQ) = select_vars_signs(self.Y_select,
                                      self.X,
                                      self.selected_value * self.L)

        offset = self.SQ.active_constraints.offset
        linear_part = -np.identity(offset.shape[0])
        self.X_E = self.X[:,self.active_set]
        self.OLS_matrix = self.SQ._XEinv
        self.coef_select = (np.dot(self.OLS_matrix, self.Y_select) *
                            self.SQ.z_E)
        self.constraints = constraints(linear_part, offset)
        self.constraints.mean[:] = (np.dot(self.OLS_matrix, self.Y_inter) *
                                    self.SQ.z_E)
        self.constraints.covariance[:] = (np.dot(self.OLS_matrix,
                                                 self.OLS_matrix.T) *
                                          self.rough_sigma**2 * self.sd_select**2)

    def step_valid(self,
                   max_trials=10):
        """
        Try to move Y_valid by accept-reject sampling,
        stopping after `max_trials`.
        """
        X, L, mults = self.X, self.L, self.mults
        n, p = X.shape
        count = 0
        while True:
            count += 1
            Y_proposal = self.Y_inter + (np.random.standard_normal(n)
                                         * self.sd_valid * self.rough_sigma)
            if len(self.mults) > 0:
                proposal_value = self.choose_lambda(Y_proposal,
                                                    shift_size=0)
                if proposal_value[0] in self.accept_values:
                    self.Y_valid[:] = Y_proposal
                    break
            else:
                self.Y_valid[:] = Y_proposal
                break
            if count >= max_trials:
                break

    def step_select(self,
                    ndraw=500,
                    fix_residual=True):
        """
        Take `ndraw` Gibbs steps of Y_select.
        """
        self.constraints.mean[:] = (np.dot(self.OLS_matrix, self.Y_inter) *
                                    self.SQ.z_E)
        # fitted values before the move
        Y_hat_current = np.dot(self.X_E, self.active_signs * self.coef_select)
        sample = sample_from_constraints(self.constraints,
                                         self.coef_select,
                                         self.coef_select,
                                         ndraw=ndraw,
                                         burnin=0)
        self.coef_select[:] = sample[-1]
        Y_hat = np.dot(self.X_E, self.active_signs * self.coef_select)
        if fix_residual:
            # move only the fitted part of Y_select,
            # keeping its residual fixed
            self.Y_select += Y_hat - Y_hat_current
        else:
            self.Y_select[:] = Y_hat

    def step_inter(self,
                   do_gibbs=True):
        """
        Draw Y_inter from its Gaussian full conditional given
        (Y_sample, Y_valid, Y_select): a precision-weighted
        average of the three, on the scale of `self.rough_sigma`.
        """
        quadratic_term = (1. / self.sd_inter**2 +
                          1. / self.sd_valid**2 +
                          1. / self.sd_select**2)
        sampling_sd = self.rough_sigma * 1. / np.sqrt(quadratic_term)
        sampling_mean = ((self.Y_sample / self.sd_inter**2 +
                          self.Y_valid / self.sd_valid**2 +
                          self.Y_select / self.sd_select**2) /
                         quadratic_term)
        n = self.Y_sample.shape[0]
        self.Y_inter[:] = (sampling_mean + np.random.standard_normal(n) *
                           sampling_sd)

    def step_randomized(self):
        """
        Take a move on all of the
        randomized variables.
        """
        self.counter += 1
        if self.counter % self.CV_period == 0:
            self.step_valid()
        self.step_select()
        self.step_inter()

    def setup_inference(self, which_var):
        """
        Set up sampling from the
        null distribution for a given variable.

        TODO: we should use the tilted distribution
        with the selectively unbiased estimate. Will help
        with intervals.
        """
        self.which_var = which_var
        which_idx = list(self.active_set).index(which_var)
        keep = np.ones(self.active_set.shape[0], np.bool)
        keep[which_idx] = False
        self._X_Ej = self.X_E[:,keep]
        self._X_j = self.X[:,which_var]
        self._X_Eji = np.linalg.pinv(self._X_Ej)
        self.null_sample.setdefault(which_var, [])

        # maybe we should reinitialize
        # self.Y_sample[:] = self.Y

        self._mu_j = np.dot(self._X_Ej,
                            np.dot(self._X_Eji, self.Y))

    def step_sample(self):
        """
        Move Y_sample -- a Gaussian draw
        with mean depending on Y_inter, projected so that
        the fit onto the other active variables stays fixed.
        """
        n, p = self.X.shape
        self.null_sample[self.which_var].append((self._X_j * self.Y_sample).sum())
        sigma_resid = self.sigma_resid
        quadratic_term = 1. / sigma_resid**2 + 1. / (self.rough_sigma * self.sd_inter)**2
        sampling_sd = 1. / np.sqrt(quadratic_term)
        sampling_mean = (self.Y_inter * 1. /
                         (self.rough_sigma * self.sd_inter)**2) / quadratic_term
        uncond_draw = sampling_mean + (np.random.standard_normal(n) *
                                       sampling_sd)
        proj_draw = uncond_draw - np.dot(self._X_Ej,
                                         np.dot(self._X_Eji, uncond_draw))
        self.Y_sample[:] = self._mu_j + proj_draw

    def __iter__(self):
        if not hasattr(self, "which_var"):
            raise ValueError("choose a variable in the active set on which to do inference")
        self.counter = 0
        return self

    def next(self):
        # move the randomized responses Y_inter, Y_valid, Y_select
        self.step_randomized()
        # move Y_sample
        self.step_sample()

    def pvalue(self, which_var,
               ndraw=2000,
               burnin=500):
        """
        Produce two p-values for one of the
        active variables, `which_var`, assumed to be in `self.active_set`.

        The first uses sampling; the second is based on
        a particular conditional distribution.
        """
        self.setup_inference(which_var); iter(self)
        for _ in xrange(ndraw + burnin):
            self.next()
        family = discrete_family(self.null_sample[which_var][burnin:],
                                 np.ones(ndraw))
        obs = (self._X_j * self.Y).sum()
        pval = family.cdf(0, obs)
        pval = 2 * min(pval, 1 - pval)
        idx = list(self.active_set).index(which_var)
        return pval, self.pval_indep[idx]
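
# Illustrative end-to-end sketch (not in the original gist): tune lambda on
# randomized data, select a model, then compute both p-values for the first
# selected variable. Draw counts are kept small for speed.
def _demo_sqrt_lasso_tuned():
    X, Y, beta, active, sigma = data_instance(s=7, rho=0.3, snr=5, n=500)
    SLT = sqrt_lasso_tuned(Y, X, sigma=sigma)
    if SLT.active_set.shape[0] > 0:
        print SLT.pvalue(SLT.active_set[0], ndraw=500, burnin=100)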

class sqrt_lasso_tuned_conditional(sqrt_lasso_tuned):

    """
    Condition on the value of Y_valid -- accomplished by never
    resampling Y_valid.

    TODO: this can be made a fast sampler by automatically
    marginalizing over Y_inter.
    """

    CV_period = np.inf

def instance(ndraw=500, sigma_known=True,
             burnin=100,
             s=7,
             rho=0.3,
             method=sqrt_lasso_tuned,
             snr=5):

    # generate a null and an alternative pvalue
    # from a particular model

    X, Y, beta, active, sigma = data_instance(s=s, rho=rho, snr=snr, n=500)
    if not sigma_known:
        sigma = None

    method_ = method(Y, X, target_R2=0.8, sigma=sigma)

    # a null p-value can be formed only if all s true variables
    # were screened and at least one null variable was selected
    do_null = (set(range(s)).issubset(method_.active_set) and
               method_.active_set.shape[0] > s)

    if do_null:
        which_var = method_.active_set[s] # the first null one
        method_.setup_inference(which_var); iter(method_)

        for i in range(ndraw + burnin):
            method_.next()

        Z = np.array(method_.null_sample[which_var][burnin:])
        family = discrete_family(Z,
                                 np.ones_like(Z))
        obs = (method_._X_j * Y).sum()

        pval0 = family.cdf(0, obs)
        pval0 = 2 * min(pval0, 1 - pval0)
    else:
        pval0 = np.random.sample()

    which_var = 0
    method_.setup_inference(which_var); iter(method_)

    for i in range(ndraw + burnin):
        method_.next()

    family = discrete_family(method_.null_sample[which_var][burnin:],
                             np.ones(ndraw))
    obs = (method_._X_j * Y).sum()
    pvalA = family.cdf(0, obs)
    pvalA = 2 * min(pvalA, 1 - pvalA)
    return pval0, pvalA, method_
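
# Illustrative sketch: one simulated replication. `instance` returns a null
# p-value (with a uniform fallback when screening fails), an alternative
# p-value for variable 0, and the fitted object.
def _demo_instance():
    pval0, pvalA, fit = instance(ndraw=200, burnin=50)
    print pval0, pvalA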

def plot_fig():

    f = plt.figure(num=1)
    s = 7
    P0, PA = [], []
    screened = 0
    results = {}
    counter = {}
    linestyle = {sqrt_lasso_tuned:'-',
                 sqrt_lasso_tuned_conditional:'-.'}
    results.setdefault('indep', [])

    for i in range(200):
        print i
        for method in [sqrt_lasso_tuned, sqrt_lasso_tuned_conditional]:
            result = instance(ndraw=1000, burnin=500, sigma_known=False,
                              method=method, s=s)
            counter.setdefault(method, 0)
            if result is not None:
                results.setdefault(method, []).append(result[:2])
                counter[method] += 1
                P0 = np.array(results[method])[:,0]
                PA = np.array(results[method])[:,1]
                U = np.linspace(0, 1, 101)
                ecdf0 = ECDF(P0)(U)
                ecdfA = ECDF(PA)(U)
                ax = f.gca()
                ax.plot(U, ecdf0, 'k' + linestyle[method],
                        linewidth=3,
                        label=str(method.__name__)[11:])
                ax.plot(U, ecdfA, 'r' + linestyle[method],
                        linewidth=3)
                results['indep'].append((result[2].pval_indep[s], result[2].pval_indep[0]))
                np.savez(str(method.__name__)[11:] + '.npz', P0=P0, PA=PA)
            print ('screening', str(method.__name__)), (counter[method] * 1.) / (i + 1)
            print ('power', str(method.__name__)), np.mean(PA < 0.05)
            print ('level', str(method.__name__)), np.mean(P0 < 0.05)

        P0 = np.array(results['indep'])[:,0]
        PA = np.array(results['indep'])[:,1]
        np.savez('indep.npz', P0=P0, PA=PA)
        print ('power', 'indep'), np.mean(PA < 0.05)
        print ('level', 'indep'), np.mean(P0 < 0.05)
        U = np.linspace(0, 1, 101)
        ecdf0 = ECDF(P0)(U)
        ecdfA = ECDF(PA)(U)
        ax.plot(U, ecdf0, 'k:',
                linewidth=3,
                label='independent')
        ax.plot(U, ecdfA, 'r:',
                linewidth=3)
        ax.legend(loc='lower right')
        f.savefig('ecdf.pdf')
        f.clf()
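
# Minimal entry point (not in the original gist): run the full simulation
# when the script is executed directly.
if __name__ == '__main__':
    plot_fig()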