"""
This module implements the class `truncated_gaussian` which
performs (conditional) UMPU tests for Gaussians
restricted to a set of intervals.
"""
import numpy as np
from ..distributions.pvalue import (norm_pdf,
truncnorm_cdf,
norm_q,
norm_interval,
mp)
from scipy.stats import norm as ndist
from .base import truncated, find_root
[docs]class truncated_gaussian(truncated):
"""
>>> from selectinf.constraints.intervals import intervals
>>> I = intervals.intersection(intervals((-1, 6)), \
intervals(( 0, 7)), \
~intervals((1, 4)))
>>> distr = truncated_gaussian(I, 3.1, 2.)
"""
[docs] def __init__(self, I, mu=0, scale = 1.):
"""
Create a new object for a truncated_gaussian distribution
Parameters
----------
I : intervals
The intervals the distribution is truncated to.
mu : int
Mean of Gaussian that is truncated.
scale : float
SD of Gaussian that is truncated.
"""
self._mu = mu
self._scale = scale
truncated.__init__(self, I)
def _cdf_notTruncated(self, a, b, dps):
"""
Compute the probability of being in the interval (a, b)
for a variable with a Gaussian distribution (not truncated)
Parameters
----------
a, b : float
Bounds of the interval. Can be infinite.
dps : int
Decimal precision (decimal places). Used in mpmath
Returns
-------
p : float
The probability of being in the intervals (a, b)
P( a < X < b)
for a non truncated variable
"""
scale = self._scale
mu = self._mu
dps_temp = mp.dps
mp.dps = dps
val = norm_interval((a-mu)/scale,
(b-mu)/scale)
mp.dps = dps_temp
return val
def _pdf_notTruncated(self, z, dps):
scale = self._scale
mu = self._mu
dps_temp = mp.dps
mp.dps = dps
val = norm_pdf(Z)
mp.dps = dps_temp
return val
def _quantile_notTruncated(self, q, dps, tol=1.e-6):
"""
Compute the quantile for the non truncated distribution
Parameters
----------
q : float
quantile you want to compute. Between 0 and 1
tol : float
precision for the output
Returns
-------
x : float
x such that P(X < x) = q
"""
scale = self._scale
mu = self._mu
dps_temp = mp.dps
mp.dps = dps
val = norm_q(q)
mp.dps = dps_temp
return val
[docs]class truncated_gaussian_old(object):
"""
A Gaussian distribution, truncated to
"""
[docs] def __init__(self, intervals, mu=0, scale=1):
intervals = np.unique(intervals)
intervals = np.asarray(intervals).reshape(-1)
# makes assumption intervals are disjoint
# and the sorted endpoints give the correct
# set of intervals...
self._cutoff_array = np.sort(intervals)
D = self.intervals[:,1]-self.intervals[:,0]
I = self.intervals[D != 0]
self._cutoff_array = I.reshape(-1)
self._mu = mu
self._scale = scale
self._mu_or_scale_changed()
def __array__(self):
return self.intervals
@property
def intervals(self):
return self._cutoff_array.reshape((-1,2))
@property
def negated(self):
if not hasattr(self,"_negated"):
klass = type(self)
self._negated = klass(np.asarray(-self._cutoff_array[::-1]),
mu=-self.mu,
scale=self.scale)
return self._negated
# private method to update P and D after a change of parameters
def _mu_or_scale_changed(self):
mu, scale = self.mu, self.scale
self.P = np.array([norm_interval((a-mu)/scale,
(b-mu)/scale)
for a, b in self.intervals])
self.D = np.array([(norm_pdf((a-mu)/scale),
norm_pdf((b-mu)/scale))
for a, b in self.intervals])
# mean parameter : mu
[docs] def set_mu(self, mu):
self._mu = mu
self._mu_or_scale_changed()
[docs] def get_mu(self):
return self._mu
mu = property(get_mu, set_mu)
# variance parameter : scale
[docs] def set_scale(self, scale):
self._scale = scale
self._mu_or_scale_changed()
[docs] def get_scale(self):
return self._scale
scale = property(get_scale, set_scale)
@property
def delta(self):
r"""
.. math::
\begin{align}
\delta_\mu(a,b) &\triangleq \int_a^b x\phi(x-\mu)\,dx \\
&= - \phi(b-\mu) + \phi(a-\mu) +
\mu\left(\Phi(b-\mu)-\Phi(a-\mu)\right),
\end{align}
"""
mu, P, D = self.mu, self.P, self.D
return D[:,0] - D[:,1] + mu * P
# End of properties
[docs] @staticmethod
def twosided(thresh, mu=0, scale=1):
thresh = np.fabs(thresh)
return truncated_gaussian([(-np.inf,-thresh),(thresh,np.inf)],
mu=mu, scale=scale)
def __repr__(self):
return '''%s(%s, mu=%0.3e, scale=%0.3e)''' % (self.__class__.__name__,
self.intervals,
self.mu,
self.scale)
[docs] def cdf(self, observed):
P, mu, scale = self.P, self.mu, self.scale
z = observed
k = int(np.floor((self.intervals <= observed).sum() / 2))
if k < self.intervals.shape[0]:
if observed > self.intervals[k,0]:
return (P[:k].sum() +
(norm_interval((self.intervals[k,0] - mu) / scale,
(observed - mu) / scale))
) / P.sum()
else:
return P[:k].sum() / P.sum()
else:
return 1.
[docs] def quantile(self, q):
P, mu, scale = self.P, self.mu, self.scale
Psum = P.sum()
Csum = np.cumsum(np.array([0]+list(P)))
k = max(np.nonzero(Csum < Psum*q)[0])
try:
k = max(np.nonzero(Csum < Psum*q)[0])
except ValueError:
if np.isnan(q):
raise TruncatedGaussianError('invalid quantile')
pnorm_increment = Psum*q - Csum[k]
if np.mean(self.intervals[k]) < 0:
return mu + norm_q(norm_interval(-np.inf,(self.intervals[k,0]-mu)/scale) + pnorm_increment) * scale
else:
return mu - norm_q(norm_interval((self.intervals[k,0]-mu)/scale, np.inf) - pnorm_increment) * scale
# make a function for vector version?
[docs] def right_endpoint(self, left_endpoint, alpha):
c1 = left_endpoint # shorthand from Will's code
mu, P = self.mu, self.P
alpha1 = self.cdf(left_endpoint)
if (alpha1 > alpha):
return np.nan
alpha2 = np.array(alpha - alpha1)
return self.quantile(1-alpha2)
[docs] def G(self, left_endpoint, alpha):
"""
$g_{\mu}$ from Will's code
"""
klass = self.__class__
c1 = left_endpoint # shorthand from Will's code
mu, P, D = self.mu, self.P, self.D
const = np.array(1-alpha)*(np.sum(D[:,0]-D[:,1]) + mu*P.sum())
right_endpoint = float(self.right_endpoint(left_endpoint, alpha))
if np.isnan(right_endpoint):
return np.inf
valid_intervals = []
for a, b in self.intervals:
intersection = (max(left_endpoint, a),
min(right_endpoint, b))
if intersection[1] > intersection[0]:
valid_intervals.append(intersection)
if valid_intervals:
return klass(valid_intervals, mu=self.mu, scale=self.scale).delta.sum() - const
return 0
[docs] def dG(self, left_endpoint, alpha):
"""
$gg_{\mu}$ from Will's code
"""
c1 = left_endpoint # shorthand from Will's code
D = self.D
return (self.right_endpoint(left_endpoint, alpha) -
left_endpoint) * norm_pdf((left_endpoint - self.mu) /
self.scale)
[docs] def equal_tailed_interval(self, observed, alpha):
old_mu = self.mu
lb = self.mu - 20. * self.scale
ub = self.mu + 20. * self.scale
def F(param):
self.mu = param
return self.cdf(observed)
#from scipy.optimize import bisect
#FL = lambda x: (F(x) - (1 - 0.5 * alpha))
#FU = lambda x: (F(x) - 0.5 * alpha)
#L_conf = bisect(FL, lb, ub)
#U_conf = bisect(FU, lb, ub)
#return np.array([L_conf, U_conf])
L = find_root(F, 1.0 - 0.5 * alpha, lb, ub)
U = find_root(F, 0.5 * alpha, lb, ub)
self.mu = old_mu
return np.array([L, U])
[docs] def UMAU_interval(self, observed, alpha,
mu_lo=None,
mu_hi=None,
tol=1.e-8):
old_mu = self.mu
try:
upper = _UMAU(observed,
alpha, self,
mu_lo=mu_lo,
mu_hi=mu_hi,
tol=tol)
except TruncatedGaussianError:
upper = np.inf
tg_neg = self.negated
try:
lower = -_UMAU(-observed,
alpha, tg_neg,
mu_lo=mu_hi,
mu_hi=mu_lo,
tol=tol)
except:
lower = -np.inf
self.mu, self.negated.mu = old_mu, old_mu
return np.array([lower, upper])
[docs]def G(left_endpoints, mus, alpha, tg):
"""
Compute the $G$ function of `tg(intervals)` over
`zip(left_endpoints, mus)`.
A copy is made of `tg` and its $(\mu,\scale)$ are not modified.
"""
tg = truncated_gaussian(tg.intervals)
results = []
for left_endpoint, mu in zip(left_endpoints, mus):
tg.mu = mu
results.append(tg.G(left_endpoint, alpha))
return np.array(results)
[docs]def dG(left_endpoints, mus, alpha, tg):
"""
Compute the $G$ function of `tg(intervals)` over
`zip(left_endpoints, mus)`.
A copy is made of `tg` and its $(\mu,\scale)$ are not modified.
"""
tg = truncated_gaussian(tg.intervals)
results = []
for left_endpoint, mu in zip(left_endpoints, mus):
tg.mu = mu
results.append(tg.dG(left_endpoint, alpha))
return np.array(results)
[docs]class TruncatedGaussianError(ValueError):
pass
def _UMAU(observed, alpha, tg,
mu_lo=None,
mu_hi=None,
tol=1.e-8):
klass = type(tg)
tg = klass(tg.intervals, scale=tg.scale)
X = observed # shorthand
if mu_lo is None:
mu_lo = X
if mu_hi is None:
mu_hi = X + 2
# find upper and lower points for bisection
tg.mu = mu_lo
while tg.G(X, alpha) < 0: # mu_too_high
mu_lo, mu_hi = mu_lo - 2, mu_lo
tg.mu = mu_lo
tg.mu = mu_hi
while tg.G(X, alpha) > 0: # mu_too_low
mu_lo, mu_hi = mu_hi, mu_hi + 2
tg.mu = mu_hi
# bisection
while mu_hi - mu_lo > tol:
mu_bar = 0.5 * (mu_lo + mu_hi)
tg.mu = mu_bar
if tg.G(X, alpha) < 0:
mu_hi = mu_bar
else:
mu_lo = mu_bar
return mu_bar
[docs]def find_root(f, y, lb, ub, tol=1e-6):
"""
searches for solution to f(x) = y in (lb, ub), where
f is a monotone decreasing function
"""
# make sure solution is in range
a, b = lb, ub
fa, fb = f(a), f(b)
# assume a < b
if fa > y and fb > y:
while fb > y :
b, fb = b + (b-a), f(b + (b-a))
elif fa < y and fb < y:
while fa < y :
a, fa = a - (b-a), f(a - (b-a))
# determine the necessary number of iterations
max_iter = int( np.ceil( ( np.log(tol) - np.log(b-a) ) / np.log(0.5) ) )
# bisect (slow but sure) until solution is obtained
for _ in range(max_iter):
c, fc = (a+b)/2, f((a+b)/2)
if fc > y: a = c
elif fc < y: b = c
return c