Source code for nipy.algorithms.clustering.bgmm

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Bayesian Gaussian Mixture Model Classes:
contains the basic fields and methods of Bayesian GMMs
the high level functions are/should be bound in C

The base class BGMM relies on an implementation that performs Gibbs sampling

A derived class VBGMM uses Variational Bayes inference instead

A third class is introduced to take advantage of the old C-bindings,
but it is limited to diagonal covariance models

Author : Bertrand Thirion, 2008-2011
"""
from __future__ import print_function
from __future__ import absolute_import

import numpy as np
import numpy.random as nr
from scipy.linalg import inv, cholesky, eigvalsh
from scipy.special import gammaln
import math

from .utils import kmeans
from .gmm import GMM

##################################################################
# ancillary functions ############################################
##################################################################


def detsh(H):
    """ Routine for the computation of determinants of symmetric positive
    matrices

    Parameters
    ----------
    H array of shape(n,n)
      the input matrix, assumed symmetric and positive

    Returns
    -------
    dh: float, the determinant
    """
    return np.prod(eigvalsh(H))


def dirichlet_eval(w, alpha):
    """ Evaluate the probability of a certain discrete draw w
    from the Dirichlet density with parameters alpha

    Parameters
    ----------
    w: array of shape (n)
    alpha: array of shape (n)
    """
    if np.shape(w) != np.shape(alpha):
        raise ValueError("incompatible dimensions")
    loge = np.sum((alpha - 1) * np.log(w))
    logb = np.sum(gammaln(alpha)) - gammaln(alpha.sum())
    loge -= logb
    return np.exp(loge)


def generate_normals(m, P):
    """ Generate a Gaussian sample with mean m and precision P

    Parameters
    ----------
    m array of shape n: the mean vector
    P array of shape (n,n): the precision matrix

    Returns
    -------
    ng : array of shape(n): a draw from the Gaussian density
    """
    icp = inv(cholesky(P))
    ng = nr.randn(m.shape[0])
    ng = np.dot(ng, icp)
    ng += m
    return ng


def generate_Wishart(n, V):
    """ Generate a sample from Wishart density

    Parameters
    ----------
    n: float,
        the number of degrees of freedom of the Wishart density
    V: array of shape (n,n)
        the scale matrix of the Wishart density

    Returns
    -------
    W: array of shape (n,n)
       the draw from Wishart density
    """
    icv = cholesky(V)
    p = V.shape[0]
    A = nr.randn(p, p)
    for i in range(p):
        A[i, i:] = 0
        A[i, i] = np.sqrt(nr.chisquare(n - i))
    R = np.dot(icv, A)
    W = np.dot(R, R.T)
    return W


def wishart_eval(n, V, W, dV=None, dW=None, piV=None):
    """Evaluation of the probability of W under Wishart(n,V)

    Parameters
    ----------
    n: float,
        the number of degrees of freedom (dofs)
    V: array of shape (n,n)
        the scale matrix of the Wishart density
    W: array of shape (n,n)
        the sample to be evaluated
    dV: float, optional,
        determinant of V
    dW: float, optional,
        determinant of W
    piV: array of shape (n,n), optional
        inverse of V

    Returns
    -------
    (float) the density
    """
    # check that shape(V)==shape(W)
    p = V.shape[0]
    if dV is None:
        dV = detsh(V)
    if dW is None:
        dW = detsh(W)
    if piV is None:
        piV = inv(V)
    ldW = math.log(dW) * (n - p - 1) / 2
    ltr = - np.trace(np.dot(piV, W)) / 2
    la = (n * p * math.log(2) + math.log(dV) * n) / 2
    lg = math.log(math.pi) * p * (p - 1) / 4
    lg += gammaln(np.arange(n - p + 1, n + 1).astype(np.float) / 2).sum()
    lt = ldW + ltr - la - lg
    return math.exp(lt)


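# Hedged usage sketch (not part of the original module): how the Wishart
# sampler and evaluator above fit together.  The helper name `_wishart_demo`
# and the toy dimensions/degrees of freedom are illustrative assumptions.
def _wishart_demo(dof=5., dim=3):
    """Draw one precision matrix from Wishart(dof, I) and evaluate its
    density at the draw; returns the (sample, density) pair."""
    V = np.eye(dim)                      # scale matrix of the Wishart
    W = generate_Wishart(dof, V)         # one random draw
    density = wishart_eval(dof, V, W)    # density of that draw
    return W, density

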
def normal_eval(mu, P, x, dP=None):
    """ Probability of x under normal(mu, inv(P))

    Parameters
    ----------
    mu: array of shape (n),
        the mean parameter
    P: array of shape (n, n),
       the precision matrix
    x: array of shape (n),
       the data to be evaluated
    dP: float, optional,
        the determinant of P

    Returns
    -------
    (float) the density
    """
    dim = P.shape[0]
    if dP is None:
        dP = detsh(P)
    w0 = math.log(dP) - dim * math.log(2 * math.pi)
    w0 /= 2
    dx = mu - x
    q = np.dot(np.dot(P, dx), dx)
    w = w0 - q / 2
    like = math.exp(w)
    return like


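# Hedged usage sketch (not part of the original module): drawing from a
# Gaussian with `generate_normals` and scoring the draw with `normal_eval`.
# The helper name `_normal_demo` and the toy parameters are illustrative.
def _normal_demo(dim=2):
    """Draw one sample from N(0, P^{-1}) with P = 2*I and return its density."""
    m = np.zeros(dim)
    P = 2 * np.eye(dim)             # precision matrix
    sample = generate_normals(m, P)
    return normal_eval(m, P, sample)

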
def generate_perm(k, nperm=100):
    """ Returns an array of shape (nperm, k) representing
    the permutations of k elements

    Parameters
    ----------
    k, int the number of elements to be permuted
    nperm=100 the maximal number of permutations
    if gamma(k+1)>nperm: only nperm random draws are generated

    Returns
    -------
    p: array of shape(nperm,k): each row is a permutation of k
    """
    from scipy.special import gamma
    if k == 1:
        return np.reshape(np.array([0]), (1, 1)).astype(np.int)
    if gamma(k + 1) < nperm:
        # exhaustive permutations
        aux = generate_perm(k - 1)
        n = aux.shape[0]
        perm = np.zeros((n * k, k)).astype(np.int)
        for i in range(k):
            perm[i * n:(i + 1) * n, :i] = aux[:, :i]
            perm[i * n:(i + 1) * n, i] = k - 1
            perm[i * n:(i + 1) * n, i + 1:] = aux[:, i:]
    else:
        from numpy.random import rand
        perm = np.zeros((nperm, k)).astype(np.int)
        for i in range(nperm):
            p = np.argsort(rand(k))
            perm[i] = p
    return perm


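# Hedged usage sketch (not part of the original module): for small k the
# permutations are enumerated exhaustively, while for large k only `nperm`
# random permutations are drawn.  The numbers below are illustrative.
def _perm_demo():
    """Return the 6 exhaustive permutations of 3 elements and 10 random
    permutations of 10 elements."""
    exhaustive = generate_perm(3)          # shape (6, 3), all permutations
    sampled = generate_perm(10, nperm=10)  # shape (10, 10), random draws
    return exhaustive, sampled

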
def multinomial(probabilities):
    """ Generate samples from a multinomial distribution

    Parameters
    ----------
    probabilities: array of shape (nelements, nclasses):
        likelihood of each element belonging to each class
        each row is assumed to sum to 1
        One sample is drawn from each row.

    Returns
    -------
    z array of shape (nelements): the draws,
      that take values in [0..nclasses-1]
    """
    nvox = probabilities.shape[0]
    nclasses = probabilities.shape[1]
    cuml = np.zeros((nvox, nclasses + 1))
    cuml[:, 1:] = np.cumsum(probabilities, 1)
    aux = np.random.rand(nvox, 1)
    z = np.argmax(aux < cuml, 1) - 1
    return z


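# Hedged usage sketch (not part of the original module): drawing one class
# label per row of a row-stochastic membership matrix, as done in
# `BGMM.sample_indicator` below.  The probabilities are illustrative.
def _multinomial_demo():
    """Draw labels in {0, 1, 2} for four items with given class probabilities."""
    proba = np.array([[0.7, 0.2, 0.1],
                      [0.1, 0.8, 0.1],
                      [0.3, 0.3, 0.4],
                      [0.25, 0.5, 0.25]])   # each row sums to 1
    return multinomial(proba)               # array of 4 labels

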
def dkl_gaussian(m1, P1, m2, P2):
    """ Returns the KL divergence between Gaussian densities

    Parameters
    ----------
    m1: array of shape (n),
        the mean parameter of the first density
    P1: array of shape(n,n),
        the precision parameters of the first density
    m2: array of shape (n),
        the mean parameter of the second density
    P2: array of shape(n,n),
        the precision parameters of the second density
    """
    tiny = 1.e-15
    dim = np.size(m1)
    if m1.shape != m2.shape:
        raise ValueError("incompatible dimensions for m1 and m2")
    if P1.shape != P2.shape:
        raise ValueError("incompatible dimensions for P1 and P2")
    if P1.shape[0] != dim:
        raise ValueError("incompatible dimensions for m1 and P1")
    d1 = max(detsh(P1), tiny)
    d2 = max(detsh(P2), tiny)
    dkl = np.log(d1 / d2) + np.trace(np.dot(P2, inv(P1))) - dim
    dkl += np.dot(np.dot((m1 - m2).T, P2), (m1 - m2))
    dkl /= 2
    return dkl


def dkl_wishart(a1, B1, a2, B2):
    """ Returns the KL divergence between two Wishart distributions
    of parameters (a1,B1) and (a2,B2)

    Parameters
    ----------
    a1: Float,
        degrees of freedom of the first density
    B1: array of shape(n,n),
        scale matrix of the first density
    a2: Float,
        degrees of freedom of the second density
    B2: array of shape(n,n),
        scale matrix of the second density

    Returns
    -------
    dkl: float, the Kullback-Leibler divergence
    """
    from scipy.special import psi, gammaln
    tiny = 1.e-15
    if B1.shape != B2.shape:
        raise ValueError("incompatible dimensions for B1 and B2")
    dim = B1.shape[0]
    d1 = max(detsh(B1), tiny)
    d2 = max(detsh(B2), tiny)
    lgc = dim * (dim - 1) * math.log(np.pi) / 4
    lg1 = lgc
    lg2 = lgc
    lw1 = - math.log(d1) + dim * math.log(2)
    lw2 = - math.log(d2) + dim * math.log(2)
    for i in range(dim):
        lg1 += gammaln((a1 - i) / 2)
        lg2 += gammaln((a2 - i) / 2)
        lw1 += psi((a1 - i) / 2)
        lw2 += psi((a2 - i) / 2)
    lz1 = 0.5 * a1 * dim * math.log(2) - 0.5 * a1 * math.log(d1) + lg1
    lz2 = 0.5 * a2 * dim * math.log(2) - 0.5 * a2 * math.log(d2) + lg2
    dkl = (a1 - dim - 1) * lw1 - (a2 - dim - 1) * lw2 - a1 * dim
    dkl += a1 * np.trace(np.dot(B2, inv(B1)))
    dkl /= 2
    dkl += (lz2 - lz1)
    return dkl


def dkl_dirichlet(w1, w2):
    """ Returns the KL divergence between two Dirichlet distributions

    Parameters
    ----------
    w1: array of shape(n),
        the parameters of the first Dirichlet density
    w2: array of shape(n),
        the parameters of the second Dirichlet density
    """
    if w1.shape != w2.shape:
        raise ValueError("incompatible dimensions for w1 and w2")
    dkl = 0
    from scipy.special import gammaln, psi
    dkl = np.sum(gammaln(w2)) - np.sum(gammaln(w1))
    dkl += gammaln(np.sum(w1)) - gammaln(np.sum(w2))
    dkl += np.sum((w1 - w2) * (psi(w1) - psi(np.sum(w1))))
    return dkl


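# Hedged usage sketch (not part of the original module): the three KL
# divergences above vanish when both arguments coincide and are positive
# otherwise; this is what `VBGMM.evidence` relies on.  Values are toy numbers.
def _dkl_demo():
    """Return KL(q||q) = 0 and a strictly positive KL(q||p) for each family."""
    m, P = np.zeros(2), np.eye(2)
    w = np.ones(3)
    same = (dkl_gaussian(m, P, m, P),
            dkl_wishart(4., P, 4., P),
            dkl_dirichlet(w, w))
    different = (dkl_gaussian(m, P, m + 1, P),
                 dkl_wishart(4., P, 6., P),
                 dkl_dirichlet(w, 2 * w))
    return same, different

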
#######################################################################
# main GMM class ######################################################
#######################################################################
class BGMM(GMM):
    """ This class implements Bayesian GMMs

    this class contains the following fields
    k: int,
       the number of components in the mixture
    dim: int,
         the dimension of the data
    means: array of shape (k, dim)
           all the means of the components
    precisions: array of shape (k, dim, dim)
                the precisions of the components
    weights: array of shape (k):
             weights of the mixture
    shrinkage: array of shape (k):
               scaling factor of the posterior precisions on the mean
    dof: array of shape (k)
         the degrees of freedom of the components
    prior_means: array of shape (k, dim):
                 the prior on the components means
    prior_scale: array of shape (k, dim, dim):
                 the prior on the components precisions
    prior_dof: array of shape (k):
               the prior on the dof (should be at least equal to dim)
    prior_shrinkage: array of shape (k):
                     scaling factor of the prior precisions on the mean
    prior_weights: array of shape (k)
                   the prior on the components weights

    fixme
    -----
    only 'full' precision is supported
    """

    def __init__(self, k=1, dim=1, means=None, precisions=None,
                 weights=None, shrinkage=None, dof=None):
        """ Initialize the structure with the dimensions of the problem
        Optionally provide different terms
        """
        GMM.__init__(self, k, dim, 'full', means, precisions, weights)
        self.shrinkage = shrinkage
        self.dof = dof

        if self.shrinkage is None:
            self.shrinkage = np.ones(self.k)

        if self.dof is None:
            self.dof = np.ones(self.k)

        if self.precisions is not None:
            self._detp = [detsh(self.precisions[k]) for k in range(self.k)]

    def check(self):
        """ Checking the shape of the different matrices involved in the model
        """
        GMM.check(self)

        if self.prior_means.shape[0] != self.k:
            raise ValueError("Incorrect dimension for self.prior_means")
        if self.prior_means.shape[1] != self.dim:
            raise ValueError("Incorrect dimension for self.prior_means")
        if self.prior_scale.shape[0] != self.k:
            raise ValueError("Incorrect dimension for self.prior_scale")
        if self.prior_scale.shape[1] != self.dim:
            raise ValueError("Incorrect dimension for self.prior_scale")
        if self.prior_dof.shape[0] != self.k:
            raise ValueError("Incorrect dimension for self.prior_dof")
        if self.prior_weights.shape[0] != self.k:
            raise ValueError("Incorrect dimension for self.prior_weights")

    def set_priors(self, prior_means, prior_weights, prior_scale, prior_dof,
                   prior_shrinkage):
        """ Set the prior of the BGMM

        Parameters
        ----------
        prior_means: array of shape (self.k,self.dim)
        prior_weights: array of shape (self.k)
        prior_scale: array of shape (self.k,self.dim,self.dim)
        prior_dof: array of shape (self.k)
        prior_shrinkage: array of shape (self.k)
        """
        self.prior_means = prior_means
        self.prior_weights = prior_weights
        self.prior_scale = prior_scale
        self.prior_dof = prior_dof
        self.prior_shrinkage = prior_shrinkage

        # cache some pre-computations
        self._dets = [detsh(self.prior_scale[k]) for k in range(self.k)]
        self._inv_prior_scale = np.array([inv(self.prior_scale[k])
                                          for k in range(self.k)])

        self.check()

    def guess_priors(self, x, nocheck=0):
        """ Set the priors so that they are weakly informative,
        following Fraley and Raftery, Journal of Classification
        24:155-181 (2007)

        Parameters
        ----------
        x, array of shape (nb_samples,self.dim)
           the data used in the estimation process
        nocheck: boolean, optional,
                 if nocheck==True, check is skipped
        """
        # a few parameters
        small = 0.01
        elshape = (1, self.dim, self.dim)
        mx = np.reshape(x.mean(0), (1, self.dim))
        dx = x - mx
        vx = np.dot(dx.T, dx) / x.shape[0]
        px = np.reshape(np.diag(1.0 / np.diag(vx)), elshape)
        px *= np.exp(2.0 / self.dim * math.log(self.k))

        # set the priors
        self.prior_means = np.repeat(mx, self.k, 0)
        self.prior_weights = np.ones(self.k)
        self.prior_scale = np.repeat(px, self.k, 0)
        self.prior_dof = np.ones(self.k) * (self.dim + 2)
        self.prior_shrinkage = np.ones(self.k) * small

        # cache some pre-computations
        self._dets = np.ones(self.k) * detsh(px[0])
        self._inv_prior_scale = np.repeat(
            np.reshape(inv(px[0]), elshape), self.k, 0)

        # check that everything is OK
        if nocheck == True:
            self.check()

    def initialize(self, x):
        """ Initialize z using a k-means algorithm, then update the parameters

        Parameters
        ----------
        x: array of shape (nb_samples,self.dim)
           the data used in the estimation process
        """
        if self.k > 1:
            cent, z, J = kmeans(x, self.k)
        else:
            z = np.zeros(x.shape[0]).astype(np.int)
        self.update(x, z)

    def pop(self, z):
        """ Compute the population, i.e. the statistics of allocation

        Parameters
        ----------
        z array of shape (nb_samples), type = np.int
          the allocation variable

        Returns
        -------
        hist : array shape (self.k) count variable
        """
        hist = np.array([np.sum(z == k) for k in range(self.k)])
        return hist

    def update_weights(self, z):
        """ Given the allocation vector z, resample the weights parameter

        Parameters
        ----------
        z array of shape (nb_samples), type = np.int
          the allocation variable
        """
        pop = self.pop(z)
        weights = pop + self.prior_weights
        self.weights = np.random.dirichlet(weights)

    def update_means(self, x, z):
        """ Given the allocation vector z, and the corresponding data x,
        resample the mean

        Parameters
        ----------
        x: array of shape (nb_samples,self.dim)
           the data used in the estimation process
        z: array of shape (nb_samples), type = np.int
           the corresponding classification
        """
        pop = self.pop(z)
        self.shrinkage = self.prior_shrinkage + pop
        empmeans = np.zeros(np.shape(self.means))
        prior_shrinkage = np.reshape(self.prior_shrinkage, (self.k, 1))
        shrinkage = np.reshape(self.shrinkage, (self.k, 1))

        for k in range(self.k):
            empmeans[k] = np.sum(x[z == k], 0)

        means = empmeans + self.prior_means * prior_shrinkage
        means /= shrinkage
        for k in range(self.k):
            self.means[k] = generate_normals(
                means[k], self.precisions[k] * self.shrinkage[k])

    def update_precisions(self, x, z):
        """ Given the allocation vector z, and the corresponding data x,
        resample the precisions

        Parameters
        ----------
        x array of shape (nb_samples,self.dim)
          the data used in the estimation process
        z array of shape (nb_samples), type = np.int
          the corresponding classification
        """
        pop = self.pop(z)
        self.dof = self.prior_dof + pop + 1
        rpop = pop + (pop == 0)
        self._detp = np.zeros(self.k)

        for k in range(self.k):
            # empirical means
            empmeans = np.sum(x[z == k], 0) / rpop[k]
            dm = np.reshape(empmeans - self.prior_means[k], (1, self.dim))

            # scatter
            dx = np.reshape(x[z == k] - empmeans, (pop[k], self.dim))
            scatter = np.dot(dx.T, dx)

            # bias
            addcov = np.dot(dm.T, dm) * self.prior_shrinkage[k]

            # covariance = prior term + scatter + bias
            covariance = self._inv_prior_scale[k] + scatter + addcov

            # precision
            scale = inv(covariance)
            self.precisions[k] = generate_Wishart(self.dof[k], scale)
            self._detp[k] = detsh(self.precisions[k])

    def update(self, x, z):
        """ Update function (draw a sample of the GMM parameters)

        Parameters
        ----------
        x array of shape (nb_samples,self.dim)
          the data used in the estimation process
        z array of shape (nb_samples), type = np.int
          the corresponding classification
        """
        self.update_weights(z)
        self.update_precisions(x, z)
        self.update_means(x, z)

    def sample_indicator(self, like):
        """ Sample the indicator from the likelihood

        Parameters
        ----------
        like: array of shape (nb_samples,self.k)
           component-wise likelihood

        Returns
        -------
        z: array of shape(nb_samples): a draw of the membership variable
        """
        tiny = 1 + 1.e-15
        like = (like.T / like.sum(1)).T
        like /= tiny
        z = multinomial(like)
        return z

    def sample(self, x, niter=1, mem=0, verbose=0):
        """ Sample the indicator and parameters

        Parameters
        ----------
        x array of shape (nb_samples,self.dim)
          the data used in the estimation process
        niter=1 : the number of iterations to perform
        mem=0: if mem, the best values of the parameters are computed
        verbose=0: verbosity mode

        Returns
        -------
        best_weights: array of shape (self.k)
        best_means: array of shape (self.k, self.dim)
        best_precisions: array of shape (self.k, self.dim, self.dim)
        possibleZ: array of shape (nb_samples, niter)
                   the z that give the highest posterior
                   to the data is returned first
        """
        self.check_x(x)
        if mem:
            possibleZ = - np.ones((x.shape[0], niter)).astype(np.int)

        score = - np.inf
        bpz = - np.inf

        for i in range(niter):
            like = self.likelihood(x)
            sll = np.mean(np.log(np.sum(like, 1)))
            sll += np.log(self.probability_under_prior())
            if sll > score:
                score = sll
                best_weights = self.weights.copy()
                best_means = self.means.copy()
                best_precisions = self.precisions.copy()

            z = self.sample_indicator(like)
            if mem:
                possibleZ[:, i] = z
            puz = sll  # to save time
            self.update(x, z)
            if puz > bpz:
                ibz = i
                bpz = puz

        if mem:
            aux = possibleZ[:, 0].copy()
            possibleZ[:, 0] = possibleZ[:, ibz].copy()
            possibleZ[:, ibz] = aux
            return best_weights, best_means, best_precisions, possibleZ

    def sample_and_average(self, x, niter=1, verbose=0):
        """ Sample the indicator and parameters;
        the average values for weights, means, precisions are returned

        Parameters
        ----------
        x = array of shape (nb_samples,dim)
          the data from which bic is computed
        niter=1: number of iterations

        Returns
        -------
        weights: array of shape (self.k)
        means: array of shape (self.k,self.dim)
        precisions: array of shape (self.k,self.dim,self.dim)
                    or (self.k, self.dim)
                    these are the average parameters across samplings

        Notes
        -----
        All this makes sense only if no label switching has occurred,
        so this is wrong in general (asymptotically).

        fix: implement a permutation procedure for components identification
        """
        aprec = np.zeros(np.shape(self.precisions))
        aweights = np.zeros(np.shape(self.weights))
        ameans = np.zeros(np.shape(self.means))

        for i in range(niter):
            like = self.likelihood(x)
            z = self.sample_indicator(like)
            self.update(x, z)
            aprec += self.precisions
            aweights += self.weights
            ameans += self.means
        aprec /= niter
        ameans /= niter
        aweights /= niter
        return aweights, ameans, aprec

    def probability_under_prior(self):
        """ Compute the probability of the current parameters of self
        given the priors
        """
        p0 = 1
        p0 = dirichlet_eval(self.weights, self.prior_weights)
        for k in range(self.k):
            mp = np.reshape(self.precisions[k] * self.prior_shrinkage[k],
                            (self.dim, self.dim))
            p0 *= normal_eval(self.prior_means[k], mp, self.means[k])
            p0 *= wishart_eval(self.prior_dof[k], self.prior_scale[k],
                               self.precisions[k], dV=self._dets[k],
                               dW=self._detp[k], piV=self._inv_prior_scale[k])
        return p0

    def conditional_posterior_proba(self, x, z, perm=None):
        """ Compute the probability of the current parameters of self
        given x and z

        Parameters
        ----------
        x: array of shape (nb_samples, dim),
           the data from which bic is computed
        z: array of shape (nb_samples), type = np.int,
           the corresponding classification
        perm: array of shape (nperm, self.k), type = np.int, optional
              all permutations of z under which things will be recomputed
              By default, no permutation is performed
        """
        pop = self.pop(z)
        rpop = (pop + (pop == 0)).astype(np.float)
        dof = self.prior_dof + pop + 1
        shrinkage = self.prior_shrinkage + pop
        weights = pop + self.prior_weights

        # initialize the posterior proba
        if perm is None:
            pp = dirichlet_eval(self.weights, weights)
        else:
            pp = np.array([dirichlet_eval(self.weights[pj], weights)
                           for pj in perm])

        for k in range(self.k):
            m1 = np.sum(x[z == k], 0)

            #0. compute the empirical means
            empmeans = m1 / rpop[k]

            #1. the precisions
            dx = np.reshape(x[z == k] - empmeans, (pop[k], self.dim))
            dm = np.reshape(empmeans - self.prior_means[k], (1, self.dim))
            addcov = np.dot(dm.T, dm) * self.prior_shrinkage[k]
            covariance = self._inv_prior_scale[k] + np.dot(dx.T, dx) + addcov
            scale = inv(covariance)
            _dets = detsh(scale)

            #2. the means
            means = m1 + self.prior_means[k] * self.prior_shrinkage[k]
            means /= shrinkage[k]

            #3. update the posteriors
            if perm is None:
                pp *= wishart_eval(
                    dof[k], scale, self.precisions[k],
                    dV=_dets, dW=self._detp[k], piV=covariance)
            else:
                for j, pj in enumerate(perm):
                    pp[j] *= wishart_eval(
                        dof[k], scale, self.precisions[pj[k]], dV=_dets,
                        dW=self._detp[pj[k]], piV=covariance)

            mp = scale * shrinkage[k]
            _dP = _dets * shrinkage[k] ** self.dim
            if perm is None:
                pp *= normal_eval(means, mp, self.means[k], dP=_dP)
            else:
                for j, pj in enumerate(perm):
                    pp[j] *= normal_eval(
                        means, mp, self.means[pj[k]], dP=_dP)

        return pp

    def evidence(self, x, z, nperm=0, verbose=0):
        """ See bayes_factor(self, x, z, nperm=0, verbose=0)
        """
        return self.bayes_factor(x, z, nperm, verbose)

    def bayes_factor(self, x, z, nperm=0, verbose=0):
        """ Evaluate the Bayes Factor of the current model using Chib's method

        Parameters
        ----------
        x: array of shape (nb_samples,dim)
           the data from which bic is computed
        z: array of shape (nb_samples), type = np.int
           the corresponding classification
        nperm=0: int
            the number of permutations to sample
            to model the label switching issue
            in the computation of the Bayes Factor
            By default, exhaustive permutations are used
        verbose=0: verbosity mode

        Returns
        -------
        bf (float) the computed evidence (Bayes factor)

        Notes
        -----
        See: Marginal Likelihood from the Gibbs Output
        Journal article by Siddhartha Chib;
        Journal of the American Statistical Association, Vol. 90, 1995
        """
        niter = z.shape[1]
        p = []
        perm = generate_perm(self.k)
        if nperm > perm.shape[0]:
            nperm = perm.shape[0]
        for i in range(niter):
            if nperm == 0:
                temp = self.conditional_posterior_proba(x, z[:, i], perm)
                p.append(temp.mean())
            else:
                drand = np.argsort(np.random.rand(perm.shape[0]))[:nperm]
                temp = self.conditional_posterior_proba(x, z[:, i],
                                                        perm[drand])
                p.append(temp.mean())

        p = np.array(p)
        mp = np.mean(p)
        p0 = self.probability_under_prior()
        like = self.likelihood(x)
        bf = np.log(p0) + np.sum(np.log(np.sum(like, 1))) - np.log(mp)

        if verbose:
            print(np.log(p0), np.sum(np.log(np.sum(like, 1))), np.log(mp))
        return bf

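# Hedged usage sketch (not part of the original module): a minimal Gibbs
# sampling run with the BGMM class above.  The synthetic two-cluster data,
# the helper name `_bgmm_demo` and the iteration count are illustrative
# assumptions, not taken from the original source.
def _bgmm_demo(n=200, dim=2, k=2, niter=100):
    """Fit a k-component Bayesian GMM by Gibbs sampling on toy 2-cluster data
    and return the best parameters found across the sampled iterations."""
    x = np.concatenate((nr.randn(n // 2, dim) - 2,
                        nr.randn(n // 2, dim) + 2))
    model = BGMM(k, dim)
    model.guess_priors(x)            # weakly informative priors from the data
    model.initialize(x)              # k-means based initialization of z
    w, means, precisions, z = model.sample(x, niter=niter, mem=1)
    return w, means, precisions, z

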
# ---------------------------------------------------------
# --- Variational Bayes inference --------------------------
# ---------------------------------------------------------
class VBGMM(BGMM):
    """ Subclass of Bayesian GMMs (BGMM)
    that implements Variational Bayes estimation of the parameters
    """

    def __init__(self, k=1, dim=1, means=None, precisions=None,
                 weights=None, shrinkage=None, dof=None):
        BGMM.__init__(self, k, dim, means, precisions, weights, shrinkage,
                      dof)
        self.scale = self.precisions.copy()

    def _Estep(self, x):
        """VB-E step

        Parameters
        ----------
        x array of shape (nb_samples,dim)
          the data used in the estimation process

        Returns
        -------
        like: array of shape(nb_samples,self.k),
              component-wise likelihood
        """
        n = x.shape[0]
        like = np.zeros((n, self.k))

        from scipy.special import psi
        spsi = psi(np.sum(self.weights))
        for k in range(self.k):
            # compute the data-independent factor first
            w0 = psi(self.weights[k]) - spsi
            w0 += 0.5 * np.log(detsh(self.scale[k]))
            w0 -= self.dim * 0.5 / self.shrinkage[k]
            w0 += 0.5 * np.log(2) * self.dim
            for i in range(self.dim):
                w0 += 0.5 * psi((self.dof[k] - i) / 2)
            m = np.reshape(self.means[k], (1, self.dim))
            b = self.dof[k] * self.scale[k]
            q = np.sum(np.dot(m - x, b) * (m - x), 1)
            w = w0 - q / 2
            w -= 0.5 * np.log(2 * np.pi) * self.dim
            like[:, k] = np.exp(w)

        if like.min() < 0:
            raise ValueError('Likelihood cannot be negative')
        return like

    def evidence(self, x, like=None, verbose=0):
        """ Computation of the evidence bound, aka free energy

        Parameters
        ----------
        x array of shape (nb_samples,dim)
          the data from which evidence is computed
        like=None: array of shape (nb_samples, self.k), optional
           component-wise likelihood
           If None, it is recomputed
        verbose=0: verbosity mode

        Returns
        -------
        ev (float) the computed evidence
        """
        from scipy.special import psi
        from numpy.linalg import inv
        tiny = 1.e-15
        if like is None:
            like = self._Estep(x)
            like = (like.T / np.maximum(like.sum(1), tiny)).T

        pop = like.sum(0)[:self.k]
        pop = np.reshape(pop, (self.k, 1))
        spsi = psi(np.sum(self.weights))
        empmeans = np.dot(like.T[:self.k], x) / np.maximum(pop, tiny)

        F = 0
        # start with the average likelihood term
        for k in range(self.k):
            # compute the data-independent factor first
            Lav = psi(self.weights[k]) - spsi
            Lav -= np.sum(like[:, k] * np.log(np.maximum(like[:, k], tiny))) \
                / pop[k]
            Lav -= 0.5 * self.dim * np.log(2 * np.pi)
            Lav += 0.5 * np.log(detsh(self.scale[k]))
            Lav += 0.5 * np.log(2) * self.dim
            for i in range(self.dim):
                Lav += 0.5 * psi((self.dof[k] - i) / 2)
            Lav -= self.dim * 0.5 / self.shrinkage[k]
            Lav *= pop[k]
            empcov = np.zeros((self.dim, self.dim))
            dx = x - empmeans[k]
            empcov = np.dot(dx.T, like[:, k:k + 1] * dx)
            Lav -= 0.5 * np.trace(np.dot(empcov, self.scale[k] * self.dof[k]))
            F += Lav

        # then the KL divergences
        prior_covariance = np.array(self._inv_prior_scale)
        covariance = np.array([inv(self.scale[k]) for k in range(self.k)])
        Dklw = 0
        Dklg = 0
        Dkld = dkl_dirichlet(self.weights, self.prior_weights)
        for k in range(self.k):
            Dklw += dkl_wishart(self.dof[k], covariance[k],
                                self.prior_dof[k], prior_covariance[k])
            nc = self.scale[k] * (self.dof[k] * self.shrinkage[k])
            nc0 = self.scale[k] * (self.dof[k] * self.prior_shrinkage[k])
            Dklg += dkl_gaussian(self.means[k], nc, self.prior_means[k], nc0)
        Dkl = Dkld + Dklg + Dklw
        if verbose:
            print('Lav', F, 'Dkl', Dkld, Dklg, Dklw)
        F -= Dkl
        return F

    def _Mstep(self, x, like):
        """VB-M step

        Parameters
        ----------
        x: array of shape(nb_samples, self.dim)
           the data from which the model is estimated
        like: array of shape(nb_samples, self.k)
           the likelihood of the data under each class
        """
        from numpy.linalg import inv
        tiny = 1.e-15
        pop = like.sum(0)

        # shrinkage, weights, dof
        self.weights = self.prior_weights + pop
        pop = pop[0:self.k]
        like = like[:, :self.k]
        self.shrinkage = self.prior_shrinkage + pop
        self.dof = self.prior_dof + pop

        # reshape
        pop = np.reshape(pop, (self.k, 1))
        prior_shrinkage = np.reshape(self.prior_shrinkage, (self.k, 1))
        shrinkage = np.reshape(self.shrinkage, (self.k, 1))

        # means
        means = np.dot(like.T, x) + self.prior_means * prior_shrinkage
        self.means = means / shrinkage

        # precisions
        empmeans = np.dot(like.T, x) / np.maximum(pop, tiny)
        empcov = np.zeros(np.shape(self.prior_scale))

        for k in range(self.k):
            dx = x - empmeans[k]
            empcov[k] = np.dot(dx.T, like[:, k:k + 1] * dx)

        covariance = np.array(self._inv_prior_scale) + empcov

        dx = np.reshape(empmeans - self.prior_means, (self.k, self.dim, 1))
        addcov = np.array([np.dot(dx[k], dx[k].T) for k in range(self.k)])

        apms = np.reshape(prior_shrinkage * pop / shrinkage, (self.k, 1, 1))
        covariance += addcov * apms

        # update scale
        self.scale = np.array([inv(covariance[k]) for k in range(self.k)])

    def initialize(self, x):
        """ Initialize z using a k-means algorithm, then update the parameters

        Parameters
        ----------
        x: array of shape (nb_samples,self.dim)
           the data used in the estimation process
        """
        n = x.shape[0]
        if self.k > 1:
            cent, z, J = kmeans(x, self.k)
        else:
            z = np.zeros(x.shape[0]).astype(np.int)

        l = np.zeros((n, self.k))
        l[np.arange(n), z] = 1
        self._Mstep(x, l)

    def map_label(self, x, like=None):
        """ Return the MAP labelling of x

        Parameters
        ----------
        x array of shape (nb_samples,dim)
          the data under study
        like=None array of shape(nb_samples,self.k)
          component-wise likelihood
          if like==None, it is recomputed

        Returns
        -------
        z: array of shape(nb_samples): the resulting MAP labelling
           of the rows of x
        """
        if like is None:
            like = self.likelihood(x)
        z = np.argmax(like, 1)
        return z

    def estimate(self, x, niter=100, delta=1.e-4, verbose=0):
        """ Estimation of self given x

        Parameters
        ----------
        x array of shape (nb_samples,dim)
          the data from which the model is estimated
        niter=100: maximal number of iterations in the estimation process
        delta=1.e-4: increment of data likelihood at which
            convergence is declared
        verbose=0: verbosity mode
        """
        # alternation of E/M step until convergence
        tiny = 1.e-15
        av_ll_old = - np.inf
        for i in range(niter):
            like = self._Estep(x)
            av_ll = np.mean(np.log(np.maximum(np.sum(like, 1), tiny)))
            if av_ll < av_ll_old + delta:
                if verbose:
                    print('iteration:', i, 'log-likelihood:', av_ll,
                          'old value:', av_ll_old)
                break
            else:
                av_ll_old = av_ll
            if verbose:
                print(i, av_ll, self.bic(like))
            like = (like.T / np.maximum(like.sum(1), tiny)).T
            self._Mstep(x, like)

    def likelihood(self, x):
        """ Return the likelihood of the model for the data x;
        the values are weighted by the components weights

        Parameters
        ----------
        x: array of shape (nb_samples, self.dim)
           the data used in the estimation process

        Returns
        -------
        like: array of shape(nb_samples, self.k)
              component-wise likelihood
        """
        x = self.check_x(x)
        return self._Estep(x)

    def pop(self, like, tiny=1.e-15):
        """ Compute the population, i.e. the statistics of allocation

        Parameters
        ----------
        like array of shape (nb_samples, self.k):
          the likelihood of each item being in each class
        """
        slike = np.maximum(tiny, np.sum(like, 1))
        nlike = (like.T / slike).T
        return np.sum(nlike, 0)
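

# Hedged usage sketch (not part of the original module): fitting the VBGMM
# class above by Variational Bayes and reading out MAP labels.  The synthetic
# data and the helper name `_vbgmm_demo` are illustrative assumptions.
def _vbgmm_demo(n=200, dim=2, k=2, niter=100):
    """Fit a k-component VB GMM on toy 2-cluster data and return the free
    energy (evidence bound) and the MAP labelling of the data."""
    x = np.concatenate((nr.randn(n // 2, dim) - 2,
                        nr.randn(n // 2, dim) + 2))
    model = VBGMM(k, dim)
    model.guess_priors(x)            # weakly informative priors from the data
    model.initialize(x)              # k-means init, followed by one M-step
    model.estimate(x, niter=niter)   # VB-EM until convergence
    return model.evidence(x), model.map_label(x)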