Source code for selectinf.randomized.slope

from __future__ import print_function

import functools
import warnings
import numpy as np

# sklearn imports

have_isotonic = False
try:
    from sklearn.isotonic import IsotonicRegression
    have_isotonic = True
except ImportError:
    warnings.warn('unable to import isotonic regression from sklearn; '
                  'SLOPE subgradient projection will not work')

# regreg imports

from regreg.atoms.slope import _basic_proximal_map
import regreg.api as rr

from ..constraints.affine import constraints

from .randomization import randomization
from ..base import restricted_estimator
from .query import gaussian_query
from .lasso import lasso

class slope(gaussian_query):
    def __init__(self,
                 loglike,
                 slope_weights,
                 ridge_term,
                 randomizer,
                 perturb=None):
        r"""
        Create a new post-selection object for the SLOPE problem

        Parameters
        ----------

        loglike : `regreg.smooth.glm.glm`
            A (negative) log-likelihood as implemented in `regreg`.

        slope_weights : np.ndarray
            SLOPE weights for L-1 penalty. If a float,
            it is broadcast to all features.

        ridge_term : float
            How big a ridge term to add?

        randomizer : object
            Randomizer -- contains representation of randomization density.

        perturb : np.ndarray
            Random perturbation subtracted as a linear term
            in the objective function.
        """

        self.loglike = loglike
        self.nfeature = p = self.loglike.shape[0]

        if np.asarray(slope_weights).shape == ():
            slope_weights = np.ones(loglike.shape) * slope_weights
        self.slope_weights = np.asarray(slope_weights)

        self.randomizer = randomizer
        self.ridge_term = ridge_term
        self.penalty = rr.slope(slope_weights, lagrange=1.)
        self._initial_omega = perturb  # random perturbation
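    # Note: as `__init__` shows, a scalar `slope_weights` is broadcast to
    # all p features, so e.g. slope(loglike, 2., ridge_term, randomizer)
    # penalizes every sorted coordinate with weight 2.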
    def _solve_randomized_problem(self,
                                  perturb=None,
                                  solve_args={'tol': 1.e-12, 'min_its': 50}):

        p = self.nfeature

        # take a new perturbation if supplied
        if perturb is not None:
            self._initial_omega = perturb
        if self._initial_omega is None:
            self._initial_omega = self.randomizer.sample()

        quad = rr.identity_quadratic(self.ridge_term,
                                     0,
                                     -self._initial_omega,
                                     0)

        problem = rr.simple_problem(self.loglike, self.penalty)
        initial_soln = problem.solve(quad, **solve_args)
        initial_subgrad = -(self.loglike.smooth_objective(initial_soln, 'grad') +
                            quad.objective(initial_soln, 'grad'))

        return initial_soln, initial_subgrad
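    # `_solve_randomized_problem` above minimizes the randomized objective
    # (a sketch; omega denotes the perturbation `self._initial_omega` and
    # lambda the SLOPE weights):
    #
    #     loglike(beta) + sum_j lambda_j * |beta|_(j)
    #         + (ridge_term / 2) * ||beta||_2^2 - omega^T beta
    #
    # where |beta|_(1) >= |beta|_(2) >= ... are the sorted absolute values.
    # The subgradient returned is read off from the KKT conditions at the
    # solution.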
    def fit(self,
            solve_args={'tol': 1.e-12, 'min_its': 50},
            perturb=None):

        self.initial_soln, self.initial_subgrad = self._solve_randomized_problem(perturb=perturb,
                                                                                 solve_args=solve_args)
        p = self.initial_soln.shape[0]

        # now we have to work out SLOPE details, clusters, etc.

        active_signs = np.sign(self.initial_soln)
        active = self._active = active_signs != 0

        self._overall = overall = active > 0
        self._inactive = inactive = ~self._overall

        _active_signs = active_signs.copy()
        self.selection_variable = {'sign': _active_signs,
                                   'variables': self._overall}

        indices = np.argsort(-np.fabs(self.initial_soln))
        sorted_soln = self.initial_soln[indices]
        initial_scalings = np.sort(np.unique(np.fabs(self.initial_soln[active])))[::-1]

        self.observed_opt_state = initial_scalings

        self._unpenalized = np.zeros(p, bool)

        _beta_unpenalized = restricted_estimator(self.loglike,
                                                 self._overall,
                                                 solve_args=solve_args)

        beta_bar = np.zeros(p)
        beta_bar[overall] = _beta_unpenalized
        self._beta_full = beta_bar

        self.num_opt_var = self.observed_opt_state.shape[0]

        X, y = self.loglike.data
        W = self._W = self.loglike.saturated_loss.hessian(X.dot(beta_bar))
        _hessian_active = np.dot(X.T, X[:, active] * W[:, None])
        _score_linear_term = -_hessian_active

        self.score_transform = (_score_linear_term, np.zeros(_score_linear_term.shape[0]))
        self.observed_score_state = _score_linear_term.dot(_beta_unpenalized)
        self.observed_score_state[inactive] += self.loglike.smooth_objective(beta_bar, 'grad')[inactive]

        cur_indx_array = []
        cur_indx_array.append(0)
        cur_indx = 0
        pointer = 0
        signs_cluster = []
        for j in range(p - 1):
            if np.abs(sorted_soln[j + 1]) != np.abs(sorted_soln[cur_indx]):
                cur_indx_array.append(j + 1)
                cur_indx = j + 1
                sign_vec = np.zeros(p)
                sign_vec[np.arange(j + 1 - cur_indx_array[pointer]) + cur_indx_array[pointer]] = \
                    np.sign(self.initial_soln[indices[np.arange(j + 1 - cur_indx_array[pointer]) + cur_indx_array[pointer]]])
                signs_cluster.append(sign_vec)
                pointer = pointer + 1
                if sorted_soln[j + 1] == 0:
                    break

        signs_cluster = np.asarray(signs_cluster).T

        if signs_cluster.size == 0:
            return active_signs
        else:
            X_clustered = X[:, indices].dot(signs_cluster)
            _opt_linear_term = X.T.dot(X_clustered)

            _, prec = self.randomizer.cov_prec
            opt_linear, opt_offset = (_opt_linear_term, self.initial_subgrad)

            # now make the constraints

            self._setup = True
            A_scaling_0 = -np.identity(self.num_opt_var)
            A_scaling_1 = -np.identity(self.num_opt_var)[:(self.num_opt_var - 1), :]
            for k in range(A_scaling_1.shape[0]):
                A_scaling_1[k, k + 1] = 1
            A_scaling = np.vstack([A_scaling_0, A_scaling_1])
            b_scaling = np.zeros(2 * self.num_opt_var - 1)

            self._setup_sampler(A_scaling,
                                b_scaling,
                                opt_linear,
                                opt_offset)

        return active_signs
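    # For example, with num_opt_var = 3 the constraints built in `fit`
    # above, A_scaling o <= b_scaling, encode o_1 >= o_2 >= o_3 >= 0
    # (nonnegative, decreasing cluster magnitudes):
    #
    #     A_scaling = [[-1,  0,  0],
    #                  [ 0, -1,  0],
    #                  [ 0,  0, -1],
    #                  [-1,  1,  0],
    #                  [ 0, -1,  1]],    b_scaling = [0, 0, 0, 0, 0]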
    # Targets of inference and their covariance with the score
    # representation are the same as for the LASSO.
    @staticmethod
    def gaussian(X,
                 Y,
                 slope_weights,
                 sigma=1.,
                 quadratic=None,
                 ridge_term=0.,
                 randomizer_scale=None):

        loglike = rr.glm.gaussian(X, Y, coef=1. / sigma ** 2, quadratic=quadratic)
        n, p = X.shape

        mean_diag = np.mean((X ** 2).sum(0))
        if ridge_term is None:
            ridge_term = np.std(Y) * np.sqrt(mean_diag) / np.sqrt(n - 1)

        if randomizer_scale is None:
            randomizer_scale = np.sqrt(mean_diag) * 0.5 * np.std(Y) * np.sqrt(n / (n - 1.))

        randomizer = randomization.isotropic_gaussian((p,), randomizer_scale)

        return slope(loglike,
                     np.asarray(slope_weights) / sigma ** 2,
                     ridge_term,
                     randomizer)
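# Note on `slope.gaussian`: the Gaussian log-likelihood is scaled by
# 1 / sigma^2 (via `coef`), so `slope_weights` is divided by sigma^2 as
# well, keeping the penalty on the same scale as the unscaled
# squared-error loss.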
# Projection onto selected subgradients of SLOPE

def _projection_onto_selected_subgradients(prox_arg,
                                           weights,
                                           ordering,
                                           cluster_sizes,
                                           active_signs,
                                           last_value_zero=True):
    """
    Compute the projection of a point onto the set of
    subgradients of the SLOPE penalty with a given
    clustering of the solution and signs of the variables.
    This is a projection onto a lower dimensional set. The dimension
    of this set is p -- the dimension of `prox_arg` -- minus
    the number of unique values in `ordered_clustering`, plus 1 if the
    last value of the solution was zero (i.e. the solution was sparse).

    Parameters
    ----------

    prox_arg : np.ndarray(p, np.float)
        Point to project

    weights : np.ndarray(p, np.float)
        Weights of the SLOPE penalty.

    ordering : np.ndarray(p, np.int)
        Order of original argument to SLOPE prox.
        First entry corresponds to largest argument of SLOPE prox.

    cluster_sizes : sequence
        Sizes of clusters, starting with
        largest in absolute value.

    active_signs : np.ndarray(p, np.int)
        Signs of non-zero coefficients.

    last_value_zero : bool
        Is the last solution value equal to 0?
    """

    if not have_isotonic:
        raise ImportError('SLOPE subgradient projection requires '
                          'sklearn.isotonic.IsotonicRegression')

    result = np.zeros_like(prox_arg)

    ordered_clustering = []
    cur_idx = 0
    for cluster_size in cluster_sizes:
        ordered_clustering.append([ordering[j + cur_idx] for j in range(cluster_size)])
        cur_idx += cluster_size

    # Now, run the appropriate SLOPE prox on each cluster
    cur_idx = 0
    for i, cluster in enumerate(ordered_clustering):
        prox_subarg = np.array([prox_arg[j] for j in cluster])

        # If the value of the soln to the prox was non-zero,
        # then we solve a SLOPE of size 1 smaller than the cluster.

        # If the cluster size is 1, the value is just
        # the corresponding signed weight.

        if i < len(ordered_clustering) - 1 or not last_value_zero:
            if len(cluster) == 1:
                result[cluster[0]] = weights[cur_idx] * active_signs[cluster[0]]
            else:
                indices = [j + cur_idx for j in range(len(cluster))]
                cluster_weights = weights[indices]

                ir = IsotonicRegression()
                _ir_result = ir.fit_transform(np.arange(len(cluster)), cluster_weights[::-1])[::-1]
                result[indices] = -np.multiply(active_signs[indices], _ir_result / 2.)
        else:
            indices = np.array([j + cur_idx for j in range(len(cluster))])
            cluster_weights = weights[indices]

            slope_prox = _basic_proximal_map(prox_subarg, cluster_weights)
            result[indices] = prox_subarg - slope_prox

        cur_idx += len(cluster)

    return result
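if __name__ == '__main__':

    # Minimal usage sketch, assuming `selectinf` and its dependencies are
    # installed; the simulated data and the decreasing weight sequence
    # below are arbitrary illustrative choices, not part of the module.

    np.random.seed(0)
    n, p = 100, 20
    X = np.random.standard_normal((n, p)) / np.sqrt(n)
    beta = np.zeros(p)
    beta[:3] = 5.
    Y = X.dot(beta) + np.random.standard_normal(n)

    weights = np.linspace(3., 1., p)   # decreasing SLOPE weights
    conv = slope.gaussian(X, Y, weights)
    signs = conv.fit()                 # signs of the randomized solution
    print('selected signs:', signs)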