Source code for selectinf.randomized.modelQ

import functools

import numpy as np
import regreg.api as rr
from ..constraints.affine import constraints

from .query import gaussian_query
from .randomization import randomization

class modelQ(gaussian_query):

    r"""
    A class for the randomized LASSO for post-selection inference.
    The problem solved is

    .. math::

        \text{minimize}_{\beta} -\beta^TX^Ty + \frac{1}{2} \beta^TQ\beta +
        \sum_{i=1}^p \lambda_i |\beta_i| - \omega^T\beta +
        \frac{\epsilon}{2} \|\beta\|^2_2

    where $\lambda$ is `feature_weights`, $\omega$ is a randomization generated
    below and the last term is a small ridge penalty.

    The `__init__` method forms the quadratic loss and the $\ell_1$ penalty;
    the randomization and ridge terms are added as quadratic terms when `fit`
    is called.
    """
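    # Reading guide (added note, not part of the original source): the
    # quadratic term beta^T Q beta / 2 becomes `self.loss`, the linear data
    # term -beta^T X^T y is added as an `identity_quadratic` at solve time,
    # the weighted l1 penalty is `self.penalty`, and the randomization omega
    # is drawn from `self.randomizer` unless `perturb` is supplied.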
    def __init__(self,
                 Q,
                 X,
                 y,
                 feature_weights,
                 ridge_term=None,
                 randomizer_scale=None,
                 perturb=None):
        r"""
        Create a new post-selection object for the LASSO problem

        Parameters
        ----------

        Q : np.ndarray
            Symmetric (p, p) matrix used as the quadratic form in the loss
            (playing the role of $X^TX$).

        X : np.ndarray
            Design matrix of shape (n, p).

        y : np.ndarray
            Response vector of length n.

        feature_weights : np.ndarray
            Feature weights for L-1 penalty. If a float,
            it is broadcast to all features.

        ridge_term : float
            How big a ridge term to add?

        randomizer_scale : float
            Scale for IID components of randomization.

        perturb : np.ndarray
            Random perturbation subtracted as a linear
            term in the objective function.
        """

        (self.Q,
         self.X,
         self.y) = (Q, X, y)

        self.loss = rr.quadratic_loss(Q.shape[0], Q=Q)

        n, p = X.shape
        self.nfeature = p

        if np.asarray(feature_weights).shape == ():
            feature_weights = np.ones(p) * feature_weights
        self.feature_weights = np.asarray(feature_weights)

        mean_diag = np.diag(Q).mean()

        if ridge_term is None:
            ridge_term = np.std(y) * np.sqrt(mean_diag) / np.sqrt(n - 1)

        if randomizer_scale is None:
            randomizer_scale = np.sqrt(mean_diag) * 0.5 * np.std(y) * np.sqrt(n / (n - 1.))

        self.randomizer = randomization.isotropic_gaussian((p,), randomizer_scale)
        self.ridge_term = ridge_term

        self.penalty = rr.weighted_l1norm(self.feature_weights, lagrange=1.)
        self._initial_omega = perturb  # random perturbation
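    # A minimal construction sketch (illustrative only; the data and weights
    # below are assumptions, not part of the module):
    #
    #     X = np.random.standard_normal((100, 10))
    #     y = np.random.standard_normal(100)
    #     Q = X.T.dot(X)
    #     conv = modelQ(Q, X, y, feature_weights=2. * np.ones(10))
    #
    # Leaving `ridge_term` and `randomizer_scale` as None uses the
    # data-driven defaults computed above from np.diag(Q) and np.std(y).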
    def fit(self,
            solve_args={'tol': 1.e-12, 'min_its': 50},
            perturb=None):
        """
        Fit the randomized lasso using `regreg`.

        Parameters
        ----------

        solve_args : keyword args
             Passed to `regreg.problems.simple_problem.solve`.

        Returns
        -------

        signs : np.float
             Support and non-zero signs of randomized lasso solution.
        """

        p = self.nfeature

        # take a new perturbation if supplied
        if perturb is not None:
            self._initial_omega = perturb
        if self._initial_omega is None:
            self._initial_omega = self.randomizer.sample()

        quad = rr.identity_quadratic(self.ridge_term, 0, -self._initial_omega, 0)
        quad_data = rr.identity_quadratic(0, 0, -self.X.T.dot(self.y), 0)
        problem = rr.simple_problem(self.loss, self.penalty)
        self.initial_soln = problem.solve(quad + quad_data, **solve_args)

        active_signs = np.sign(self.initial_soln)
        active = self._active = active_signs != 0

        self._lagrange = self.penalty.weights
        unpenalized = self._lagrange == 0

        active *= ~unpenalized

        self._overall = overall = (active + unpenalized) > 0
        self._inactive = inactive = ~self._overall
        self._unpenalized = unpenalized

        _active_signs = active_signs.copy()
        _active_signs[unpenalized] = np.nan  # don't release sign of unpenalized variables
        self.selection_variable = {'sign': _active_signs,
                                   'variables': self._overall}

        # initial state for opt variables

        initial_subgrad = -(self.loss.smooth_objective(self.initial_soln, 'grad') +
                            quad_data.objective(self.initial_soln, 'grad') +
                            quad.objective(self.initial_soln, 'grad'))
        self.initial_subgrad = initial_subgrad

        initial_scalings = np.fabs(self.initial_soln[active])
        initial_unpenalized = self.initial_soln[self._unpenalized]

        self.observed_opt_state = np.concatenate([initial_scalings,
                                                  initial_unpenalized])

        E = overall
        Q_E = self.Q[E][:, E]
        _beta_unpenalized = np.linalg.inv(Q_E).dot(self.X[:, E].T.dot(self.y))

        beta_bar = np.zeros(p)
        beta_bar[overall] = _beta_unpenalized
        self._beta_full = beta_bar

        # observed state for score in internal coordinates

        self.observed_internal_state = np.hstack([_beta_unpenalized,
                                                  -self.loss.smooth_objective(beta_bar, 'grad')[inactive] +
                                                  quad_data.objective(beta_bar, 'grad')[inactive]])

        # form linear part

        self.num_opt_var = self.observed_opt_state.shape[0]

        # (\bar{\beta}_{E \cup U}, N_{-E}, c_E, \beta_U, z_{-E})
        # E for active
        # U for unpenalized
        # -E for inactive

        _opt_linear_term = np.zeros((p, self.num_opt_var))
        _score_linear_term = np.zeros((p, self.num_opt_var))

        # \bar{\beta}_{E \cup U} piece -- the unpenalized M estimator

        X, y = self.X, self.y

        _hessian_active = self.Q[:, active]
        _hessian_unpen = self.Q[:, unpenalized]

        _score_linear_term = -np.hstack([_hessian_active, _hessian_unpen])

        # set the observed score (data dependent) state

        self.observed_score_state = _score_linear_term.dot(_beta_unpenalized)
        self.observed_score_state[inactive] += (self.loss.smooth_objective(beta_bar, 'grad')[inactive] +
                                                quad_data.objective(beta_bar, 'grad')[inactive])

        def signed_basis_vector(p, j, s):
            v = np.zeros(p)
            v[j] = s
            return v

        active_directions = np.array([signed_basis_vector(p, j, active_signs[j])
                                      for j in np.nonzero(active)[0]]).T

        scaling_slice = slice(0, active.sum())
        if np.sum(active) == 0:
            _opt_hessian = 0
        else:
            _opt_hessian = (_hessian_active * active_signs[None, active] +
                            self.ridge_term * active_directions)

        _opt_linear_term[:, scaling_slice] = _opt_hessian

        # beta_U piece

        unpenalized_slice = slice(active.sum(), self.num_opt_var)
        unpenalized_directions = np.array([signed_basis_vector(p, j, 1)
                                           for j in np.nonzero(unpenalized)[0]]).T
        if unpenalized.sum():
            _opt_linear_term[:, unpenalized_slice] = (_hessian_unpen +
                                                      self.ridge_term * unpenalized_directions)

        # two transforms that encode score and optimization
        # variable roles

        self.opt_transform = (_opt_linear_term, self.initial_subgrad)
        self.score_transform = (_score_linear_term, np.zeros(_score_linear_term.shape[0]))

        # now store everything needed for the projections
        # the projection acts only on the optimization
        # variables

        self._setup = True
        self.scaling_slice = scaling_slice
        self.unpenalized_slice = unpenalized_slice
        self.ndim = self.loss.shape[0]

        # compute implied mean and covariance

        opt_linear, opt_offset = self.opt_transform

        A_scaling = -np.identity(self.num_opt_var)
        b_scaling = np.zeros(self.num_opt_var)

        self._setup_sampler(A_scaling,
                            b_scaling,
                            opt_linear,
                            opt_offset)

        return active_signs
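    # Sketch of fitting and reading off the selected set (hypothetical
    # variable names, continuing the construction sketch above):
    #
    #     signs = conv.fit()
    #     selected = np.nonzero(signs != 0)[0]   # indices of selected features
    #
    # `signs` holds the signs of the randomized LASSO solution; zero entries
    # are unselected features.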
    def summary(self,
                target="selected",
                features=None,
                parameter=None,
                level=0.9,
                ndraw=10000,
                burnin=2000,
                compute_intervals=False,
                dispersion=None):
        """
        Produce p-values and confidence intervals for targets
        of model including selected features

        Parameters
        ----------

        target : one of ['selected', 'full']

        features : np.bool
            Binary encoding of which features to use in final
            model and targets.

        parameter : np.array
            Hypothesized value for parameter -- defaults to 0.

        level : float
            Confidence level.

        ndraw : int (optional)
            Defaults to 10000.

        burnin : int (optional)
            Defaults to 2000.

        compute_intervals : bool
            Compute confidence intervals?

        dispersion : float (optional)
            Use a known value for dispersion, or Pearson's X^2?
        """

        if parameter is None:
            parameter = np.zeros(self.loss.shape[0])

        if target == 'selected':
            (observed_target,
             cov_target,
             cov_target_score,
             alternatives) = self.selected_targets(features=features,
                                                   dispersion=dispersion)
        else:
            X, y = self.X, self.y
            n, p = X.shape
            if n > p and target == 'full':
                (observed_target,
                 cov_target,
                 cov_target_score,
                 alternatives) = self.full_targets(features=features,
                                                   dispersion=dispersion)
            else:
                # debiased targets are not implemented for modelQ
                raise NotImplementedError

        if self._overall.sum() > 0:
            opt_sample = self.sampler.sample(ndraw, burnin)

            pivots = self.sampler.coefficient_pvalues(observed_target,
                                                      cov_target,
                                                      cov_target_score,
                                                      parameter=parameter,
                                                      sample=opt_sample,
                                                      alternatives=alternatives)
            if not np.all(parameter == 0):
                pvalues = self.sampler.coefficient_pvalues(observed_target,
                                                           cov_target,
                                                           cov_target_score,
                                                           parameter=np.zeros_like(parameter),
                                                           sample=opt_sample,
                                                           alternatives=alternatives)
            else:
                pvalues = pivots

            intervals = None
            if compute_intervals:
                intervals = self.sampler.confidence_intervals(observed_target,
                                                              cov_target,
                                                              cov_target_score,
                                                              sample=opt_sample)

            return pivots, pvalues, intervals
        else:
            return [], [], []
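    # Hedged example of sampler-based inference after selection (a sketch;
    # assumes at least one feature was selected by `fit`):
    #
    #     pivots, pvalues, intervals = conv.summary(target='selected',
    #                                               ndraw=10000,
    #                                               burnin=2000,
    #                                               compute_intervals=True)
    #
    # With the default `parameter=None`, `pivots` and `pvalues` coincide.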
    def selective_MLE(self,
                      target="selected",
                      features=None,
                      parameter=None,
                      level=0.9,
                      compute_intervals=False,
                      dispersion=None,
                      solve_args={'tol': 1.e-12}):
        """
        Parameters
        ----------

        target : one of ['selected', 'full']

        features : np.bool
            Binary encoding of which features to use in final
            model and targets.

        parameter : np.array
            Hypothesized value for parameter -- defaults to 0.

        level : float
            Confidence level.

        compute_intervals : bool
            Compute confidence intervals?

        dispersion : float (optional)
            Use a known value for dispersion, or Pearson's X^2?

        solve_args : dict
            Arguments passed to the solver in `self.sampler.selective_MLE`.
        """

        if parameter is None:
            parameter = np.zeros(self.loss.shape[0])

        (observed_target,
         cov_target,
         cov_target_score,
         alternatives) = self.selected_targets(features=features,
                                               dispersion=dispersion)

        # working out conditional law of opt variables given
        # target after decomposing score wrt target

        return self.sampler.selective_MLE(observed_target,
                                          cov_target,
                                          cov_target_score,
                                          self.observed_opt_state,
                                          solve_args=solve_args)
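    # Alternative to the MCMC-based `summary`: an approximate selective MLE
    # (illustrative call, same hypothetical `conv` as above):
    #
    #     mle_result = conv.selective_MLE(target='selected')
    #
    # The return value is whatever `self.sampler.selective_MLE` produces for
    # the selected targets and the observed optimization state.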
    def selected_targets(self, features=None, dispersion=None):

        X, y = self.X, self.y
        n, p = X.shape

        if features is None:
            active = self._active
            unpenalized = self._unpenalized
            noverall = active.sum() + unpenalized.sum()
            overall = active + unpenalized

            Xfeat = X[:, overall]

            score_linear = self.score_transform[0]
            Q = -score_linear[overall]
            Qi = np.linalg.inv(Q)
            cov_target = Qi.dot(Xfeat.T.dot(Xfeat)).dot(Qi)  # sandwich estimator
            observed_target = self._beta_full[overall]
            crosscov_target_score = score_linear.dot(cov_target)

            alternatives = ([{1: 'greater', -1: 'less'}[int(s)] for s in
                             self.selection_variable['sign'][active]] +
                            ['twosided'] * unpenalized.sum())

        else:

            features_b = np.zeros_like(self._overall)
            features_b[features] = True
            features = features_b

            Xfeat = X[:, features]
            Qfeat = self.Q[features][:, features]
            Gfeat = self.loss.smooth_objective(self.initial_soln, 'grad')[features] - Xfeat.T.dot(y)
            Qfeat_inv = np.linalg.inv(Qfeat)
            one_step = self.initial_soln[features] - Qfeat_inv.dot(Gfeat)
            cov_target = Qfeat_inv.dot(Xfeat.T.dot(Xfeat)).dot(Qfeat_inv)  # sandwich estimator
            _score_linear = -self.Q[features]
            crosscov_target_score = _score_linear.dot(cov_target)
            observed_target = one_step

            alternatives = ['twosided'] * features.sum()

        if dispersion is None:  # use Pearson's X^2
            relaxed = np.linalg.pinv(Xfeat).dot(y)
            dispersion = ((y - Xfeat.dot(relaxed)) ** 2).sum() / (n - Xfeat.shape[1])

        return (observed_target,
                cov_target * dispersion,
                crosscov_target_score.T * dispersion,
                alternatives)
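    # Reading guide for `selected_targets` (added note): with E the selected
    # set, the default target is the relaxed estimator Q_E^{-1} X_E^T y and
    # its covariance is estimated by the sandwich
    #
    #     Q_E^{-1} (X_E^T X_E) Q_E^{-1} * dispersion,
    #
    # where the dispersion defaults to Pearson's X^2 from the relaxed fit.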
    def full_targets(self, features=None, dispersion=None):

        if features is None:
            features = self._overall
        features_bool = np.zeros(self._overall.shape, bool)
        features_bool[features] = True
        features = features_bool

        X, y = self.X, self.y
        n, p = X.shape

        # target is one-step estimator

        Qfull = self.Q
        G = self.loss.smooth_objective(self.initial_soln, 'grad') - X.T.dot(y)
        Qfull_inv = np.linalg.inv(Qfull)
        one_step = self.initial_soln - Qfull_inv.dot(G)
        cov_target = Qfull_inv[features][:, features]
        observed_target = one_step[features]
        crosscov_target_score = np.zeros((p, cov_target.shape[0]))
        crosscov_target_score[features] = -np.identity(cov_target.shape[0])

        if dispersion is None:  # use Pearson's X^2
            # mean function is the identity for the quadratic (Gaussian) loss used here
            dispersion = ((y - X.dot(one_step)) ** 2).sum() / (n - p)

        alternatives = ['twosided'] * features.sum()
        return (observed_target,
                cov_target * dispersion,
                crosscov_target_score.T * dispersion,
                alternatives)
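
# A minimal end-to-end sketch (illustrative only; the simulated data, sparsity
# pattern and penalty weights below are assumptions, not part of the module):

if __name__ == "__main__":

    np.random.seed(0)
    n, p = 200, 20
    X = np.random.standard_normal((n, p))
    beta = np.zeros(p)
    beta[:3] = 3.
    y = X.dot(beta) + np.random.standard_normal(n)

    # modelQ takes the quadratic form Q explicitly; here we use Q = X^T X
    conv = modelQ(X.T.dot(X), X, y,
                  feature_weights=2. * np.sqrt(n) * np.ones(p))
    signs = conv.fit()

    if np.any(signs != 0):
        pivots, pvalues, intervals = conv.summary(target='selected',
                                                  compute_intervals=True)
        print(pvalues)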