# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Functions for computing reproducibility measures.
General procedure is:
- the dataset is subject to jackknife subsampling ('splitting'),
- each subsample is analysed independently,
- a reproducibility measure is then derived;

It is used to produce the work described in:
Analysis of a large fMRI cohort:
Statistical and methodological issues for group analyses.
Thirion B, Pinel P, Meriaux S, Roche A, Dehaene S, Poline JB.
Neuroimage. 2007 Mar;35(1):105-20.
Bertrand Thirion, 2009-2010
"""
from __future__ import absolute_import
import numpy as np
from nipy.io.nibcompat import get_affine
from nipy.labs.spatial_models.discrete_domain import \
grid_domain_from_binary_array
# ---------------------------------------------------------
# ----- cluster handling functions ------------------------
# ---------------------------------------------------------
def histo_repro(h):
    """ Given the histogram h, compute a standardized reproducibility measure

    Parameters
    ----------
    h: array of shape (k + 1), the histogram values
Returns
-------
hr, float: the measure
"""
k = np.size(h) - 1
if k == 1:
return 0.
nf = np.dot(h, np.arange(k + 1)) / k
if nf == 0:
return 0.
n1k = np.arange(1, k + 1)
res = 1.0 * np.dot(h[1:], n1k * (n1k - 1)) / (k * (k - 1))
return res / nf
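
# Example (illustrative sketch): with k = 5 subgroup maps, a histogram in
# which every voxel is detected in all 5 maps scores 1, while a histogram in
# which voxels are detected at most once scores 0:
#
# >>> float(histo_repro(np.array([0, 0, 0, 0, 0, 100])))  # hit 5/5 times
# 1.0
# >>> float(histo_repro(np.array([50, 50, 0, 0, 0, 0])))  # hit at most once
# 0.0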
def cluster_threshold(stat_map, domain, th, csize):
"""Perform a thresholding of a map at the cluster-level
Parameters
----------
stat_map: array of shape(nbvox)
the input data
domain: Nifti1Image instance,
referential- and domain-defining image
th (float): cluster-forming threshold
csize (int>0): cluster size threshold
Returns
-------
binary array of shape (nvox): the binarized thresholded map
Notes
-----
Should be replaced by a more standard function in the future
"""
if stat_map.shape[0] != domain.size:
raise ValueError('incompatible dimensions')
# first build a domain of supra_threshold regions
thresholded_domain = domain.mask(stat_map > th)
# get the connected components
label = thresholded_domain.connected_components()
binary = - np.ones(domain.size)
binary[stat_map > th] = label
nbcc = len(np.unique(label))
for i in range(nbcc):
if np.sum(label == i) < csize:
binary[binary == i] = - 1
binary = (binary > -1)
return binary
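
# Example (a sketch, assuming grid_domain_from_binary_array accepts a plain
# boolean array with a default affine, and that domain voxels are ordered as
# in mask.ravel()):
#
# >>> mask = np.ones((5, 5, 5), dtype=bool)
# >>> domain = grid_domain_from_binary_array(mask)
# >>> stat = np.zeros(domain.size)
# >>> stat[:3] = 5.  # a small 3-voxel supra-threshold blob
# >>> int(cluster_threshold(stat, domain, th=3., csize=5).sum())  # < csize
# 0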
def get_cluster_position_from_thresholded_map(stat_map, domain, thr=3.0,
csize=10):
"""
the clusters above thr of size greater than csize in
18-connectivity are computed
Parameters
----------
stat_map : array of shape (nbvox),
map to threshold
    domain: Nifti1Image instance,
        referential- and domain-defining image
    thr: float, optional,
        cluster-forming threshold
    csize: int, optional,
        cluster size threshold
Returns
-------
positions array of shape(k,anat_dim):
the cluster positions in physical coordinates
where k= number of clusters
if no such cluster exists, None is returned
"""
# if no supra-threshold voxel, return
if (stat_map <= thr).all():
return None
# first build a domain of supra_threshold regions
thresholded_domain = domain.mask(stat_map > thr)
# get the connected components
label = thresholded_domain.connected_components()
# get the coordinates
coord = thresholded_domain.get_coord()
# get the barycenters
baryc = []
for i in range(label.max() + 1):
if np.sum(label == i) >= csize:
baryc.append(np.mean(coord[label == i], 0))
if len(baryc) == 0:
return None
baryc = np.vstack(baryc)
return baryc
def get_peak_position_from_thresholded_map(stat_map, domain, threshold):
    """The peaks above threshold in 18-connectivity are computed

    Parameters
    ----------
    stat_map: array of shape (nbvox), map to threshold
    domain: referential- and domain-defining image
    threshold: float, cluster-forming threshold
Returns
-------
    positions: array of shape (k, anat_dim),
        the peak positions in physical coordinates,
        where k is the number of peaks;
        if no such peak exists, None is returned
"""
from ..statistical_mapping import get_3d_peaks
# create an image to represent stat_map
simage = domain.to_image(data=stat_map)
# extract the peaks
peaks = get_3d_peaks(simage, threshold=threshold, order_th=2)
if peaks is None:
return None
pos = np.array([p['pos'] for p in peaks])
return pos
# ---------------------------------------------------------
# ----- data splitting functions ------------------------
# ---------------------------------------------------------
def bootstrap_group(nsubj, ngroups):
    """Split the proposed group into redundant subgroups by bootstrap

    Parameters
    ----------
    nsubj (int) the number of subjects in the population
    ngroups (int) number of subgroups to be drawn
Returns
-------
samples: a list of ngroups arrays containing
the indexes of the subjects in each subgroup
"""
groupsize = nsubj
    samples = [(groupsize * np.random.rand(groupsize)).astype(int)
               for i in range(ngroups)]
return samples
def split_group(nsubj, ngroups):
    """Split the proposed group into random disjoint subgroups

    Parameters
    ----------
    nsubj (int) the number of subjects to be split
    ngroups (int) number of subgroups to be drawn
Returns
-------
samples: a list of ngroups arrays containing
the indexes of the subjects in each subgroup
"""
groupsize = int(np.floor(nsubj / ngroups))
rperm = np.argsort(np.random.rand(nsubj))
samples = [rperm[i * groupsize: (i + 1) * groupsize]
for i in range(ngroups)]
return samples
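
# Example: both splitting schemes return a list of index arrays; bootstrap
# subgroups keep the full group size (with repetitions), whereas disjoint
# splitting yields nsubj // ngroups subjects per subgroup:
#
# >>> [len(s) for s in bootstrap_group(10, 3)]
# [10, 10, 10]
# >>> [len(s) for s in split_group(10, 3)]
# [3, 3, 3]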
# ---------------------------------------------------------
# ----- statistic computation -----------------------------
# ---------------------------------------------------------
def conjunction(x, vx, k):
"""Returns a conjunction statistic as the sum of the k lowest t-values
Parameters
----------
x: array of shape(nrows, ncols),
effect matrix
vx: array of shape(nrows, ncols),
variance matrix
k: int,
number of subjects in the conjunction
Returns
-------
t array of shape(nrows): conjunction statistic
"""
t = np.sort(x / np.sqrt(np.maximum(vx, 1.e-15)))
cjt = np.sum(t[:, :k], 1)
return cjt
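
# Example: with unit variances, the conjunction statistic is simply the sum
# of the k lowest effect values in each row:
#
# >>> x = np.array([[2., 3., 4.], [-1., 1., 5.]])
# >>> conjunction(x, np.ones_like(x), 2).tolist()
# [5.0, 0.0]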
def ttest(x):
"""Returns the t-test for each row of the data x
"""
from ..group.onesample import stat
t = stat(x.T, id='student', axis=0)
return np.squeeze(t)
def fttest(x, vx):
    """Assuming that x and vx represent effect and variance estimates,
returns a cumulated ('fixed effects') t-test of the data over each row
Parameters
----------
x: array of shape(nrows, ncols): effect matrix
vx: array of shape(nrows, ncols): variance matrix
Returns
-------
t array of shape(nrows): fixed effect statistics array
"""
if np.shape(x) != np.shape(vx):
raise ValueError("incompatible dimensions for x and vx")
n = x.shape[1]
t = x / np.sqrt(np.maximum(vx, 1.e-15))
t = t.mean(1) * np.sqrt(n)
return t
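
# Example: four unit effects with unit variance cumulate to a fixed-effects
# t of mean(t_i) * sqrt(n) = 1 * sqrt(4) = 2:
#
# >>> fttest(np.ones((1, 4)), np.ones((1, 4))).tolist()
# [2.0]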
def mfx_ttest(x, vx):
    """Same as fttest, but returns a mixed-effects statistic
Parameters
----------
x: array of shape(nrows, ncols): effect matrix
vx: array of shape(nrows, ncols): variance matrix
Returns
-------
t array of shape(nrows): mixed effect statistics array
"""
from ..group.onesample import stat_mfx
t = stat_mfx(x.T, vx.T, id='student_mfx', axis=0)
return np.squeeze(t)
def voxel_thresholded_ttest(x, threshold):
"""Returns a binary map of the ttest>threshold
"""
t = ttest(x)
return t > threshold
def statistics_from_position(target, data, sigma=1.0):
    """ Return a number characterizing how close data is from
    target using a kernel-based statistic

    Parameters
    ----------
    target: array of shape (nt, anat_dim) or None
        the target positions
    data: array of shape (nd, anat_dim) or None
        the data positions
    sigma: float, optional,
        kernel parameter, i.e. the distance scale at which
        a data position is considered close to a target
Returns
-------
    sensitivity: float, in the [0, 1] interval,
        how well the targets are fitted by the data
        (1 is good, 0 is bad)
"""
from ...algorithms.utils.fast_distance import euclidean_distance as ed
    if data is None or target is None:
        # no detection in one of the maps; could arguably be 1. when
        # both are None
        return 0.
dmatrix = ed(data, target) / sigma
sensitivity = dmatrix.min(0)
    sensitivity = np.exp(-0.5 * sensitivity ** 2)
sensitivity = np.mean(sensitivity)
return sensitivity
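
# Example: a target that coincides with one of the data points is perfectly
# recovered (score 1); a target far from all data points scores near 0:
#
# >>> target = np.array([[0., 0., 0.]])
# >>> data = np.array([[0., 0., 0.], [10., 0., 0.]])
# >>> float(statistics_from_position(target, data, sigma=1.0))
# 1.0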
# -------------------------------------------------------
# ---------- The main functions -----------------------------
# -------------------------------------------------------
def voxel_reproducibility(data, vardata, domain, ngroups, method='crfx',
                          swap=False, verbose=0, **kwargs):
    """ return a measure of voxel-level reproducibility of activation patterns

    Parameters
    ----------
    data: array of shape (nvox, nsubj)
        the input data from which everything is computed
    vardata: array of shape (nvox, nsubj)
        the corresponding variance information
    domain: referential- and domain-defining image
    ngroups: int,
        number of subgroups to be drawn
    method: string, to be chosen among 'crfx', 'cmfx', 'cffx'
        inference method under study
    swap: bool, optional,
        if True, a random sign swap of the data is performed
    verbose: bool, verbosity mode

    Returns
    -------
    kappa (float): the desired reproducibility index
"""
rmap = map_reproducibility(data, vardata, domain, ngroups, method,
swap, verbose, **kwargs)
h = np.array([np.sum(rmap == i) for i in range(ngroups + 1)])
hr = histo_repro(h)
return hr
def draw_samples(nsubj, ngroups, split_method='default'):
""" Draw randomly ngroups sets of samples from [0..nsubj-1]
Parameters
----------
nsubj, int, the total number of items
ngroups, int, the number of desired groups
split_method: string, optional,
to be chosen among 'default', 'bootstrap', 'jacknife'
if 'bootstrap', then each group will be nsubj
drawn with repetitions among nsubj
if 'jacknife' the population is divided into
ngroups disjoint equally-sized subgroups
        if 'default', 'bootstrap' is used when nsubj <= 10 * ngroups,
        otherwise 'jacknife' is used
Returns
-------
    samples, a list of ngroups arrays that represent the subsets.
fixme : this should allow variable bootstrap,
i.e. draw ngroups of groupsize among nsubj
"""
if split_method == 'default':
if nsubj > 10 * ngroups:
samples = split_group(nsubj, ngroups)
else:
samples = bootstrap_group(nsubj, ngroups)
elif split_method == 'bootstrap':
samples = bootstrap_group(nsubj, ngroups)
    elif split_method == 'jacknife':
samples = split_group(nsubj, ngroups)
else:
raise ValueError('unknown splitting method')
return samples
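
# Example: the default scheme bootstraps small cohorts and uses disjoint
# ('jacknife') splits for large ones:
#
# >>> [len(s) for s in draw_samples(100, 2)]  # 100 > 10 * 2: disjoint split
# [50, 50]
# >>> [len(s) for s in draw_samples(10, 2)]  # bootstrap: full-size groups
# [10, 10]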
def map_reproducibility(data, vardata, domain, ngroups, method='crfx',
                        swap=False, verbose=0, **kwargs):
    """ Return a reproducibility map for the given method

    Parameters
    ----------
    data: array of shape (nvox, nsubj)
        the input data from which everything is computed
    vardata: array of the same size
        the corresponding variance information
    domain: referential- and domain-defining image
    ngroups: int,
        number of subgroups to be drawn
    threshold: float,
        binarization threshold (given through kwargs)
    csize: int,
        cluster size threshold (given through kwargs)
    method: string, to be chosen among 'crfx', 'cmfx', 'cffx' or 'cjt',
        inference method under study
    swap: bool, optional,
        if True, a random sign swap of the data is performed
    verbose: bool, verbosity mode
Returns
-------
rmap: array of shape(nvox)
the reproducibility map
"""
nsubj = data.shape[1]
nvox = data.shape[0]
samples = draw_samples(nsubj, ngroups)
rmap = np.zeros(nvox)
for i in range(ngroups):
x = data[:, samples[i]]
if swap:
# randomly swap the sign of x
x *= (2 * (np.random.rand(len(samples[i])) > 0.5) - 1)
        if method != 'crfx':
vx = vardata[:, samples[i]]
csize = kwargs['csize']
threshold = kwargs['threshold']
# compute the statistical maps according to the method you like
if method == 'crfx':
stat_map = ttest(x)
elif method == 'cffx':
stat_map = fttest(x, vx)
elif method == 'cmfx':
stat_map = mfx_ttest(x, vx)
elif method == 'cjt':
if 'k' in kwargs:
k = kwargs['k']
else:
                k = nsubj // 2
stat_map = conjunction(x, vx, k)
else:
raise ValueError('unknown method')
# add the binarized map to a reproducibility map
rmap += cluster_threshold(stat_map, domain, threshold, csize) > 0
return rmap
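
# Example (a sketch with hypothetical inputs ``data``, ``vardata`` and
# ``domain``): the 'crfx' method ignores vardata but requires the 'threshold'
# and 'csize' keyword arguments; the returned map counts, for each voxel, in
# how many of the ngroups subgroups it was detected:
#
#     rmap = map_reproducibility(data, vardata, domain, ngroups=5,
#                                method='crfx', threshold=3., csize=10)
#     assert rmap.max() <= 5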
def peak_reproducibility(data, vardata, domain, ngroups, sigma, method='crfx',
                         swap=False, verbose=0, **kwargs):
    """ Return a measure of peak-level reproducibility
    of activation patterns
    (i.e. how far the peaks are from each other)
Parameters
----------
data: array of shape (nvox,nsubj)
the input data from which everything is computed
    vardata: array of shape (nvox, nsubj)
        the corresponding variance information
    domain: referential- and domain-defining image
    ngroups: int,
        number of subgroups to be drawn
    sigma: float,
        the spatial scale that defines how far 'far' is
    threshold: float, binarization threshold
    method: string, to be chosen among 'crfx', 'cmfx' or 'cffx',
        inference method under study
    swap: bool, optional; if True, a random sign swap of the data is
        performed, to simulate a null hypothesis on the data
    verbose: bool, verbosity mode
Returns
-------
    score (float): the desired peak-level reproducibility index
"""
tiny = 1.e-15
nsubj = data.shape[1]
samples = draw_samples(nsubj, ngroups)
all_pos = []
# compute the positions in the different subgroups
for i in range(ngroups):
x = data[:, samples[i]]
if swap:
# apply a random sign swap to x
x *= (2 * (np.random.rand(len(samples[i])) > 0.5) - 1)
        if method != 'crfx':
            vx = vardata[:, samples[i]]
        if method != 'bsa':
threshold = kwargs['threshold']
if method == 'crfx':
stat_map = ttest(x)
elif method == 'cmfx':
stat_map = mfx_ttest(x, vx)
elif method == 'cffx':
stat_map = fttest(x, vx)
elif method == 'cjt':
if 'k' in kwargs:
k = kwargs['k']
else:
                    k = nsubj // 2
stat_map = conjunction(x, vx, k)
pos = get_peak_position_from_thresholded_map(
stat_map, domain, threshold)
all_pos.append(pos)
else:
# method='bsa' is a special case
tx = x / (tiny + np.sqrt(vx))
afname = kwargs['afname']
theta = kwargs['theta']
dmax = kwargs['dmax']
ths = kwargs['ths']
thq = kwargs['thq']
smin = kwargs['smin']
niter = kwargs['niter']
afname = afname + '_%02d_%04d.pic' % (niter, i)
pos = coord_bsa(domain, tx, theta, dmax, ths, thq, smin, afname)
all_pos.append(pos)
# derive a kernel-based goodness measure from the pairwise comparison
# of sets of positions
score = 0
for i in range(ngroups):
for j in range(i):
score += statistics_from_position(all_pos[i], all_pos[j], sigma)
score += statistics_from_position(all_pos[j], all_pos[i], sigma)
score /= (ngroups * (ngroups - 1))
return score
def cluster_reproducibility(data, vardata, domain, ngroups, sigma,
method='crfx', swap=False, verbose=0,
**kwargs):
"""Returns a measure of cluster-level reproducibility
of activation patterns
(i.e. how far clusters are from each other)
Parameters
----------
data: array of shape (nvox,nsubj)
the input data from which everything is computed
    vardata: array of shape (nvox, nsubj)
        the corresponding variance information
    domain: referential- and domain-defining image instance
    ngroups: int,
        number of subgroups to be drawn
    sigma: float,
        the spatial scale that defines how far 'far' is
    threshold: float,
        binarization threshold
    method: string, to be chosen among 'crfx', 'cmfx' or 'cffx',
        inference method under study
    swap: bool, optional; if True, a random sign swap of the data is
        performed, to simulate a null hypothesis on the data
    verbose: bool, verbosity mode
Returns
-------
score (float): the desired cluster-level reproducibility index
"""
tiny = 1.e-15
nsubj = data.shape[1]
samples = draw_samples(nsubj, ngroups)
all_pos = []
# compute the positions in the different subgroups
for i in range(ngroups):
x = data[:, samples[i]]
if swap:
# apply a random sign swap to x
x *= (2 * (np.random.rand(len(samples[i])) > 0.5) - 1)
        if method != 'crfx':
            vx = vardata[:, samples[i]]
        if method != 'bsa':
csize = kwargs['csize']
threshold = kwargs['threshold']
if method == 'crfx':
stat_map = ttest(x)
elif method == 'cmfx':
stat_map = mfx_ttest(x, vx)
elif method == 'cffx':
stat_map = fttest(x, vx)
elif method == 'cjt':
if 'k' in kwargs:
k = kwargs['k']
else:
                    k = nsubj // 2
stat_map = conjunction(x, vx, k)
pos = get_cluster_position_from_thresholded_map(stat_map, domain,
threshold, csize)
all_pos.append(pos)
else:
# method='bsa' is a special case
tx = x / (tiny + np.sqrt(vx))
afname = kwargs['afname']
theta = kwargs['theta']
dmax = kwargs['dmax']
ths = kwargs['ths']
thq = kwargs['thq']
smin = kwargs['smin']
niter = kwargs['niter']
afname = afname + '_%02d_%04d.pic' % (niter, i)
pos = coord_bsa(domain, tx, theta, dmax, ths, thq, smin, afname)
all_pos.append(pos)
# derive a kernel-based goodness measure from the pairwise comparison
# of sets of positions
score = 0
for i in range(ngroups):
for j in range(i):
score += statistics_from_position(all_pos[i], all_pos[j], sigma)
score += statistics_from_position(all_pos[j], all_pos[i], sigma)
score /= (ngroups * (ngroups - 1))
return score
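
# Example (a sketch with hypothetical inputs): for the 'crfx', 'cmfx', 'cffx'
# and 'cjt' methods the required keyword arguments are 'threshold' and
# 'csize', while 'bsa' expects 'afname', 'theta', 'dmax', 'ths', 'thq',
# 'smin' and 'niter':
#
#     score = cluster_reproducibility(data, vardata, domain, ngroups=2,
#                                     sigma=6., method='crfx',
#                                     threshold=3., csize=10)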
def group_reproducibility_metrics(
    mask_images, contrast_images, variance_images, thresholds, ngroups,
    method, cluster_threshold=10, number_of_samples=10, sigma=6.,
    do_clusters=True, do_voxels=True, do_peaks=True, swap=False):
    """
    Main function to perform reproducibility analysis, including nifti1 io

    Parameters
    ----------
    mask_images: list of strings,
        paths of the subject-level mask images
    contrast_images: list of strings,
        paths of the subject-level contrast images
    variance_images: list of strings,
        paths of the subject-level variance images
        (may be empty when method == 'crfx')
    thresholds: list or 1-d array,
        the thresholds to be tested
    ngroups: sequence of ints,
        the numbers of subgroups to be tested
    method: string, to be chosen among 'crfx', 'cmfx', 'cffx'
    cluster_threshold: int, optional,
        cluster size threshold
    number_of_samples: int, optional,
        number of resampling iterations
    sigma: float, optional,
        spatial scale at which cluster/peak positions are compared
    do_clusters, do_voxels, do_peaks: bool, optional,
        which reproducibility analyses to perform
    swap: bool, optional,
        if True, a random sign swap is applied (null simulation)
Returns
-------
cluster_rep_results: dictionary,
results of cluster-level reproducibility analysis
voxel_rep_results: dictionary,
results of voxel-level reproducibility analysis
peak_rep_results: dictionary,
results of peak-level reproducibility analysis
"""
from nibabel import load
from ..mask import intersect_masks
    if len(variance_images) == 0 and method != 'crfx':
raise ValueError('Variance images are necessary')
nsubj = len(contrast_images)
# compute the group mask
affine = get_affine(load(mask_images[0]))
mask = intersect_masks(mask_images, threshold=0) > 0
domain = grid_domain_from_binary_array(mask, affine)
# read the data
group_con = []
group_var = []
for s in range(nsubj):
group_con.append(load(contrast_images[s]).get_data()[mask])
if len(variance_images) > 0:
group_var.append(load(variance_images[s]).get_data()[mask])
group_con = np.squeeze(np.array(group_con)).T
group_con[np.isnan(group_con)] = 0
if len(variance_images) > 0:
group_var = np.squeeze(np.array(group_var)).T
group_var[np.isnan(group_var)] = 0
group_var = np.maximum(group_var, 1.e-15)
# perform the analysis
voxel_rep_results = {}
cluster_rep_results = {}
peak_rep_results = {}
for ng in ngroups:
if do_voxels:
voxel_rep_results.update({ng: {}})
if do_clusters:
cluster_rep_results.update({ng: {}})
if do_peaks:
peak_rep_results.update({ng: {}})
for th in thresholds:
kappa = []
cls = []
pk = []
kwargs = {'threshold': th, 'csize': cluster_threshold}
for i in range(number_of_samples):
if do_voxels:
kappa.append(voxel_reproducibility(
group_con, group_var, domain, ng, method, swap,
**kwargs))
if do_clusters:
cls.append(cluster_reproducibility(
group_con, group_var, domain, ng, sigma, method,
swap, **kwargs))
if do_peaks:
pk.append(peak_reproducibility(
group_con, group_var, domain, ng, sigma, method,
swap, **kwargs))
if do_voxels:
voxel_rep_results[ng].update({th: np.array(kappa)})
if do_clusters:
cluster_rep_results[ng].update({th: np.array(cls)})
if do_peaks:
                peak_rep_results[ng].update({th: np.array(pk)})
return voxel_rep_results, cluster_rep_results, peak_rep_results
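
# Example (a sketch with hypothetical file lists): a typical call sweeps a
# few thresholds and group sizes over per-subject contrast and variance
# images; each result dictionary is indexed by group size, then threshold:
#
#     voxel_res, cluster_res, peak_res = group_reproducibility_metrics(
#         mask_images, contrast_images, variance_images,
#         thresholds=[3., 4., 5.], ngroups=[2, 4], method='crfx')
#     # cluster_res[2][3.] is an array of number_of_samples scores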
# -------------------------------------------------------
# ---------- BSA stuff ----------------------------------
# -------------------------------------------------------
def coord_bsa(domain, betas, theta=3., dmax=5., ths=0, thq=0.5, smin=0,
afname=None):
""" main function for performing bsa on a dataset
where bsa = nipy.labs.spatial_models.bayesian_structural_analysis
Parameters
----------
domain: image instance,
referential- and domain-defining image
betas: array of shape (nbnodes, subjects),
the multi-subject statistical maps
theta: float, optional
first level threshold
dmax: float>0, optional
expected cluster std in the common space in units of coord
    ths: int (>= 0), optional
        representativity threshold
    thq: float, optional,
        posterior significance threshold; should be in [0, 1]
smin: int, optional,
minimal size of the regions to validate them
    afname: string, optional
        path where intermediate results can be pickled
Returns
-------
afcoord array of shape(number_of_regions,3):
coordinate of the found landmark regions
"""
from ..spatial_models.bayesian_structural_analysis import compute_BSA_quick
crmap, AF, BF, p = compute_BSA_quick(
domain, betas, dmax, thq, smin, ths, theta, verbose=0)
if AF is None:
return None
    if afname is not None:
        import pickle
        with open(afname, 'wb') as f:
            pickle.dump(AF, f)
afcoord = AF.discrete_to_roi_features('position')
return afcoord
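
# Example (a sketch with hypothetical inputs): given a domain and a
# (nbnodes, nsubjects) matrix of per-subject t-values, coord_bsa returns the
# physical coordinates of the group-level landmark regions, or None if no
# landmark is found:
#
#     landmarks = coord_bsa(domain, betas, theta=3., dmax=5., thq=0.9)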