import functools, copy
import numpy as np
import regreg.api as rr
from ..randomized.randomization import randomization
class CV(object):

    """
    Cross-validated error curves for an l1-penalized GLM, optionally with
    a randomized training objective and/or randomized error curves.
    """

    def __init__(self, loss, folds, lam_seq, objective_randomization=None, epsilon=None):
        """
        Parameters
        ----------
        loss : regreg-style loss with ``.data`` and ``.subsample``.
        folds : array of fold labels, one per observation.
        lam_seq : array of lambda values defining the error curve grid.
        objective_randomization : optional randomization applied to the
            training objective in each fold.
        epsilon : optional ridge term used with the objective randomization;
            defaults to 1/sqrt(n) when a randomization is supplied.
        """
        self.loss = loss
        self.folds = folds
        self.lam_seq = lam_seq
        self.objective_randomization = objective_randomization
        self.epsilon = epsilon

        # With an objective randomization but no explicit epsilon,
        # default the ridge term to 1/sqrt(n).
        if self.objective_randomization is not None and self.epsilon is None:
            X, _ = loss.data
            self.epsilon = np.true_divide(1, np.sqrt(X.shape[0]))

        self.K = len(np.unique(self.folds))  # number of folds
        self.ndim = len(lam_seq)             # length of the lambda grid
[docs] def CV_err(self,
penalty,
loss = None,
residual_randomization = None,
scale = None,
solve_args = {'min_its':20, 'tol':1.e-1}):
"""
Computes the non-randomized CV error and the one with added residual randomization
"""
if loss is None:
loss = copy.copy(self.loss)
X, y = loss.data
n, p = X.shape
CV_err = 0
CV_err_squared = 0
if residual_randomization is not None:
CV_err_randomized = 0
CV_err_squared_randomized = 0
if scale is None:
scale = 1.
for fold in np.unique(self.folds):
test = self.folds == fold
train = ~test
loss_train = loss.subsample(train)
loss_test = loss.subsample(test)
X_test, y_test = X[test], y[test]
n_test = y_test.shape[0]
if self.objective_randomization is not None:
randomized_train_loss = self.objective_randomization.randomize(loss_train, self.epsilon)[0] # randomized train loss
problem = rr.simple_problem(randomized_train_loss, penalty)
else:
problem = rr.simple_problem(loss_train, penalty)
beta_train = problem.solve(**solve_args)
_mu = lambda X, beta: loss_test.saturated_loss.mean_function(X.dot(beta))
resid = y_test - _mu(X_test, beta_train)
cur = (resid**2).sum() / n_test
CV_err += cur
CV_err_squared += (cur**2)
if residual_randomization is not None:
random_noise = scale * np.random.standard_normal(n_test)
cur_randomized = ((resid + random_noise)**2).sum() / n_test
CV_err_randomized += cur_randomized
CV_err_squared_randomized += cur_randomized**2
SD_CV = np.sqrt((CV_err_squared - ((CV_err**2)/self.K)) / float(self.K-1))
if residual_randomization is not None:
SD_CV_randomized = np.sqrt((CV_err_squared_randomized - (CV_err_randomized**2/self.K)) / (self.K-1))
return CV_err, SD_CV, CV_err_randomized, SD_CV_randomized
else:
return CV_err, SD_CV
[docs] def choose_lambda_CVr(self, scale = 1., loss=None):
"""
Minimizes CV error curve without randomization and the one with residual randomization
"""
if loss is None:
loss = self.loss
if not hasattr(self, 'scale'):
self.scale = scale
CV_curve = []
X, _ = loss.data
p = X.shape[1]
for lam in self.lam_seq:
penalty = rr.l1norm(p, lagrange=lam)
# CV_curve.append(self.CV_err(penalty, loss) + (lam,))
CV_curve.append(self.CV_err(penalty, loss, residual_randomization = True, scale = self.scale))
CV_curve = np.array(CV_curve)
CV_val = CV_curve[:,0]
CV_val_randomized = CV_curve[:,2]
lam_CV = self.lam_seq[np.argmin(CV_val)]
lam_CV_randomized = self.lam_seq[np.argmin(CV_val_randomized)]
SD_val = CV_curve[:,1]
SD_val_randomized = CV_curve[:,3]
return lam_CV, CV_val, SD_val, lam_CV_randomized, CV_val_randomized, SD_val_randomized
[docs] def bootstrap_CVr_curve(self):
"""
Bootstrap of CV error curve with residual randomization
"""
def _boot_CV_curve(indices):
X, y = self.loss.data
n, p = X.shape
folds_star = np.arange(n) % self.K
np.random.shuffle(folds_star)
loss_star = self.loss.subsample(indices)
# loss_star = rr.glm.gaussian(X[indices,:], y[indices])
result = self.choose_lambda_CVr(scale=self.scale, loss=loss_star)
CV_val = result[1]
return np.array(CV_val)
def _boot_CVr_curve(indices):
X, y = self.loss.data
n, p = X.shape
folds_star = np.arange(n) % self.K
np.random.shuffle(folds_star)
loss_star = self.loss.subsample(indices)
#loss_star = rr.glm.gaussian(X[indices,:], y[indices])
result = self.choose_lambda_CVr(scale=self.scale, loss=loss_star)
CV_val_randomized = result[4]
return np.array(CV_val_randomized)
return _boot_CV_curve, _boot_CVr_curve
[docs] def choose_lambda_CVR(self, scale1=None, scale2=None, loss=None):
"""
Minimizes CV error curve with additive randomization (CVR=CV+R1+R2=CV1+R2)
"""
if loss is None:
loss = copy.copy(self.loss)
CV_curve = []
X, _ = loss.data
p = X.shape[1]
for lam in self.lam_seq:
penalty = rr.l1norm(p, lagrange=lam)
#CV_curve.append(self.CV_err(penalty, loss) + (lam,))
CV_curve.append(self.CV_err(penalty, loss))
CV_curve = np.array(CV_curve)
rv1, rv2 = np.zeros(self.lam_seq.shape[0]), np.zeros(self.lam_seq.shape[0])
if scale1 is not None:
randomization1 = randomization.isotropic_gaussian((self.lam_seq.shape[0],), scale=scale1)
rv1 = np.asarray(randomization1._sampler(size=(1,)))
if scale2 is not None:
randomization2 = randomization.isotropic_gaussian((self.lam_seq.shape[0],), scale=scale2)
rv2 = np.asarray(randomization2._sampler(size=(1,)))
CVR_val = CV_curve[:,0]+rv1.flatten()+rv2.flatten()
lam_CVR = self.lam_seq[np.argmin(CVR_val)] # lam_CVR minimizes CVR
CV1_val = CV_curve[:,0]+rv1.flatten()
SD = CV_curve[:,1]
return lam_CVR, SD, CVR_val, CV1_val, self.lam_seq
[docs] def bootstrap_CVR_curve(self, scale1=None, scale2=None):
"""
Bootstrap of CVR=CV+R1+R2 and CV1=CV+R1 curves
"""
def _bootstrap_CVerr_curve(indices):
X, y = self.loss.data
n, p = X.shape
folds_star = np.arange(n) % self.K
np.random.shuffle(folds_star)
loss_star = self.loss.subsample(indices)
#loss_star = rr.glm.gaussian(X[indices,:], y[indices])
_, _, CVR_val, CV1_val, _ = self.choose_lambda_CVR(scale1, scale2, loss_star)
return np.array(CVR_val), np.array(CV1_val)
def _CVR_boot(indices):
return _bootstrap_CVerr_curve(indices)[0]
def _CV1_boot(indices):
return _bootstrap_CVerr_curve(indices)[1]
return _CVR_boot, _CV1_boot
def main():
    """Demo: compute and plot a CV error curve on a simulated Gaussian instance."""
    from ...tests.instance import gaussian_instance

    np.random.seed(1)
    n, p = 3000, 1000
    X, y, beta, nonzero, sigma = gaussian_instance(n=n, p=p, s=30, rho=0., sigma=1)
    loss = rr.glm.gaussian(X, y)

    # Geometric grid of 30 lambda values, scaled by ||X^T y||_inf.
    lam_max = np.fabs(np.dot(X.T, y)).max()
    lam_seq = np.exp(np.linspace(np.log(1.e-6), np.log(1), 30)) * lam_max

    K = 5
    folds = np.arange(n) % K
    CV_compute = CV(loss, folds, lam_seq)
    (lam_CV, CV_val, SD_val,
     lam_CV_randomized, CV_val_randomized, SD_val_randomized) = CV_compute.choose_lambda_CVr()

    # One-standard-deviation rule: largest lambda whose CV error is within
    # one SD of the minimum.
    minimum_CV = np.min(CV_val)
    lam_idx = list(lam_seq).index(lam_CV)
    SD_min = SD_val[lam_idx]
    lam_1SD = lam_seq[max([i for i in range(lam_seq.shape[0])
                           if CV_val[i] <= minimum_CV + SD_min])]

    import matplotlib.pyplot as plt
    plt.plot(np.log(lam_seq), CV_val)
    plt.show()