"""
Core module for learning selection
"""
from copy import copy
import functools
import numpy as np
import pandas as pd
from scipy.stats import norm as ndist
from scipy.stats import binom
from ..distributions.discrete_family import discrete_family
# local imports
from .fitters import (gbm_fit_sk,
random_forest_fit_sk)
from .samplers import (normal_sampler,
split_sampler)
from .learners import mixture_learner
try:
from .keras_fit import keras_fit
except ImportError:
pass
def infer_general_target(observed_outcome,
observed_target,
target_cov,
learner,
fit_probability=gbm_fit_sk,
fit_args={'n_estimators':500},
                         hypothesis=[0],
alpha=0.1,
success_params=(1, 1),
B=500,
learning_npz=None):
'''
    Compute a p-value (or pivot) for a target having observed `outcome` from `algorithm` run on the original data.
Parameters
----------
    observed_outcome : object
        The purported value observed, i.e. run with the original seed.
    observed_target : np.float # 1-dimensional targets for now
        Observed value of target estimator.
    target_cov : np.float((1, 1)) # 1 for 1-dimensional targets for now
        Covariance of target estimator.
    learner :
        Object that generates perturbed data.
fit_probability : callable
Function to learn a probability model P(Y=1|T) based on [T, Y].
    hypothesis : [float]
        Hypothesized value of each target coordinate.
alpha : np.float
Level for 1 - confidence.
B : int
How many queries?
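    success_params : (int, int)
        Pair (k, m) passed to `_inference`: the learned selection probability
        p is replaced by the probability of at least k successes in m trials,
        i.e. binom(m, p).sf(k - 1).
    learning_npz : str or None
        If not None, filename to which the learning data (T, Y) and the
        per-coordinate nuisance and direction vectors are saved with np.savez.

    Returns
    -------
    results : list
        One entry per coordinate of `observed_target`, each the first four
        values returned by `_inference`:
        (pivot, confidence_interval, pvalue, weight_fn).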
'''
weight_fns, learning_data = learner.learn(fit_probability,
fit_args=fit_args,
check_selection=None,
B=B)
weight_fn = weight_fns[0]
if learning_npz is not None:
T, Y = learning_data
npz_results = {'T':T, 'Y':Y}
npz_results['nuisance'] = []
npz_results['direction'] = []
results = []
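    # For each coordinate i, decompose observed_target into a component along
    # direction = target_cov[i] / target_cov[i, i] and a nuisance part that is
    # held fixed, so the learned weight function can be evaluated along a
    # one-dimensional grid of values for coordinate i.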
for i in range(observed_target.shape[0]):
cur_nuisance = observed_target - target_cov[i] / target_cov[i, i] * observed_target[i]
cur_nuisance.shape = observed_target.shape
direction = target_cov[i] / target_cov[i, i]
if learning_npz is not None:
npz_results['nuisance'].append(cur_nuisance)
npz_results['direction'].append(direction)
def new_weight_fn(cur_nuisance, direction, weight_fn, target_val):
return weight_fn(np.multiply.outer(target_val, direction) + cur_nuisance[None, :])
new_weight_fn = functools.partial(new_weight_fn, cur_nuisance, direction, weight_fn)
results.append(_inference(observed_target[i],
target_cov[i, i].reshape((1, 1)),
new_weight_fn,
hypothesis=hypothesis[i],
alpha=alpha,
success_params=success_params)[:4])
if learning_npz is not None:
np.savez(learning_npz, **npz_results)
return results
def infer_set_target(observed_set,
features,
observed_target,
target_cov,
learner,
fit_probability=gbm_fit_sk,
fit_args={'n_estimators':500},
hypothesis=[0],
alpha=0.1,
success_params=(1, 1),
B=500,
learning_npz=None,
single=False):
'''
    Compute a p-value (or pivot) for a target having observed `outcome` of `algorithm` run on the original data.
Parameters
----------
observed_set : set(int)
The purported value observed when run with the original seed.
features : [int]
List of the elements of observed_set.
observed_target : np.ndarray
Statistic inference is based on.
target_cov : np.ndarray
(Pre-selection) covariance matrix of `observed_target`.
learner :
Object that generates perturbed data.
fit_probability : callable
Function to learn a probability model P(Y=1|T) based on [T, Y].
    hypothesis : [float]
        Hypothesized value of each target coordinate.
alpha : np.float
Level for 1 - confidence.
B : int
How many queries?
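    single : bool
        If True, and there is a single feature with success_params == (1, 1),
        base inference directly on the learning data via
        `_single_parameter_inference` instead of the fitted weight function.

    Returns
    -------
    results : list
        One entry per coordinate of `observed_target`; each entry begins with
        (pivot, confidence_interval, pvalue).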
Notes
-----
    This function makes the assumption that the covariance of the observed sampler is the
    true covariance of S, and that we are looking for inference about coordinates of the mean of
    np.linalg.inv(covariance).dot(S).
    This allows us to compute the required observed_target, cross_cov and target_cov.
'''
features = np.asarray(features)
if features.shape == ():
features = np.array([features])
observed_set = set(observed_set)
if np.any([f not in observed_set for f in features]):
raise ValueError('for a set target, we can only do inference for features observed in the outcome')
weight_fns, learning_data = learner.learn(fit_probability,
fit_args=fit_args,
check_selection=lambda result: np.array([f in set(result) for f in features]),
B=B)
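    # With a single feature and no binomial aggregation of successes, we can
    # bypass the fitted weight function and importance-weight the learning
    # samples directly against the learner's proposal density.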
if len(features) == 1 and success_params == (1, 1) and single:
single_inference = _single_parameter_inference(observed_target,
target_cov,
learning_data,
learner.proposal_density,
hypothesis=hypothesis[0],
alpha=alpha)
return [single_inference + (None,)]
else:
if learning_npz is not None:
T, Y = learning_data
npz_results = {'T':T, 'Y':Y}
npz_results['nuisance'] = []
npz_results['direction'] = []
results = []
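        # Same per-coordinate decomposition as in `infer_general_target`:
        # split observed_target into a direction along coordinate i and a
        # fixed nuisance component, then do one-dimensional inference.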
for i in range(observed_target.shape[0]):
cur_nuisance = observed_target - target_cov[i] / target_cov[i, i] * observed_target[i]
cur_nuisance.shape = observed_target.shape
direction = target_cov[i] / target_cov[i, i]
if learning_npz is not None:
npz_results['nuisance'].append(cur_nuisance)
npz_results['direction'].append(direction)
def new_weight_fn(cur_nuisance, direction, weight_fn, target_val):
return weight_fn(np.multiply.outer(target_val, direction) + cur_nuisance[None, :])
new_weight_fn = functools.partial(new_weight_fn, cur_nuisance, direction, weight_fns[i])
results.append(_inference(observed_target[i],
target_cov[i, i].reshape((1, 1)),
new_weight_fn,
hypothesis=hypothesis[i],
alpha=alpha,
success_params=success_params)[:4])
if learning_npz is not None:
np.savez(learning_npz, **npz_results)
return results
def infer_full_target(algorithm,
observed_set,
features,
observed_sampler,
dispersion, # sigma^2
fit_probability=gbm_fit_sk,
fit_args={'n_estimators':500},
hypothesis=[0],
alpha=0.1,
success_params=(1, 1),
B=500,
learner_klass=mixture_learner,
learning_npz=None,
single=False):
'''
Compute a p-value (or pivot) for a target having observed `outcome` of `algorithm(observed_sampler)`.
Parameters
----------
algorithm : callable
Selection algorithm that takes a noise source as its only argument.
observed_set : set(int)
The purported value `algorithm(observed_sampler)`, i.e. run with the original seed.
features : [int]
List of the elements of observed_set.
    observed_sampler : `normal_sampler`
        Representation of the data used in the selection procedure.
dispersion : float
Scalar dispersion of the covariance of `observed_sampler`. In
OLS problems this is $\sigma^2$.
fit_probability : callable
Function to learn a probability model P(Y=1|T) based on [T, Y].
    hypothesis : [float]
        Hypothesized value of each target coordinate.
alpha : np.float
Level for 1 - confidence.
B : int
How many queries?
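    Returns
    -------
    results : list
        As returned by `infer_set_target`: one entry per feature, each
        beginning with (pivot, confidence_interval, pvalue).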
Notes
-----
    This function makes the assumption that the covariance of the observed sampler is the
    true covariance of S, and that we are looking for inference about coordinates of the mean of
    np.linalg.inv(covariance).dot(S).
    This allows us to compute the required observed_target, cross_cov and target_cov.
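
    Examples
    --------
    A schematic sketch, not a doctest: the selection algorithm and the
    quantities `S`, `covS`, `dispersion` are placeholders, and
    `normal_sampler(S, covS)` is assumed to be the noise source constructor
    imported from `.samplers`::

        smooth_sampler = normal_sampler(S, covS)   # e.g. S = X.T.dot(y), covS = dispersion * X.T.dot(X)
        observed_set = algorithm(smooth_sampler)   # run the selection once on the observed data
        results = infer_full_target(algorithm,
                                    observed_set,
                                    sorted(observed_set),
                                    smooth_sampler,
                                    dispersion,
                                    B=1000)
        for pivot, interval, pvalue, _ in results:
            print(interval, pvalue)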
'''
info_inv = np.linalg.inv(observed_sampler.covariance / dispersion) # scale free, i.e. X.T.dot(X) without sigma^2
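    # The target is the coordinates of np.linalg.inv(covariance).dot(mean of S)
    # indexed by `features` (see Notes): observed_target is its plug-in estimate,
    # target_cov its pre-selection covariance, and cross_cov its covariance with S.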
target_cov = (info_inv[features][:, features] * dispersion)
observed_target = info_inv[features].dot(observed_sampler.center)
cross_cov = observed_sampler.covariance.dot(info_inv[:, features])
learner = learner_klass(algorithm,
observed_set,
observed_sampler,
observed_target,
target_cov,
cross_cov)
return infer_set_target(observed_set,
features,
observed_target,
target_cov,
learner,
fit_probability=fit_probability,
fit_args=fit_args,
hypothesis=hypothesis,
alpha=alpha,
success_params=success_params,
B=B,
learning_npz=learning_npz,
single=single)
def _inference(observed_target,
target_cov,
weight_fn, # our fitted function
success_params=(1, 1),
hypothesis=0,
alpha=0.1):
'''
Produce p-values (or pivots) and confidence intervals having estimated a weighting function.
The basic object here is a 1-dimensional exponential family with reference density proportional
to
lambda t: scipy.stats.norm.pdf(t / np.sqrt(target_cov)) * weight_fn(t)
Parameters
----------
    observed_target : float
        Observed value of the target estimator.
    target_cov : np.float((1, 1))
        Covariance of the target estimator.
    weight_fn : callable
        Fitted function returning the estimated selection probability at a
        given value of the target.
    success_params : (int, int)
        Pair (k, m): the learned probability p is replaced by the probability
        of at least k successes in m Bernoulli(p) trials.
    hypothesis : float
        Hypothesised true mean of target.
    alpha : np.float
        Level for 1 - confidence.
Returns
-------
    pivot : float
        Probability integral transform of the observed_target at mean parameter "hypothesis".
    confidence_interval : (float, float)
        (1 - alpha) * 100% confidence interval.
    pvalue : float
        Two-sided p-value for the hypothesis that the mean of the target is 0.
    weight_fn : callable
        The weight function that was passed in.
    exp_family : `discrete_family`
        The fitted 1-dimensional exponential family.
'''
k, m = success_params # need at least k of m successes
target_var = target_cov[0, 0]
target_sd = np.sqrt(target_var)
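    # Evaluate the selection weight on a grid of +/- 20 standard deviations
    # around the observed value of the target.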
target_val = (np.linspace(-20 * target_sd, 20 * target_sd, 5001) +
observed_target)
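    # With (k, m) != (1, 1), the learned probability p is replaced by the
    # probability of at least k successes out of m trials: binom(m, p).sf(k - 1).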
if (k, m) != (1, 1):
weight_val = np.array([binom(m, p).sf(k-1) for p in
weight_fn(target_val)])
else:
weight_val = np.squeeze(weight_fn(target_val))
weight_val *= ndist.pdf((target_val - observed_target) / target_sd)
exp_family = discrete_family(target_val, weight_val)
pivot = exp_family.cdf((hypothesis - observed_target)
/ target_var, x=observed_target)
pivot = 2 * min(pivot, 1-pivot)
pvalue = exp_family.cdf(- observed_target / target_cov[0, 0],
x=observed_target)
pvalue = 2 * min(pvalue, 1-pvalue)
interval = exp_family.equal_tailed_interval(observed_target, alpha=alpha)
rescaled_interval = (interval[0] * target_var + observed_target,
interval[1] * target_var + observed_target)
    return pivot, rescaled_interval, pvalue, weight_fn, exp_family # TODO: should do the MLE as well -- does discrete_family do this?
def _single_parameter_inference(observed_target,
target_cov,
learning_data,
proposal_density,
hypothesis=0,
alpha=0.1):
    '''
    Inference for a single parameter, reusing the learning data directly: the
    samples T with Y == 1 are importance weighted by the ratio of the Gaussian
    density of the target to the learner's proposal density. This estimates the
    same reference density as in `_inference`, proportional to
    lambda t: scipy.stats.norm.pdf(t / np.sqrt(target_cov)) * weight_fn(t)
    with weight_fn the selection probability P(Y=1 | t).
Parameters
----------
    observed_target : float
        Observed value of the target estimator.
    target_cov : np.float((1, 1))
        Covariance of the target estimator.
    learning_data : (np.ndarray, np.ndarray)
        The pair (T, Y) generated by the learner; only samples with Y == 1 are used.
    proposal_density : callable
        Density from which the learning samples T were drawn.
    hypothesis : float
        Hypothesised true mean of target.
    alpha : np.float
        Level for 1 - confidence.
Returns
-------
    pivot : float
        Probability integral transform of the observed_target at mean parameter "hypothesis".
    confidence_interval : (float, float)
        (1 - alpha) * 100% confidence interval.
    pvalue : float
        Two-sided p-value for the hypothesis that the mean of the target is 0.
    exp_family : `discrete_family`
        The importance-weighted 1-dimensional exponential family.
'''
T, Y = learning_data
target_val = T[Y == 1]
target_var = target_cov[0, 0]
target_sd = np.sqrt(target_var)
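    # Importance weights: Gaussian density of the target at the retained
    # learning samples, divided by the proposal density that generated them.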
weight_val = ndist.pdf((target_val - observed_target) / target_sd) / proposal_density(target_val.reshape((-1,1)))
exp_family = discrete_family(target_val, weight_val)
pivot = exp_family.cdf((hypothesis - observed_target) / target_var, x=observed_target)
pivot = 2 * min(pivot, 1-pivot)
pvalue = exp_family.cdf(-observed_target / target_var, x=observed_target)
pvalue = 2 * min(pvalue, 1-pvalue)
interval = exp_family.equal_tailed_interval(observed_target, alpha=alpha)
rescaled_interval = (interval[0] * target_var + observed_target[0],
interval[1] * target_var + observed_target[0])
    return pivot, rescaled_interval, pvalue, exp_family # TODO: should do the MLE as well -- does discrete_family do this?
def repeat_selection(base_algorithm, sampler, min_success, num_tries):
"""
Repeat a set-returning selection algorithm `num_tries` times,
returning all elements that appear at least `min_success` times.
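
    Examples
    --------
    A schematic sketch; `my_selection` and `sampler` are placeholders::

        stable_set = repeat_selection(my_selection, sampler,
                                      min_success=6, num_tries=10)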
"""
results = {}
for _ in range(num_tries):
current = base_algorithm(sampler)
for item in current:
results.setdefault(item, 0)
results[item] += 1
final_value = []
for key in results:
if results[key] >= min_success:
final_value.append(key)
return set(final_value)
def cross_inference(learning_data,
nuisance,
direction,
fit_probability,
nref=200,
fit_args={},
verbose=False):
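    # Split the learning data three ways: a reference set of size `nref` on
    # which p-values are computed, half of the remainder to fit the probability
    # model, and the other half to support the discrete exponential family.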
T, Y = learning_data
idx = np.arange(T.shape[0])
np.random.shuffle(idx)
Tshuf, Yshuf = T[idx], Y[idx]
reference_T = Tshuf[:nref]
reference_Y = Yshuf[:nref]
nrem = T.shape[0] - nref
learning_T = Tshuf[nref:(nref+int(nrem/2))]
    learning_Y = Yshuf[nref:(nref+int(nrem/2))]
dens_T = Tshuf[(nref+int(nrem/2)):]
pvalues = []
weight_fns = fit_probability(learning_T, learning_Y, **fit_args)
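    # Loop over features: each has its own fitted weight function, nuisance and
    # direction, together with the matching columns of the reference and
    # density splits.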
for (weight_fn,
cur_nuisance,
cur_direction,
learn_T,
ref_T,
ref_Y,
d_T) in zip(weight_fns,
nuisance,
direction,
learning_T.T,
reference_T.T,
reference_Y.T,
dens_T.T):
def new_weight_fn(nuisance, direction, weight_fn, target_val):
return weight_fn(np.multiply.outer(target_val, direction) + nuisance[None, :])
new_weight_fn = functools.partial(new_weight_fn, cur_nuisance, cur_direction, weight_fn)
weight_val = new_weight_fn(d_T)
exp_family = discrete_family(d_T, weight_val)
if verbose:
print(ref_Y)
pval = [exp_family.cdf(0, x=t) for t, y in zip(ref_T, ref_Y) if y == 1]
pvalues.append(pval)
return pvalues