Source code for selectinf.randomized.randomization

"""
Different randomization options for selective sampler.
Main method used in selective sampler is the gradient method which
should be a gradient of the negative of the log-density. For a
Gaussian density, this will be a convex function, not a concave function.
"""
from __future__ import division, print_function
import numpy as np
import regreg.api as rr
from scipy.stats import laplace, logistic, norm as ndist
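
# Editor's sketch (not part of the upstream module), illustrating the point in
# the module docstring: for the isotropic Gaussian randomizer defined below,
# `gradient` returns the gradient of the (convex) negative log-density,
# omega / scale**2.
#
#     rand = randomization.isotropic_gaussian((3,), scale=2.)
#     omega = rand.sample()
#     np.allclose(rand.gradient(omega), omega / 2.**2)   # expected: True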

class randomization(rr.smooth_atom):

    def __init__(self,
                 shape,
                 density,
                 cdf,
                 pdf,
                 derivative_log_density,
                 grad_negative_log_density,
                 sampler,
                 lipschitz=1,
                 log_density=None,
                 CGF=None,            # cumulant generating function and gradient
                 CGF_conjugate=None,  # convex conjugate of CGF and gradient
                 cov_prec=None        # will have a covariance matrix if Gaussian
                 ):

        rr.smooth_atom.__init__(self, shape)
        self._density = density
        self._cdf = cdf
        self._pdf = pdf
        self._derivative_log_density = derivative_log_density
        self._grad_negative_log_density = grad_negative_log_density
        self._sampler = sampler
        self.lipschitz = lipschitz

        if log_density is None:
            log_density = lambda x: np.log(density(x))
        self._log_density = log_density

        self.CGF = CGF
        self.CGF_conjugate = CGF_conjugate

        if cov_prec is not None:
            self.cov_prec = cov_prec

    def smooth_objective(self, perturbation, mode='both', check_feasibility=False):
        """
        Compute the negative log-density and its gradient.
        """
        if mode == 'func':
            return self.scale(-np.log(self._density(perturbation)))
        elif mode == 'grad':
            return self.scale(self._grad_negative_log_density(perturbation))
        elif mode == 'both':
            return (self.scale(-np.log(self._density(perturbation))),
                    self.scale(self._grad_negative_log_density(perturbation)))
        else:
            raise ValueError("mode incorrectly specified")

    def sample(self, size=()):
        return self._sampler(size=size)

    def gradient(self, perturbation):
        """
        Evaluate the gradient of the negative log-density.

        Parameters
        ----------
        perturbation : np.float

        Returns
        -------
        gradient : np.float
        """
        return self.smooth_objective(perturbation, mode='grad')

    def log_density(self, perturbation):
        """
        Evaluate the log-density.

        Parameters
        ----------
        perturbation : np.float

        Returns
        -------
        value : float
        """
        return np.squeeze(self._log_density(perturbation))

    def randomize(self, loss, epsilon=0, perturb=None):
        """
        Randomize the loss.
        """
        randomized_loss = rr.smooth_sum([loss])
        if perturb is None:
            perturb = self.sample()
        randomized_loss.quadratic = rr.identity_quadratic(epsilon, 0, -perturb, 0)
        return randomized_loss, perturb

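    # Editor's sketch (not part of the upstream module): `randomize` folds the
    # linear term -<omega, beta> and an optional ridge term (epsilon/2)*||beta||**2
    # into the loss's `quadratic` attribute. The `rr.glm.gaussian` constructor
    # below is assumed from regreg's API.
    #
    #     X = np.random.standard_normal((50, 5))
    #     Y = np.random.standard_normal(50)
    #     loss = rr.glm.gaussian(X, Y)
    #     rand = randomization.isotropic_gaussian((5,), scale=1.)
    #     randomized_loss, omega = rand.randomize(loss, epsilon=0.5)
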
    @staticmethod
    def isotropic_gaussian(shape, scale):
        """
        Isotropic Gaussian with SD `scale`.

        Parameters
        ----------
        shape : tuple
            Shape of noise.

        scale : float
            SD of noise.
        """
        if type(shape) == type(1):
            shape = (shape,)
        rv = ndist(scale=scale, loc=0.)
        p = np.product(shape)
        density = lambda x: np.product(rv.pdf(x))
        cdf = lambda x: rv.cdf(x)
        pdf = lambda x: rv.pdf(x)
        derivative_log_density = lambda x: -x / (scale**2)
        grad_negative_log_density = lambda x: x / scale**2
        sampler = lambda size: rv.rvs(size=shape + size)
        CGF = isotropic_gaussian_CGF(shape, scale)
        CGF_conjugate = isotropic_gaussian_CGF_conjugate(shape, scale)
        constant = 0  # -0.5 * p * np.log(2 * np.pi * scale**2)
        return randomization(shape,
                             density,
                             cdf,
                             pdf,
                             derivative_log_density,
                             grad_negative_log_density,
                             sampler,
                             lipschitz=1. / scale**2,
                             log_density=lambda x: -0.5 * (np.atleast_2d(x)**2).sum(1) / scale**2 + constant,
                             CGF=CGF,
                             CGF_conjugate=CGF_conjugate,
                             cov_prec=(scale**2, 1. / scale**2))

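    # Usage sketch (editor's addition, not part of the upstream module):
    #
    #     rand = randomization.isotropic_gaussian((5,), scale=0.7)
    #     omega = rand.sample()                         # one draw, shape (5,)
    #     value, grad = rand.smooth_objective(omega)    # mode='both' by default
    #     rand.log_density(omega)                       # -0.5 * (omega**2).sum() / 0.7**2
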
    @staticmethod
    def gaussian(covariance):
        """
        Gaussian noise with a given covariance.

        Parameters
        ----------
        covariance : np.float((*,*))
            Positive definite covariance matrix. Non-negative definite
            will raise an error.
        """
        precision = np.linalg.inv(covariance)
        sqrt_precision = np.linalg.cholesky(precision)
        _det = np.linalg.det(covariance)
        p = covariance.shape[0]
        _const = 1.  # np.sqrt((2*np.pi)**p * _det)
        density = lambda x: np.exp(-(x * precision.dot(x)).sum() / 2) / _const
        cdf = lambda x: None
        pdf = lambda x: None
        derivative_log_density = lambda x: None
        grad_negative_log_density = lambda x: precision.dot(x)
        sampler = lambda size: covariance.dot(sqrt_precision.dot(np.random.standard_normal((p,) + size)))
        return randomization((p,),
                             density,
                             cdf,
                             pdf,
                             derivative_log_density,
                             grad_negative_log_density,
                             sampler,
                             lipschitz=np.linalg.svd(precision)[1].max(),
                             log_density=lambda x: -np.sum(sqrt_precision.dot(np.atleast_2d(x).T)**2, 0) * 0.5 - np.log(_const),
                             cov_prec=(covariance, precision))

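    # Usage sketch for a correlated Gaussian randomizer (editor's addition,
    # not part of the upstream module):
    #
    #     cov = np.array([[1., 0.5], [0.5, 2.]])
    #     rand = randomization.gaussian(cov)
    #     omega = rand.sample()              # one draw, shape (2,)
    #     rand.gradient(omega)               # precision.dot(omega)
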
    @staticmethod
    def laplace(shape, scale):
        """
        Standard Laplace noise multiplied by `scale`.

        Parameters
        ----------
        shape : tuple
            Shape of noise.

        scale : float
            Scale of noise.
        """
        rv = laplace(scale=scale, loc=0.)
        density = lambda x: np.product(rv.pdf(x))
        cdf = lambda x: laplace.cdf(x, loc=0., scale=scale)
        pdf = lambda x: laplace.pdf(x, loc=0., scale=scale)
        derivative_log_density = lambda x: -np.sign(x) / scale
        grad_negative_log_density = lambda x: np.sign(x) / scale
        sampler = lambda size: rv.rvs(size=shape + size)
        CGF = laplace_CGF(shape, scale)
        CGF_conjugate = laplace_CGF_conjugate(shape, scale)
        constant = -np.product(shape) * np.log(2 * scale)
        return randomization(shape,
                             density,
                             cdf,
                             pdf,
                             derivative_log_density,
                             grad_negative_log_density,
                             sampler,
                             lipschitz=1. / scale**2,
                             log_density=lambda x: -np.fabs(np.atleast_2d(x)).sum(1) / scale - np.log(scale) + constant,
                             CGF=CGF,
                             CGF_conjugate=CGF_conjugate)

    @staticmethod
    def logistic(shape, scale):
        """
        Standard logistic noise multiplied by `scale`.

        Parameters
        ----------
        shape : tuple
            Shape of noise.

        scale : float
            Scale of noise.
        """
        # from http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.logistic.html
        density = lambda x: (np.product(np.exp(-x / scale) /
                                        (1 + np.exp(-x / scale))**2) /
                             scale**(np.product(x.shape)))
        cdf = lambda x: logistic.cdf(x, loc=0., scale=scale)
        pdf = lambda x: logistic.pdf(x, loc=0., scale=scale)
        derivative_log_density = lambda x: (np.exp(-x / scale) - 1) / (scale * np.exp(-x / scale) + 1)
        # negative log-density (with mu = 0) is
        # x/s + log(s) + 2 * log(1 + exp(-x/s))
        grad_negative_log_density = lambda x: (1 - np.exp(-x / scale)) / ((1 + np.exp(-x / scale)) * scale)
        sampler = lambda size: np.random.logistic(loc=0, scale=scale, size=shape + size)
        constant = -np.product(shape) * np.log(scale)
        return randomization(shape,
                             density,
                             cdf,
                             pdf,
                             derivative_log_density,
                             grad_negative_log_density,
                             sampler,
                             lipschitz=.25 / scale**2,
                             log_density=lambda x: -np.atleast_2d(x).sum(1) / scale - 2 * np.log(1 + np.exp(-np.atleast_2d(x) / scale)).sum(1) + constant)

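# Usage sketch for the heavier-tailed randomizers (editor's addition, not part
# of the upstream module): `laplace` and `logistic` mirror `isotropic_gaussian`,
# taking a shape and a scale.
#
#     lap = randomization.laplace((5,), scale=1.)
#     lgt = randomization.logistic((5,), scale=1.)
#     omega = lap.sample()
#     lap.gradient(omega)                    # np.sign(omega) / scale
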
class split(randomization):

    def __init__(self, shape, subsample_size, total_size):
        self.subsample_size = subsample_size
        self.total_size = total_size
        rr.smooth_atom.__init__(self, shape)

    def get_covariance(self):
        if hasattr(self, "_covariance"):
            return self._covariance

    def set_covariance(self, covariance):
        """
        Once covariance has been set, then
        the usual API of randomization will work.
        """
        self._covariance = covariance
        precision = np.linalg.inv(covariance)
        self._cov_prec = (covariance, precision)
        sqrt_precision = np.linalg.cholesky(precision).T
        _det = np.linalg.det(covariance)
        p = covariance.shape[0]
        _const = 1.  # np.sqrt((2*np.pi)**p * _det)
        self._density = lambda x: np.exp(-(x * precision.dot(x)).sum() / 2) / _const
        self._grad_negative_log_density = lambda x: precision.dot(x)
        self._sampler = lambda size: sqrt_precision.dot(np.random.standard_normal((p,) + size))
        self.lipschitz = np.linalg.svd(precision)[1].max()

        def _log_density(x):
            return (-np.sum(sqrt_precision.dot(np.atleast_2d(x).T)**2, 0) * 0.5
                    - np.log(_const))

        self._log_density = _log_density

    covariance = property(get_covariance, set_covariance)

    @property
    def cov_prec(self):
        if hasattr(self, "_covariance"):
            return self._cov_prec

    def smooth_objective(self, perturbation, mode='both', check_feasibility=False):
        if not hasattr(self, "_covariance"):
            raise ValueError('first set the covariance')
        return randomization.smooth_objective(self,
                                              perturbation,
                                              mode=mode,
                                              check_feasibility=check_feasibility)

    def sample(self, size=()):
        if not hasattr(self, "_covariance"):
            raise ValueError('first set the covariance')
        return randomization.sample(self, size=size)

    def gradient(self, perturbation):
        if not hasattr(self, "_covariance"):
            raise ValueError('first set the covariance')
        return randomization.gradient(self, perturbation)

    def randomize(self, loss, epsilon):
        """
        Parameters
        ----------
        loss : rr.glm
            A glm loss with a `subsample` method.

        epsilon : float
            Coefficient in front of quadratic term.

        Returns
        -------
        Subsampled loss multiplied by `n / m` where
        m is the subsample size out of a total sample size of n.

        The quadratic term is not multiplied by `n / m`.
        """
        n, m = self.total_size, self.subsample_size
        inv_frac = n / m
        quadratic = rr.identity_quadratic(epsilon, 0, 0, 0)

        idx = np.zeros(n, np.bool)
        idx[:m] = 1
        np.random.shuffle(idx)

        randomized_loss = loss.subsample(idx)
        randomized_loss.coef *= inv_frac
        randomized_loss.quadratic = quadratic

        return randomized_loss, None

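# Usage sketch for `split` (editor's addition, not part of the upstream module):
# the covariance is typically estimated elsewhere; once assigned, the usual
# randomization API becomes available. Here p, m and n are placeholder sizes and
# `some_cov` a hypothetical p x p positive definite matrix.
#
#     rand = split((p,), subsample_size=m, total_size=n)
#     rand.covariance = some_cov
#     omega = rand.sample()
#     rand.gradient(omega)
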
# Cumulant generating function (CGF) and its conjugate for the isotropic Gaussian
def isotropic_gaussian_CGF(shape, scale):  # scale = SD
    return cumulant(shape,
                    lambda x: (x**2).sum() * scale**2 / 2.,
                    lambda x: scale**2 * x)

def isotropic_gaussian_CGF_conjugate(shape, scale):  # scale = SD
    return cumulant_conjugate(shape,
                              lambda x: (x**2).sum() / (2 * scale**2),
                              lambda x: x / scale**2)

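# Editor's note on the Gaussian formulas above: if Z ~ N(0, scale**2 * I), the
# cumulant generating function is Lambda(u) = log E[exp(u'Z)] = scale**2 * ||u||**2 / 2
# with gradient scale**2 * u, and its convex conjugate is
# Lambda*(x) = ||x||**2 / (2 * scale**2) with gradient x / scale**2, matching the
# (func, grad) pairs passed to `cumulant` and `cumulant_conjugate`.
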
# Cumulant generating function (CGF) and its conjugate for the Laplace

def _standard_laplace_CGF_conjugate(u):
    """
    sup_z uz + log(1 - z**2)
    """
    _zeros = (u == 0)
    root = (-1 + np.sqrt(1 + u**2)) / (u + _zeros)
    value = (root * u + np.log(1 - root**2)).sum()
    return value

def _standard_laplace_CGF_conjugate_grad(u):
    """
    Gradient of sup_z uz + log(1 - z**2): the maximizing z.
    """
    _zeros = (u == 0)
    root = (-1 + np.sqrt(1 + u**2)) / (u + _zeros)
    return root

BIG = 10**10

def laplace_CGF(shape, scale):
    return cumulant(shape,
                    lambda x: -np.log(1 - (scale * x)**2).sum() + BIG * (np.abs(x) > 1),
                    lambda x: 2 * x * scale**2 / (1 - (scale * x)**2))

def laplace_CGF_conjugate(shape, scale):
    return cumulant_conjugate(shape,
                              lambda x: _standard_laplace_CGF_conjugate(x / scale),
                              lambda x: _standard_laplace_CGF_conjugate_grad(x / scale) / scale)

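# Editor's note on the Laplace formulas: for W ~ Laplace(scale),
# E[exp(x*W)] = 1 / (1 - (scale*x)**2) when |scale*x| < 1, so the CGF is
# -log(1 - (scale*x)**2), with `BIG` acting as a crude barrier against
# arguments outside the finite range. For the standard conjugate
# sup_z { u*z + log(1 - z**2) }, setting the derivative u - 2*z/(1 - z**2)
# to zero gives u*z**2 + 2*z - u = 0, whose root in (-1, 1) is
# z = (-1 + sqrt(1 + u**2)) / u; this is the `root` used above and, by the
# envelope theorem, also the gradient of the conjugate.
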
class from_grad_func(rr.smooth_atom):

    """
    take a (func, grad) pair and make a smooth_objective
    """

    def __init__(self,
                 shape,
                 func,
                 grad,
                 coef=1.,
                 offset=None,
                 initial=None,
                 quadratic=None):

        rr.smooth_atom.__init__(self,
                                shape,
                                offset=offset,
                                quadratic=quadratic,
                                initial=initial,
                                coef=coef)

        self._func, self._grad = (func, grad)

    def smooth_objective(self, param, mode='both', check_feasibility=False):
        """
        Evaluate the smooth objective, computing its value, gradient or both.

        Parameters
        ----------
        param : ndarray
            The current parameter values.

        mode : str
            One of ['func', 'grad', 'both'].

        check_feasibility : bool
            If True, return `np.inf` when point is not feasible,
            i.e. when `param` is not in the domain.

        Returns
        -------
        If `mode` is 'func' returns just the objective value at `param`,
        else if `mode` is 'grad' returns the gradient,
        else returns both.
        """
        param = self.apply_offset(param)

        if mode == 'func':
            return self.scale(self._func(param))
        elif mode == 'grad':
            return self.scale(self._grad(param))
        elif mode == 'both':
            return self.scale(self._func(param)), self.scale(self._grad(param))
        else:
            raise ValueError("mode incorrectly specified")

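# Usage sketch (editor's addition, not part of the upstream module): wrapping a
# (func, grad) pair as a regreg smooth objective.
#
#     f = lambda x: (x**2).sum() / 2.
#     g = lambda x: x
#     atom = from_grad_func((3,), f, g)
#     atom.smooth_objective(np.ones(3), mode='both')   # (1.5, array([1., 1., 1.]))
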
class cumulant(from_grad_func):
    """
    Class for CGF.
    """
    pass

class cumulant_conjugate(from_grad_func):
    """
    Class for conjugate of a CGF.
    """
    pass