from __future__ import print_function
import functools
import numpy as np
from scipy.stats import norm as ndist
import regreg.api as rr
from .query import gaussian_query
from .randomization import randomization
class screening(gaussian_query):

    def __init__(self,
                 observed_data,
                 covariance,
                 randomizer,
                 perturb=None):

        self.observed_score_state = -observed_data  # -Z if Z \sim N(\mu, \Sigma); X^Ty in the regression setting
        self.nfeature = p = self.observed_score_state.shape[0]
        self.covariance = covariance
        self.randomizer = randomizer
        self._initial_omega = perturb
    def fit(self, perturb=None):
        gaussian_query.fit(self, perturb=perturb)
        self._randomized_score = self.observed_score_state - self._initial_omega
        return self._randomized_score, self._randomized_score.shape[0]
    def multivariate_targets(self, features, dispersion=1.):
        r"""
        Entries of the mean of \Sigma[E,E]^{-1}Z_E.
        """
        score_linear = self.covariance[:, features].copy() / dispersion
        Q = score_linear[features]
        cov_target = np.linalg.inv(Q)
        observed_target = -np.linalg.inv(Q).dot(self.observed_score_state[features])
        crosscov_target_score = -score_linear.dot(cov_target)
        alternatives = ['twosided'] * features.sum()

        return observed_target, cov_target * dispersion, crosscov_target_score.T * dispersion, alternatives
    def full_targets(self, features, dispersion=1.):
        r"""
        Entries of the mean of (\Sigma^{-1}Z)_E.
        """
        Q = self.covariance / dispersion
        cov_target = (np.linalg.inv(Q)[features])[:, features]
        observed_target = -np.linalg.inv(Q).dot(self.observed_score_state)[features]
        crosscov_target_score = -np.identity(Q.shape[0])[:, features]
        alternatives = ['twosided'] * features.sum()

        return observed_target, cov_target * dispersion, crosscov_target_score.T * dispersion, alternatives
    def marginal_targets(self, features):
        r"""
        Entries of the mean of Z_E.
        """
        score_linear = self.covariance[:, features]
        Q = score_linear[features]
        cov_target = Q
        observed_target = -self.observed_score_state[features]
        crosscov_target_score = -score_linear
        alternatives = ['twosided'] * features.sum()

        return observed_target, cov_target, crosscov_target_score.T, alternatives
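
# The three target maps above correspond to three estimands one might report
# after screening: multivariate_targets gives coordinates of
# \Sigma[E,E]^{-1} Z_E (a "selected model" style target), full_targets gives
# the E coordinates of \Sigma^{-1} Z (a "full model" style target), and
# marginal_targets gives Z_E itself.  A minimal numerical sketch of the three
# estimands on synthetic Sigma, Z and a made-up selected set E (every name
# below is illustrative, not part of the module API):

def _example_target_estimands():
    rng = np.random.default_rng(0)
    p = 5
    A = rng.standard_normal((p, p))
    Sigma = A.dot(A.T) + p * np.identity(p)   # a positive-definite covariance
    Z = rng.standard_normal(p)
    E = np.zeros(p, bool)
    E[:2] = True                              # a toy "selected" set
    multivariate = np.linalg.inv(Sigma[E][:, E]).dot(Z[E])
    full = np.linalg.inv(Sigma).dot(Z)[E]
    marginal = Z[E]
    return multivariate, full, marginal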
class marginal_screening(screening):

    def __init__(self,
                 observed_data,
                 covariance,
                 randomizer,
                 threshold,
                 perturb=None):

        threshold = np.asarray(threshold)
        if threshold.shape == ():
            threshold = np.ones_like(observed_data) * threshold
        self.threshold = threshold

        screening.__init__(self,
                           observed_data,
                           covariance,
                           randomizer,
                           perturb=perturb)
    def fit(self, perturb=None):

        _randomized_score, p = screening.fit(self, perturb=perturb)
        active = np.fabs(_randomized_score) >= self.threshold

        self._selected = active
        self._not_selected = ~self._selected
        sign = np.sign(-_randomized_score)
        active_signs = sign[self._selected]
        sign[self._not_selected] = 0
        self.selection_variable = {'sign': sign,
                                   'variables': self._selected.copy()}

        self.observed_opt_state = (np.fabs(_randomized_score) - self.threshold)[self._selected]
        self.num_opt_var = self.observed_opt_state.shape[0]

        opt_linear = np.zeros((p, self.num_opt_var))
        opt_linear[self._selected, :] = np.diag(active_signs)
        opt_offset = np.zeros(p)
        opt_offset[self._selected] = active_signs * self.threshold[self._selected]
        opt_offset[self._not_selected] = _randomized_score[self._not_selected]

        self._setup = True

        A_scaling = -np.identity(len(active_signs))
        b_scaling = np.zeros(self.num_opt_var)

        self._setup_sampler(A_scaling,
                            b_scaling,
                            opt_linear,
                            opt_offset)

        return self._selected
    @staticmethod
    def type1(observed_data,
              covariance,
              marginal_level,
              randomizer_scale,
              perturb=None):
        '''
        Choose the threshold so that each randomized Z-score is
        selected, marginally, with probability `marginal_level`
        (two-sided type I error control per coordinate).
        '''
        randomized_stdev = np.sqrt(np.diag(covariance) + randomizer_scale**2)
        p = covariance.shape[0]
        randomizer = randomization.isotropic_gaussian((p,), randomizer_scale)
        threshold = randomized_stdev * ndist.ppf(1. - marginal_level / 2.)

        return marginal_screening(observed_data,
                                  covariance,
                                  randomizer,
                                  threshold,
                                  perturb=perturb)
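
# A minimal usage sketch of ``marginal_screening`` via its ``type1``
# constructor.  The data below are synthetic, and the call to
# ``marginal_targets`` assumes at least one variable was selected; nothing
# here is part of the module API beyond the classes defined above.

def _example_marginal_screening():
    rng = np.random.default_rng(0)
    p = 30
    Z = rng.standard_normal(p)
    Z[:3] += 3.                        # a few true signals
    covariance = np.identity(p)
    screen = marginal_screening.type1(Z,
                                      covariance,
                                      marginal_level=0.1,
                                      randomizer_scale=1.)
    selected = screen.fit()
    if selected.sum() > 0:
        (observed_target,
         cov_target,
         crosscov,
         alternatives) = screen.marginal_targets(selected)
        return observed_target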
# Step-up procedures like Benjamini-Hochberg

def stepup_selection(Z_values, stepup_Z):
    """
    Apply a step-up rule: sort |Z| in decreasing order and find the
    largest k such that the k-th largest |Z| clears the k-th cutoff;
    the k largest |Z| are then all selected.
    """
    absZ_argsort = np.argsort(np.fabs(Z_values))[::-1]
    absZ_sorted = np.fabs(Z_values)[absZ_argsort]
    survivors = absZ_sorted - stepup_Z >= 0
    if np.any(survivors):
        num_selected = max(np.nonzero(survivors)[0]) + 1
        return (num_selected,                  # how many selected
                absZ_argsort[:num_selected],   # ordered indices of those selected
                stepup_Z[num_selected - 1])    # the selected all exceed this cutoff
    else:
        return 0, None, None
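
# A worked toy example of ``stepup_selection`` (values made up for
# illustration): with non-increasing cutoffs (3., 2.5, 2., 1.5) and |Z|
# sorted as (3.2, 2.7, 0.4, 0.1), the two largest survive.

def _example_stepup_selection():
    Z = np.array([3.2, -0.4, 2.7, 0.1])
    cutoffs = np.array([3., 2.5, 2., 1.5])
    K, selected_idx, last_cutoff = stepup_selection(Z, cutoffs)
    # K == 2, selected_idx == [0, 2], last_cutoff == 2.5
    return K, selected_idx, last_cutoff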
class stepup(screening):

    def __init__(self,
                 observed_data,
                 covariance,
                 randomizer,
                 stepup_Z,
                 perturb=None):

        screening.__init__(self,
                           observed_data,
                           covariance,
                           randomizer,
                           perturb=perturb)

        self.stepup_Z = stepup_Z
        if not (np.all(sorted(self.stepup_Z)[::-1] == self.stepup_Z) and
                np.all(np.greater_equal(self.stepup_Z, 0))):
            raise ValueError('stepup Z values should be non-negative and non-increasing')
    def fit(self, perturb=None):

        # We condition on which variables survive, their signs,
        # which cutoff was the last one crossed, and the observed
        # (randomized) Z values of those not selected.

        _randomized_score, p = screening.fit(self, perturb=perturb)

        K, selected_idx, last_cutoff = stepup_selection(_randomized_score, self.stepup_Z)

        if K > 0:
            self._selected = np.zeros(p, bool)
            self._selected[selected_idx] = 1
            self._not_selected = ~self._selected

            sign = np.sign(-_randomized_score)
            active_signs = sign[selected_idx]
            sign[self._not_selected] = 0
            self.selection_variable = {'sign': sign.copy(),
                                       'variables': self._selected.copy()}

            self.num_opt_var = self._selected.sum()
            self.observed_opt_state = np.zeros(self.num_opt_var)
            self.observed_opt_state[:] = np.fabs(_randomized_score[selected_idx]) - last_cutoff

            opt_linear = np.zeros((p, self.num_opt_var))
            for j in range(self.num_opt_var):
                opt_linear[selected_idx[j], j] = active_signs[j]
            opt_offset = np.zeros(p)
            opt_offset[self._selected] = active_signs * last_cutoff
            opt_offset[self._not_selected] = _randomized_score[self._not_selected]

            self._setup = True

            A_scaling = -np.identity(self.num_opt_var)
            b_scaling = np.zeros(self.num_opt_var)

            self._setup_sampler(A_scaling,
                                b_scaling,
                                opt_linear,
                                opt_offset)
        else:
            self._selected = np.zeros(p, bool)
        return self._selected
    @staticmethod
    def BH(observed_score,
           covariance,
           randomizer_scale,
           q=0.2,
           perturb=None):

        if not np.allclose(np.diag(covariance), covariance[0, 0]):
            raise ValueError('Benjamini-Hochberg expects Z-scores with identical variance; standardize your Z')

        p = observed_score.shape[0]
        randomized_stdev = np.sqrt(covariance[0, 0] + randomizer_scale**2)
        randomizer = randomization.isotropic_gaussian((p,), randomizer_scale)

        # Benjamini-Hochberg cutoffs
        stepup_Z = randomized_stdev * ndist.ppf(1 - q * np.arange(1, p + 1) / (2 * p))

        return stepup(observed_score,
                      covariance,
                      randomizer,
                      stepup_Z,
                      perturb=perturb)
    @staticmethod
    def BH_general(observed_score,
                   covariance,
                   randomizer_cov,
                   q=0.2,
                   perturb=None):

        if not np.allclose(np.diag(covariance), covariance[0, 0]):
            raise ValueError('Benjamini-Hochberg expects Z-scores with identical variance; standardize your Z')
        if not np.allclose(np.diag(randomizer_cov), randomizer_cov[0, 0]):
            raise ValueError('Benjamini-Hochberg expects randomization with identical variance across coordinates; standardize your randomization')

        p = observed_score.shape[0]
        randomizer = randomization.gaussian(randomizer_cov)
        randomized_stdev = np.sqrt(covariance[0, 0] + randomizer_cov[0, 0])

        # Benjamini-Hochberg cutoffs
        stepup_Z = randomized_stdev * ndist.ppf(1 - q * np.arange(1, p + 1) / (2 * p))

        return stepup(observed_score,
                      covariance,
                      randomizer,
                      stepup_Z,
                      perturb=perturb)
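
# A minimal usage sketch of the randomized Benjamini-Hochberg constructor.
# The data are synthetic; ``stepup.BH`` requires Z-scores with a common
# variance (here, identity covariance), and ``fit`` returns the boolean
# mask of selected variables.

def _example_BH():
    rng = np.random.default_rng(0)
    p = 50
    Z = rng.standard_normal(p)
    Z[:5] += 3.                        # a few true signals
    covariance = np.identity(p)
    BH_select = stepup.BH(Z, covariance, randomizer_scale=1., q=0.2)
    selected = BH_select.fit()
    return selected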
class topK(screening):

    def __init__(self,
                 observed_data,
                 covariance,
                 randomizer,
                 K,           # how many to select?
                 abs=False,   # use absolute values of scores or not?
                 perturb=None):

        screening.__init__(self,
                           observed_data,
                           covariance,
                           randomizer,
                           perturb=perturb)
        self.K = K
        self._abs = abs
    def fit(self, perturb=None):

        _randomized_score, p = screening.fit(self, perturb=perturb)

        # Fixing the top K means u = \omega - Z lies in a particular cone:
        #   s_i u_i \geq |u_j|,  1 \leq i \leq K, K+1 \leq j \leq p  (abs is True, s_i the top-K signs)
        #   u_i \geq u_j,        1 \leq i \leq K, K+1 \leq j \leq p  (abs is False)

        if self._abs:

            Z = np.fabs(-_randomized_score)
            topK = np.argsort(Z)[-self.K:]

            selected = np.zeros(self.nfeature, bool)
            selected[topK] = 1
            self._selected = selected
            self._not_selected = ~self._selected

            sign = np.sign(-_randomized_score)
            topK_signs = sign[self._selected]
            sign[self._not_selected] = 0

            self.selection_variable = {'sign': sign,
                                       'variables': self._selected.copy()}

            self.observed_opt_state = np.fabs(Z[self._selected])
            self.num_opt_var = self.observed_opt_state.shape[0]

            opt_linear = np.zeros((p, self.num_opt_var))
            opt_linear[self._selected, :] = np.diag(topK_signs)
            opt_offset = np.zeros(p)

        else:

            Z = -_randomized_score
            topK = np.argsort(Z)[-self.K:]

            selected = np.zeros(self.nfeature, bool)
            selected[topK] = 1
            self._selected = selected
            self._not_selected = ~self._selected
            self.selection_variable = {'variables': self._selected.copy()}

            self.observed_opt_state = Z[self._selected]
            self.num_opt_var = self.observed_opt_state.shape[0]

            opt_linear = np.zeros((p, self.num_opt_var))
            opt_linear[self._selected, :] = np.identity(self.num_opt_var)
            opt_offset = np.zeros(p)

        # In both cases, this conditioning means we just need to compute
        # the observed lower bound.
        # XXX what about abs?

        lower_bound = np.max(Z[self._not_selected])

        A_scaling = -np.identity(self.num_opt_var)
        b_scaling = -np.ones(self.num_opt_var) * lower_bound

        self._setup_sampler(A_scaling,
                            b_scaling,
                            opt_linear,
                            opt_offset)

        return self._selected
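
# A minimal usage sketch of ``topK`` screening on synthetic data.  With
# ``abs=True`` the three largest |Z| (after randomization) are selected;
# ``marginal_targets`` then packages the selected coordinates for inference.

def _example_topK():
    rng = np.random.default_rng(0)
    p = 20
    Z = rng.standard_normal(p)
    covariance = np.identity(p)
    randomizer = randomization.isotropic_gaussian((p,), 1.)
    topK_select = topK(Z, covariance, randomizer, K=3, abs=True)
    selected = topK_select.fit()
    (observed_target,
     cov_target,
     crosscov,
     alternatives) = topK_select.marginal_targets(selected)
    return selected, observed_target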