Source code for nipy.modalities.fmri.fmristat.model

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
This module defines the two default GLM passes of fmristat

The results of both passes of the GLM get pushed around by generators, which
know how to get out the (probably 3D) data for each slice, or parcel (for the
AR) case, estimate in 2D, then store the data back again in its original shape.

The containers here, in the execute methods, know how to reshape the data on the
way into the estimation (to 2D), then back again, to 3D, or 4D.

It's relatively easy to do this when just iterating over simple slices, but it
gets a bit more complicated when taking arbitrary shaped samples from the image,
as we do for estimating the AR coefficients, where we take all the voxels with
similar AR coefficients at once.
"""
from __future__ import absolute_import

import copy

import os.path as path

import numpy as np
import numpy.linalg as npl

from nipy.algorithms.statistics.models.regression import (
    OLSModel, ARModel, ar_bias_corrector, ar_bias_correct)
from nipy.algorithms.statistics.formula import make_recarray

# nipy core imports
from nipy.core.api import Image, parcels, matrix_generator, AffineTransform

# nipy IO imports
from nipy.io.api import save_image

# fmri imports
from ..api import FmriImageList, axis0_generator

from . import outputters


[docs]class ModelOutputImage(object): """ These images have their values filled in as the model is fit, and are saved to disk after being completely filled in. They are saved to disk by calling the 'save' method. The __getitem__ and __setitem__ calls are delegated to a private Image. An exception is raised if trying to get/set data after the data has been saved to disk. """
[docs] def __init__(self, filename, coordmap, shape, clobber=False): self.filename = filename self._im_data = np.zeros(shape) self._im = Image(self._im_data, coordmap) # Using a dangerous undocumented API here self.clobber = clobber self._flushed = False
[docs] def save(self): """ Save current Image data to disk """ if not self.clobber and path.exists(self.filename): raise ValueError('trying to clobber existing file') save_image(self._im, self.filename) self._flushed = True del(self._im)
def __getitem__(self, item): if self._flushed: raise ValueError('trying to read value from a ' 'saved ModelOutputImage') return self._im_data[item] def __setitem__(self, item, value): if self._flushed: raise ValueError('trying to set value on saved' 'ModelOutputImage') self._im_data[item] = value
[docs]def model_generator(formula, data, volume_start_times, iterable=None, slicetimes=None, model_type=OLSModel, model_params = lambda x: ()): """ Generator for the models for a pass of fmristat analysis. """ volume_start_times = make_recarray(volume_start_times.astype(float), 't') # Generator for slices of the data with time as first axis axis0_gen = axis0_generator(data, slicers=iterable) # Iterate over 2D slices of the data for indexer, indexed_data in matrix_generator(axis0_gen): model_args = model_params(indexer) # model may depend on i # Get the design for these volume start times design = formula.design(volume_start_times, return_float=True) # Make the model from the design rmodel = model_type(design, *model_args) yield indexer, indexed_data, rmodel
[docs]def results_generator(model_iterable): """ Generator for results from an iterator that returns (index, data, model) tuples. See model_generator. """ for i, d, m in model_iterable: yield i, m.fit(d)
[docs]class OLS(object): """ First pass through fmri_image. Parameters ---------- fmri_image : `FmriImageList` or 4D image object returning 4D data from np.asarray, with first (``object[0]``) axis being the independent variable of the model; object[0] returns an object with attribute ``shape``. formula : :class:`nipy.algorithms.statistics.formula.Formula` outputs : volume_start_times : """
[docs] def __init__(self, fmri_image, formula, outputs=[], volume_start_times=None): self.fmri_image = fmri_image try: self.data = fmri_image.get_data() except AttributeError: self.data = fmri_image.get_list_data(axis=0) self.formula = formula self.outputs = outputs if volume_start_times is None: self.volume_start_times = self.fmri_image.volume_start_times else: self.volume_start_times = volume_start_times
[docs] def execute(self): m = model_generator(self.formula, self.data, self.volume_start_times, model_type=OLSModel) r = results_generator(m) def reshape(i, x): if len(x.shape) == 2: if type(i) is type(1): x.shape = (x.shape[0],) + self.fmri_image[0].shape[1:] if type(i) not in [type([]), type(())]: i = (i,) else: i = tuple(i) i = (slice(None,None,None),) + tuple(i) else: if type(i) is type(1): x.shape = self.fmri_image[0].shape[1:] return i, x o = generate_output(self.outputs, r, reshape=reshape)
[docs]def estimateAR(resid, design, order=1): """ Estimate AR parameters using bias correction from fMRIstat. Parameters ---------- resid: array-like residuals from model model: an OLS model used to estimate residuals Returns ------- output : array shape (order, resid """ invM = ar_bias_corrector(design, npl.pinv(design), order) return ar_bias_correct(resid, order, invM)
[docs]class AR1(object): """ Second pass through fmri_image. Parameters ---------- fmri_image : `FmriImageList` object returning 4D array from ``np.asarray``, having attribute ``volume_start_times`` (if `volume_start_times` is None), and such that ``object[0]`` returns something with attributes ``shape`` formula : :class:`nipy.algorithms.statistics.formula.Formula` rho : ``Image`` image of AR(1) coefficients. Returning data from ``rho.get_data()``, and having attribute ``coordmap`` outputs : volume_start_times : """
[docs] def __init__(self, fmri_image, formula, rho, outputs=[], volume_start_times=None): self.fmri_image = fmri_image try: self.data = fmri_image.get_data() except AttributeError: self.data = fmri_image.get_list_data(axis=0) self.formula = formula self.outputs = outputs # Cleanup rho values, truncate them to a scale of 0.01 g = copy.copy(rho.coordmap) rho = rho.get_data() m = np.isnan(rho) r = (np.clip(rho,-1,1) * 100).astype(np.int) / 100. r[m] = np.inf self.rho = Image(r, g) if volume_start_times is None: self.volume_start_times = self.fmri_image.volume_start_times else: self.volume_start_times = volume_start_times
[docs] def execute(self): iterable = parcels(self.rho, exclude=[np.inf]) def model_params(i): return (self.rho.get_data()[i].mean(),) # Generates indexer, data, model m = model_generator(self.formula, self.data, self.volume_start_times, iterable=iterable, model_type=ARModel, model_params=model_params) # Generates indexer, data, 2D results r = results_generator(m) def reshape(i, x): """ To write output, arrays have to be reshaped -- this function does the appropriate reshaping for the two passes of fMRIstat. These passes are: i) 'slices through the z-axis' ii) 'parcels of approximately constant AR1 coefficient' """ if len(x.shape) == 2: # 2D imput matrix if type(i) is type(1): # integer indexing # reshape to ND (where N is probably 4) x.shape = (x.shape[0],) + self.fmri_image[0].shape[1:] # Convert lists to tuples, put anything else into a tuple if type(i) not in [type([]), type(())]: i = (i,) else: i = tuple(i) # Add : to indexing i = (slice(None,None,None),) + tuple(i) else: # not 2D if type(i) is type(1): # integer indexing x.shape = self.fmri_image[0].shape[1:] return i, x # Put results pulled from results generator r, into outputs o = generate_output(self.outputs, r, reshape=reshape)
[docs]def output_T(outbase, contrast, fmri_image, effect=True, sd=True, t=True, clobber=False): """ Return t contrast regression outputs list for `contrast` Parameters ---------- outbase : string Base filename that will be used to construct a set of files for the TContrast. For example, outbase='output.nii' will result in the following files (assuming defaults for all other params): output_effect.nii, output_sd.nii, output_t.nii contrast : array F contrast matrix fmri_image : ``FmriImageList`` or ``Image`` object such that ``object[0]`` has attributes ``shape`` and ``coordmap`` effect : {True, False}, optional whether to write an effect image sd : {True, False}, optional whether to write a standard deviation image t : {True, False}, optional whether to write a t image clobber : {False, True}, optional whether to overwrite images that exist. Returns ------- reglist : ``RegressionOutputList`` instance Regression output list with selected outputs, where selection is by inputs `effect`, `sd` and `t` Notes ----- Note that this routine uses the corresponding ``output_T`` routine in :mod:`outputters`, but indirectly via the TOutput object. """ def build_filename(label): index = outbase.find('.') return ''.join([outbase[:index], '_', label, outbase[index:]]) if effect: effectim = ModelOutputImage(build_filename('effect'), fmri_image[0].coordmap, fmri_image[0].shape, clobber=clobber) else: effectim = None if sd: sdim = ModelOutputImage(build_filename('sd'), fmri_image[0].coordmap, fmri_image[0].shape, clobber=clobber) else: sdim = None if t: tim = ModelOutputImage(build_filename('t'), fmri_image[0].coordmap,fmri_image[0].shape, clobber=clobber) else: tim = None return outputters.TOutput(contrast, effect=effectim, sd=sdim, t=tim)
[docs]def output_F(outfile, contrast, fmri_image, clobber=False): ''' output F statistic images Parameters ---------- outfile : str filename for F contrast image contrast : array F contrast matrix fmri_image : ``FmriImageList`` or ``Image`` object such that ``object[0]`` has attributes ``shape`` and ``coordmap`` clobber : bool if True, overwrites previous output; if False, raises error Returns ------- f_reg_out : ``RegressionOutput`` instance Object that can a) be called with a results instance as argument, returning an array, and b) accept the output array for storing, via ``obj[slice_spec] = arr`` type slicing. ''' f = ModelOutputImage(outfile, fmri_image[0].coordmap, fmri_image[0].shape, clobber=clobber) return outputters.RegressionOutput(f, lambda x: outputters.output_F(x, contrast))
[docs]def output_AR1(outfile, fmri_image, clobber=False): """ Create an output file of the AR1 parameter from the OLS pass of fmristat. Parameters ---------- outfile : fmri_image : ``FmriImageList`` or 4D image object such that ``object[0]`` has attributes ``coordmap`` and ``shape`` clobber : bool if True, overwrite previous output Returns ------- regression_output : ``RegressionOutput`` instance """ outim = ModelOutputImage(outfile, fmri_image[0].coordmap, fmri_image[0].shape, clobber=clobber) return outputters.RegressionOutput(outim, outputters.output_AR1)
[docs]def output_resid(outfile, fmri_image, clobber=False): """ Create an output file of the residuals parameter from the OLS pass of fmristat. Uses affine part of the first image to output resids unless fmri_image is an Image. Parameters ---------- outfile : fmri_image : ``FmriImageList`` or 4D image If ``FmriImageList``, needs attributes ``volume_start_times``, supports len(), and object[0] has attributes ``affine``, ``coordmap`` and ``shape``, from which we create a new 4D coordmap and shape If 4D image, use the images coordmap and shape clobber : bool if True, overwrite previous output Returns ------- regression_output : """ if isinstance(fmri_image, FmriImageList): n = len(fmri_image.list) T = np.zeros((5,5)) g = fmri_image[0].coordmap T[1:,1:] = fmri_image[0].affine T[0,0] = (fmri_image.volume_start_times[1:] - fmri_image.volume_start_times[:-1]).mean() # FIXME: NIFTI specific naming here innames = ["t"] + list(g.function_domain.coord_names) outnames = ["t"] + list(g.function_range.coord_names) cmap = AffineTransform.from_params(innames, outnames, T) shape = (n,) + fmri_image[0].shape elif isinstance(fmri_image, Image): cmap = fmri_image.coordmap shape = fmri_image.shape else: raise ValueError("expecting FmriImageList or 4d Image") outim = ModelOutputImage(outfile, cmap, shape, clobber=clobber) return outputters.RegressionOutput(outim, outputters.output_resid)
[docs]def generate_output(outputs, iterable, reshape=lambda x, y: (x, y)): """ Write out results of a given output. In the regression setting, results is generally going to be a scipy.stats.models.model.LikelihoodModelResults instance. Parameters ---------- outputs : sequence sequence of output objects iterable : object Object which iterates, returning tuples of (indexer, results), where ``indexer`` can be used to index into the `outputs` reshape : callable accepts two arguments, first is the indexer, and the second is the array which will be indexed; returns modified indexer and array ready for slicing with modified indexer. """ for indexer, results in iterable: for output in outputs: # Might be regression output object if not hasattr(output, "list"): # lame test here k, d = reshape(indexer, output(results)) output[k] = d else: # or a regression output list (like a TOutput, with several # images to output to) r = output(results) for j, l in enumerate(output.list): k, d = reshape(indexer, r[j]) l[k] = d # flush outputs, if necessary for output in outputs: if isinstance(output, outputters.RegressionOutput): if hasattr(output.img, 'save'): output.img.save() elif isinstance(output, outputters.RegressionOutputList): for im in output.list: if hasattr(im, 'save'): im.save()