"""
This module contains a class for discrete
1-dimensional exponential families. The main
uses for this class are exact (post-selection)
hypothesis tests and confidence intervals.
"""
import numpy as np
import warnings
from ..truncated.api import find_root
def crit_func(test_statistic, left_cut, right_cut):
    r"""
    A generic critical function for an interval, with randomization
    weights at the endpoints:

    .. math::

        1\{x < C_L\} + 1\{x > C_R\}
        + \gamma_L 1\{x = C_L\} + \gamma_R 1\{x = C_R\}

    where :math:`(C_L, \gamma_L)` = `left_cut` and
    :math:`(C_R, \gamma_R)` = `right_cut`.

    Parameters
    ----------
    test_statistic : float
        Observed value of test statistic.
    left_cut : (float, float)
        (CL, gammaL): left endpoint and weight at exactly the left
        endpoint (should be in [0,1]).
    right_cut : (float, float)
        (CR, gammaR): right endpoint and weight at exactly the right
        endpoint (should be in [0,1]).

    Returns
    -------
    decision : float
        Rejection "probability" in [0, 1]; vectorized over
        `test_statistic` if an array is passed.
    """
    CL, gammaL = left_cut
    CR, gammaR = right_cut
    # strictly outside the interval: always reject
    value = ((test_statistic < CL) + (test_statistic > CR)) * 1.
    # randomize on the boundary atoms only when the weight is nonzero
    if gammaL != 0:
        value += gammaL * (test_statistic == CL)
    if gammaR != 0:
        value += gammaR * (test_statistic == CR)
    return value
class discrete_family(object):

    def __init__(self, sufficient_stat, weights, theta=0.):
        r"""
        A discrete 1-dimensional exponential family with reference
        measure $\sum_j w_j \delta_{X_j}$ and sufficient statistic
        `sufficient_stat`. For any $\theta$, the distribution is

        .. math::

            P_{\theta} = \sum_{j} e^{\theta X_j - \Lambda(\theta)} w_j \delta_{X_j}

        where

        .. math::

            \Lambda(\theta) = \log \left(\sum_j w_j e^{\theta X_j} \right).

        Parameters
        ----------
        sufficient_stat : `np.float((n))`

        weights : `np.float(n)`

        Notes
        -----
        The weights are normalized to sum to 1.
        """
        # sort atoms by sufficient statistic, carrying the weights along
        # (np.float was removed in NumPy 1.24; builtin float is equivalent)
        xw = np.array(sorted(zip(sufficient_stat, weights)), float)
        self._x = xw[:, 0]
        self._w = xw[:, 1]
        # log-weights of the *unnormalized* reference measure; used by
        # the theta setter to build the tilted pmf in a stable way
        self._lw = np.log(xw[:, 1])
        self._w /= self._w.sum()  # make sure they are a pmf
        self.n = len(xw)
        # nan sentinel forces the theta setter to compute pdf/partition
        # on the first assignment below
        self._theta = np.nan
        self.theta = theta
@property
def theta(self):
    """
    The natural parameter of the family.
    """
    return self._theta

@theta.setter
def theta(self, _theta):
    # Lazily recompute the tilted pmf and partition function, but only
    # when the parameter actually changes (initialized to nan so the
    # first assignment always triggers a computation).
    if _theta != self._theta:
        _thetaX = _theta * self.sufficient_stat + self._lw
        # shift by (max - 5) before exponentiating to avoid
        # over/underflow; the factor is restored on the partition below
        # (the 5 is an arbitrary slack)
        _largest = _thetaX.max() - 5
        _exp_thetaX = np.exp(_thetaX - _largest)
        self._partition = np.sum(_exp_thetaX)
        self._pdf = _exp_thetaX / self._partition
        self._partition *= np.exp(_largest)
        self._theta = _theta
@property
def partition(self):
r"""
Partition function at `self.theta`:
.. math::
\sum_j e^{\theta X_j} w_j
"""
if hasattr(self, "_partition"):
return self._partition
@property
def sufficient_stat(self):
"""
Sufficient statistics of the exponential family.
"""
return self._x
@property
def weights(self):
"""
Weights of the exponential family.
"""
return self._w
[docs] def pdf(self, theta):
r"""
Density of $P_{\theta}$ with respect to $P_0$.
Parameters
----------
theta : float
Natural parameter.
Returns
-------
pdf : np.float
"""
self.theta = theta # compute partition if necessary
return self._pdf
[docs] def cdf(self, theta, x=None, gamma=1):
r"""
The cumulative distribution function of $P_{\theta}$ with
weight `gamma` at `x`
.. math::
P_{\theta}(X < x) + \gamma * P_{\theta}(X = x)
Parameters
----------
theta : float
Natural parameter.
x : float (optional)
Where to evaluate CDF.
gamma : float(optional)
Weight given at `x`.
Returns
-------
cdf : np.float
"""
pdf = self.pdf(theta)
if x is None:
return np.cumsum(pdf) - pdf * (1 - gamma)
else:
tr = np.sum(pdf * (self.sufficient_stat < x))
if x in self.sufficient_stat:
tr += gamma * np.sum(pdf[np.where(self.sufficient_stat == x)])
return tr
[docs] def ccdf(self, theta, x=None, gamma=0, return_unnorm=False):
r"""
The complementary cumulative distribution function
(i.e. survival function) of $P_{\theta}$ with
weight `gamma` at `x`
.. math::
P_{\theta}(X > x) + \gamma * P_{\theta}(X = x)
Parameters
----------
theta : float
Natural parameter.
x : float (optional)
Where to evaluate CCDF.
gamma : float(optional)
Weight given at `x`.
Returns
-------
ccdf : np.float
"""
pdf = self.pdf(theta)
if x is None:
return np.cumsum(pdf[::-1])[::-1] - pdf * (1 - gamma)
else:
tr = np.sum(pdf * (self.sufficient_stat > x))
if x in self.sufficient_stat:
tr += gamma * np.sum(pdf[np.where(self.sufficient_stat == x)])
return tr
[docs] def E(self, theta, func):
r"""
Expectation of `func` under $P_{\theta}$
Parameters
----------
theta : float
Natural parameter.
func : callable
Assumed to be vectorized.
gamma : float(optional)
Weight given at `x`.
Returns
-------
E : np.float
"""
T = np.asarray(func(self.sufficient_stat))
pdf_ = self.pdf(theta)
if T.ndim == 1:
return (T * pdf_).sum()
else:
val = (T * pdf_[:,None]).sum(0)
return val
[docs] def Var(self, theta, func):
r"""
Variance of `func` under $P_{\theta}$
Parameters
----------
theta : float
Natural parameter.
func : callable
Assumed to be vectorized.
Returns
-------
var : np.float
"""
mu = self.E(theta, func)
return self.E(theta, lambda x: (func(x)-mu)**2)
[docs] def Cov(self, theta, func1, func2):
r"""
Covariance of `func1` and `func2` under $P_{\theta}$
Parameters
----------
theta : float
Natural parameter.
func1, func2 : callable
Assumed to be vectorized.
Returns
-------
cov : np.float
"""
mu1 = self.E(theta, func1)
mu2 = self.E(theta, func2)
return self.E(theta, lambda x: (func1(x)-mu1)*(func2(x)-mu2))
def two_sided_acceptance(self, theta, alpha=0.05, tol=1e-6):
    r"""
    Compute cutoffs of UMPU two-sided test.

    Parameters
    ----------
    theta : float
        Natural parameter.
    alpha : float (optional)
        Size of two-sided test.
    tol : float
        Tolerance for root-finding.

    Returns
    -------
    left_cut : (float, float)
        Boundary and randomization weight for left endpoint.
    right_cut : (float, float)
        Boundary and randomization weight for right endpoint.
    """
    # Recompute unless cutoffs are already cached for this theta.
    # (Bug fix: the original guard `theta != self._theta` skipped the
    # computation on the very first call with the current theta, and
    # then raised AttributeError because `_left_cut` did not exist.)
    # NOTE(review): the cache is keyed on theta only — a call with a
    # different `alpha` but the same theta returns stale cutoffs.
    if theta != self._theta or not hasattr(self, "_left_cut"):
        # CL: largest atom whose left cut keeps Cov(X, phi(X)) >= 0
        CL = np.max([x for x in self.sufficient_stat
                     if self._critCovFromLeft(theta, (x, 0), alpha) >= 0])
        # solve for the randomization weight at CL
        gammaL = find_root(lambda g: self._critCovFromLeft(theta, (CL, g), alpha),
                           0., 0., 1., tol)
        CR, gammaR = self._rightCutFromLeft(theta, (CL, gammaL), alpha)
        self._left_cut, self._right_cut = (CL, gammaL), (CR, gammaR)
    return self._left_cut, self._right_cut
[docs] def two_sided_test(self, theta0, observed, alpha=0.05, randomize=True, auxVar=None):
r"""
Perform UMPU two-sided test.
Parameters
----------
theta0 : float
Natural parameter under null hypothesis.
observed : float
Observed sufficient statistic.
alpha : float (optional)
Size of two-sided test.
randomize : bool
Perform the randomized test (or conservative test).
auxVar : [None, float]
If randomizing and not None, use this
as the random uniform variate.
Returns
-------
decision : np.bool
Is the null hypothesis $H_0:\theta=\theta_0$ rejected?
Notes
-----
We need an auxiliary uniform variable to carry out the randomized test.
Larger auxVar corresponds to x being slightly "larger." It can be passed in,
or chosen at random. If randomize=False, we get a conservative test.
"""
if randomize:
if auxVar is None:
auxVar = np.random.random()
rejLeft = self._test2RejectsLeft(theta0, observed, alpha, auxVar)
rejRight = self._test2RejectsRight(theta0, observed, alpha, auxVar)
else:
rejLeft = self._test2RejectsLeft(theta0, observed, alpha)
rejRight = self._test2RejectsRight(theta0, observed, alpha)
return rejLeft or rejRight
[docs] def one_sided_test(self, theta0, observed, alternative='greater', alpha=0.05, randomize=True, auxVar=None):
r"""
Perform UMPU one-sided test.
Parameters
----------
theta0 : float
Natural parameter under null hypothesis.
observed : float
Observed sufficient statistic.
alternative : str
One of ['greater', 'less']
alpha : float (optional)
Size of two-sided test.
randomize : bool
Perform the randomized test (or conservative test).
auxVar : [None, float]
If randomizing and not None, use this
as the random uniform variate.
Returns
-------
decision : np.bool
Is the null hypothesis $H_0:\theta=\theta_0$ rejected?
Notes
-----
We need an auxiliary uniform variable to carry out the randomized test.
Larger auxVar corresponds to x being slightly "larger." It can be passed in,
or chosen at random. If randomize=False, we get a conservative test.
"""
if alternative not in ['greater', 'less']:
raise ValueError('alternative must be one of ["greater", "less"]')
self.theta = theta0
if randomize:
if auxVar is None:
auxVar = np.random.random()
if alternative == 'greater':
return self.ccdf(theta0, observed, gamma=auxVar) < alpha
else:
return self.cdf(theta0, observed, gamma=auxVar) < alpha
else:
if alternative == 'greater':
return self.ccdf(theta0, observed) < alpha
else:
return self.cdf(theta0, observed) < alpha
def interval(self, observed, alpha=0.05, randomize=True, auxVar=None, tol=1e-6):
    """
    Form UMAU confidence interval.

    Parameters
    ----------
    observed : float
        Observed sufficient statistic.
    alpha : float (optional)
        1 - confidence level.
    randomize : bool
        Perform the randomized test (or conservative test).
    auxVar : [None, float]
        If randomizing and not None, use this
        as the random uniform variate.
    tol : float
        Tolerance for root-finding.

    Returns
    -------
    lower, upper : float
        Limits of confidence interval.
    """
    if randomize:
        aux = np.random.random() if auxVar is None else auxVar
        upper = self._inter2Upper(observed, aux, alpha, tol)
        lower = self._inter2Lower(observed, aux, alpha, tol)
    else:
        # conservative endpoints: auxVar=1 for the upper limit,
        # auxVar=0 for the lower limit
        upper = self._inter2Upper(observed, 1., alpha, tol)
        lower = self._inter2Lower(observed, 0., alpha, tol)
    return lower, upper
def equal_tailed_interval(self, observed, alpha=0.05, randomize=True, auxVar=None, tol=1e-6):
    r"""
    Form interval by inverting the
    equal-tailed test with $\alpha/2$ in each tail.

    Parameters
    ----------
    observed : float
        Observed sufficient statistic.
    alpha : float (optional)
        Total level; $\alpha/2$ in each tail.
    randomize : bool
        Unused; retained for signature compatibility with `interval`.
    auxVar : [None, float]
        Unused; retained for signature compatibility with `interval`.
    tol : float
        Tolerance passed to the root finder.

    Returns
    -------
    lower, upper : float
        Limits of confidence interval.
    """
    # (Fixes: docstring is now raw so "\alpha" is not mangled by the
    # "\a" escape; `tol` is now actually forwarded to find_root.)
    # Bracket the roots at mean +/- 20 sd under the current theta.
    mu = self.E(self.theta, lambda x: x)
    sigma = np.sqrt(self.Var(self.theta, lambda x: x))
    lb = mu - 20 * sigma
    ub = mu + 20 * sigma
    F = lambda th: self.cdf(th, observed)
    # F is decreasing in theta, so the upper tail gives the lower limit
    L = find_root(F, 1.0 - 0.5 * alpha, lb, ub, tol)
    U = find_root(F, 0.5 * alpha, lb, ub, tol)
    return L, U
[docs] def equal_tailed_test(self, theta0, observed, alpha=0.05):
r"""
Perform UMPU two-sided test.
Parameters
----------
theta0 : float
Natural parameter under null hypothesis.
observed : float
Observed sufficient statistic.
alpha : float (optional)
Size of two-sided test.
randomize : bool
Perform the randomized test (or conservative test).
auxVar : [None, float]
If randomizing and not None, use this
as the random uniform variate.
Returns
-------
decision : np.bool
Is the null hypothesis $H_0:\theta=\theta_0$ rejected?
Notes
-----
We need an auxiliary uniform variable to carry out the randomized test.
Larger auxVar corresponds to x being slightly "larger." It can be passed in,
or chosen at random. If randomize=False, we get a conservative test.
"""
pval = self.cdf(theta0, observed, gamma=0.5)
return min(pval, 1-pval) < alpha
[docs] def one_sided_acceptance(self, theta,
alpha=0.05,
alternative='greater',
tol=1e-6):
r"""
Compute the acceptance region cutoffs of UMPU one-sided test.
TODO: Include randomization?
Parameters
----------
theta : float
Natural parameter.
alpha : float (optional)
Size of two-sided test.
alternative : str
One of ['greater', 'less'].
tol : float
Tolerance for root-finding.
Returns
-------
left_cut : (float, float)
Boundary and randomization weight for left endpoint.
right_cut : (float, float)
Boundary and randomization weight for right endpoint.
"""
if alternative == 'greater':
F = self.ccdf(theta, gamma=0.5)
cutoff = np.min(self.sufficient_stat[F <= alpha])
acceptance = (-np.inf, cutoff)
elif alternative == 'less':
F = self.ccdf(theta, gamma=0.5)
cutoff = np.max(self.sufficient_stat[F <= alpha])
acceptance = (cutoff, np.inf)
else:
raise ValueError("alternative should be one of ['greater', 'less']")
return acceptance
[docs] def equal_tailed_acceptance(self, theta0, alpha=0.05):
r"""
Compute the acceptance region cutoffs of
equal-tailed test (without randomization).
Therefore, size may not be exactly $\alpha$.
Parameters
----------
theta0 : float
Natural parameter under null hypothesis.
alpha : float (optional)
Size of two-sided test.
Returns
-------
left_cut : (float, float)
Boundary and randomization weight for left endpoint.
right_cut : (float, float)
Boundary and randomization weight for right endpoint.
"""
F = self.cdf(theta0, gamma=0.5)
Lcutoff = np.max(self.sufficient_stat[F <= 0.5 * alpha])
Rcutoff = np.min(self.sufficient_stat[F >= 1 - 0.5*alpha])
return Lcutoff, Rcutoff
[docs] def MLE(self, observed, initial=0,
max_iter=20, tol=1.e-4):
r"""
Compute the maximum likelihood estimator
based on observed sufficient statistic `observed`.
Parameters
----------
observed : float
Observed value of sufficient statistic
initial : float
Starting point for Newton-Raphson
max_iter : int (optional)
Maximum number of Newton-Raphson iterations
tol : float (optional)
Tolerance parameter for stopping, based
on relative change in parameter estimate.
Iteration stops when the change is smaller
than `tol * max(1, np.fabs(cur_estimate))`.
Returns
-------
theta_hat : float
Maximum likelihood estimator.
std_err : float
Estimated variance of `theta_hat` based
on inverse of variance of sufficient
statistic at `theta_hat`, i.e. the
observed Fisher information.
"""
cur_est = initial
def first_two_moments(x):
return np.array([x, x**2]).T
for i in range(max_iter):
cur_moments = self.E(cur_est, first_two_moments) # gradient and
# Hessian of CGF
# (almost)
grad, hessian = (cur_moments[0] - observed,
cur_moments[1] - cur_moments[0]**2)
next_est = cur_est - grad / hessian # newton step
if np.fabs(next_est - cur_est) < tol * max(1, np.fabs(cur_est)):
break
cur_est = next_est
if i == max_iter - 1:
warnings.warn('Newton-Raphson failed to converge after %d iterations' % max_iter)
cur_moments = self.E(cur_est, first_two_moments) # gradient and
# Hessian of CGF
# (almost)
grad, hessian = (cur_moments[0] - observed,
cur_moments[1] - cur_moments[0]**2)
return cur_est, 1. / hessian, grad
# Private methods
def _rightCutFromLeft(self, theta, leftCut, alpha=0.05):
"""
Given C1, gamma1, choose C2, gamma2 to make E(phi(X)) = alpha
"""
C1, gamma1 = leftCut
alpha1 = self.cdf(theta, C1, gamma1)
if alpha1 >= alpha:
return (np.inf, 1)
else:
alpha2 = alpha - alpha1
P = self.ccdf(theta, gamma=0)
idx = np.nonzero(P < alpha2)[0].min()
cut = self.sufficient_stat[idx]
pdf_term = np.exp(theta * cut) / self.partition * self.weights[idx]
ccdf_term = P[idx]
gamma2 = (alpha2 - ccdf_term) / pdf_term
return (cut, gamma2)
def _leftCutFromRight(self, theta, rightCut, alpha=0.05):
"""
Given C2, gamma2, choose C1, gamma1 to make E(phi(X)) = alpha
"""
C2, gamma2 = rightCut
alpha2 = self.ccdf(theta, C2, gamma2)
if alpha2 >= alpha:
return (-np.inf, 1)
else:
alpha1 = alpha - alpha2
P = self.cdf(theta, gamma=0)
idx = np.nonzero(P < alpha1)[0].max()
cut = self.sufficient_stat[idx]
cdf_term = P[idx]
pdf_term = np.exp(theta * cut) / self.partition * self.weights[idx]
gamma1 = (alpha1 - cdf_term) / pdf_term
return (cut, gamma1)
def _critCovFromLeft(self, theta, leftCut, alpha=0.05):
"""
Covariance of X with phi(X) where phi(X) is the level-alpha test with left cutoff C1, gamma1.

In the UMPU construction this covariance is zero at the correct
cutoffs; its sign tells callers (e.g. root-finders) which way to
move the left cutoff.
"""
C1, gamma1 = leftCut
# complete the test: solve for the matching right cutoff at level alpha
C2, gamma2 = self._rightCutFromLeft(theta, leftCut, alpha)
if C2 == np.inf:
# no finite right cutoff exists; -inf signals "move the cut left"
return -np.inf
else:
return self.Cov(theta, lambda x: x, lambda x: crit_func(x, (C1, gamma1), (C2, gamma2)))
def _critCovFromRight(self, theta, rightCut, alpha=0.05):
"""
Covariance of X with phi(X) where phi(X) is the level-alpha test with right cutoff C2, gamma2.

Mirror image of `_critCovFromLeft`: zero at the correct cutoffs,
sign indicates which way to move the right cutoff.
"""
C2, gamma2 = rightCut
# complete the test: solve for the matching left cutoff at level alpha
C1, gamma1 = self._leftCutFromRight(theta, rightCut, alpha)
if C1 == -np.inf:
# no finite left cutoff exists; +inf signals "move the cut right"
return np.inf
else:
return self.Cov(theta, lambda x: x, lambda x: crit_func(x, (C1, gamma1), (C2, gamma2)))
def _test2RejectsLeft(self, theta, observed, alpha=0.05, auxVar=1.):
"""
Returns 1 if x in left lobe of umpu two-sided rejection region.

We need an auxiliary uniform variable to carry out the randomized test.
Larger auxVar corresponds to "larger" x, so LESS likely to reject.
auxVar = 1 is conservative.
"""
# positive covariance means the left cutoff would sit right of
# `observed`, i.e. `observed` falls inside the left rejection lobe
return self._critCovFromLeft(theta, (observed, auxVar), alpha) > 0
def _test2RejectsRight(self, theta, observed, alpha=0.05, auxVar=0.):
"""
Returns 1 if x in right lobe of umpu two-sided rejection region.

We need an auxiliary uniform variable to carry out the randomized test.
Larger auxVar corresponds to x being slightly "larger," so MORE likely to reject.
auxVar = 0 is conservative.
"""
# note the flipped weight 1 - auxVar: the boundary weight enters the
# right cut from the opposite side
return self._critCovFromRight(theta, (observed, 1.-auxVar), alpha) < 0
def _inter2Upper(self, observed, auxVar, alpha=0.05, tol=1e-6):
"""
Upper bound of two-sided umpu interval: the theta at which the
left-lobe rejection indicator switches off, found by root-finding.
"""
# below the smallest atom every test rejects left -> empty interval side
if observed < self.sufficient_stat[0] or (observed == self.sufficient_stat[0] and auxVar <= alpha):
return -np.inf # observed, auxVar too small, every test rejects left
# at/above the second-largest atom no test rejects left -> unbounded
if observed > self.sufficient_stat[self.n - 2] or (observed == self.sufficient_stat[self.n - 2] and auxVar == 1.):
return np.inf # observed, auxVar too large, no test rejects left
# NOTE(review): initial bracket is hard-coded to [-1, 1]; presumably
# find_root widens the bracket when the root lies outside it — confirm.
return find_root(lambda theta: -1*self._test2RejectsLeft(theta, observed, alpha, auxVar), -0.5, -1., 1., tol)
def _inter2Lower(self, observed, auxVar, alpha=0.05, tol=1e-6):
"""
Lower bound of two-sided umpu interval: the theta at which the
right-lobe rejection indicator switches on, found by root-finding.
"""
# above the largest atom every test rejects right -> empty interval side
if observed > self.sufficient_stat[self.n-1] or (observed == self.sufficient_stat[self.n-1] and auxVar >= 1.-alpha):
return np.inf # observed, auxVar too large, every test rejects right
# at/below the second-smallest atom no test rejects right -> unbounded
if observed < self.sufficient_stat[1] or (observed == self.sufficient_stat[1] and auxVar == 0.):
return -np.inf # observed, auxVar too small, no test rejects right
# NOTE(review): initial bracket is hard-coded to [-1, 1]; presumably
# find_root widens the bracket when the root lies outside it — confirm.
return find_root(lambda theta: 1.*self._test2RejectsRight(theta, observed, alpha, auxVar), 0.5, -1., 1., tol)