Source code for nipy.modalities.fmri.utils

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
""" This module defines some convenience functions of time.

interp : an expresion for a interpolated function of time

linear_interp : an expression for a linearly interpolated function of
   time

step_function : an expression for a step function of time

events : a convenience function to generate sums of events

blocks : a convenience function to generate sums of blocks

convolve_functions : numerically convolve two functions of time

fourier_basis : a convenience function to generate a Fourier basis
"""
from __future__ import print_function
from __future__ import absolute_import

import itertools

import numpy as np
from scipy.interpolate import interp1d

import sympy
from sympy import DiracDelta, Symbol
from sympy.utilities.lambdify import implemented_function, lambdify

from nipy.algorithms.statistics.formula.formulae import Term, Formula

# Legacy repr printing from numpy.
from nipy.testing import legacy_printing as setup_module  # noqa

T = Term('t')


[docs]class Interp1dNumeric(interp1d): """ Wrapper for interp1 to raise TypeError for object array input We need this because sympy will try to evaluate interpolated functions when constructing expressions involving floats. At least sympy 1.0 only accepts TypeError or AttributeError as indication that the implemented value cannot be sampled with the sympy expression. Therefore, raise a TypeError directly for an input giving an object array (such as a sympy expression), rather than letting interp1d raise a ValueError. See: * https://github.com/nipy/nipy/issues/395 * https://github.com/sympy/sympy/issues/10810 """ def __call__(self, x): if np.asarray(x).dtype.type == np.object_: raise TypeError('Object arrays not supported') return super(Interp1dNumeric, self).__call__(x)
[docs]def lambdify_t(expr): ''' Return sympy function of t `expr` lambdified as function of t Parameters ---------- expr : sympy expr Returns ------- func : callable Numerical implementation of function ''' return lambdify(T, expr, "numpy")
[docs]def define(name, expr): """ Create function of t expression from arbitrary expression `expr` Take an arbitrarily complicated expression `expr` of 't' and make it an expression that is a simple function of t, of form ``'%s(t)' % name`` such that when it evaluates (via ``lambdify``) it has the right values. Parameters ---------- expr : sympy expression with only 't' as a Symbol name : str Returns ------- nexpr: sympy expression Examples -------- >>> t = Term('t') >>> expr = t**2 + 3*t >>> print(expr) #doctest: +SYMPY_EQUAL 3*t + t**2 >>> newexpr = define('f', expr) >>> print(newexpr) f(t) >>> f = lambdify_t(newexpr) >>> f(4) 28 >>> 3*4+4**2 28 """ # make numerical implementation of expression v = lambdify(T, expr, "numpy") # convert numerical implementation to sympy function f = implemented_function(name, v) # Return expression that is function of time return f(T)
[docs]def fourier_basis(freq): """ sin and cos Formula for Fourier drift The Fourier basis consists of sine and cosine waves of given frequencies. Parameters ---------- freq : sequence of float Frequencies for the terms in the Fourier basis. Returns ------- f : Formula Examples -------- >>> f=fourier_basis([1,2,3]) >>> f.terms array([cos(2*pi*t), sin(2*pi*t), cos(4*pi*t), sin(4*pi*t), cos(6*pi*t), sin(6*pi*t)], dtype=object) >>> f.mean _b0*cos(2*pi*t) + _b1*sin(2*pi*t) + _b2*cos(4*pi*t) + _b3*sin(4*pi*t) + _b4*cos(6*pi*t) + _b5*sin(6*pi*t) """ r = [] for f in freq: r += [sympy.cos((2*sympy.pi*f*T)), sympy.sin((2*sympy.pi*f*T))] return Formula(r)
[docs]def interp(times, values, fill=0, name=None, **kw): """ Generic interpolation function of t given `times` and `values` Imterpolator such that: f(times[i]) = values[i] if t < times[0] or t > times[-1]: f(t) = fill See ``scipy.interpolate.interp1d`` for details of interpolation types and other keyword arguments. Default is 'kind' is linear, making this function, by default, have the same behavior as ``linear_interp``. Parameters ---------- times : array-like Increasing sequence of times values : array-like Values at the specified times fill : None or float, optional Value on the interval (-np.inf, times[0]). Default 0. If None, raises error outside bounds name : None or str, optional Name of symbolic expression to use. If None, a default is used. \*\*kw : keyword args, optional passed to ``interp1d`` Returns ------- f : sympy expression A Function of t. Examples -------- >>> s = interp([0,4,5.],[2.,4,6]) >>> tval = np.array([-0.1,0.1,3.9,4.1,5.1]) >>> res = lambdify_t(s)(tval) 0 outside bounds by default >>> np.allclose(res, [0, 2.05, 3.95, 4.2, 0]) True """ if fill is not None: if kw.get('bounds_error') is True: raise ValueError('fill conflicts with bounds error') fv = kw.get('fill_value') if not (fv is None or fv is fill or fv == fill): # allow for fill=np.nan raise ValueError('fill conflicts with fill_value') kw['bounds_error'] = False kw['fill_value'] = fill interpolator = Interp1dNumeric(times, values, **kw) # make a new name if none provided if name is None: name = 'interp%d' % interp.counter interp.counter += 1 s = implemented_function(name, interpolator) return s(T)
interp.counter = 0
[docs]def linear_interp(times, values, fill=0, name=None, **kw): """ Linear interpolation function of t given `times` and `values` Imterpolator such that: f(times[i]) = values[i] if t < times[0] or t > times[-1]: f(t) = fill This version of the function enforces the 'linear' kind of interpolation (argument to ``scipy.interpolate.interp1d``). Parameters ---------- times : array-like Increasing sequence of times values : array-like Values at the specified times fill : None or float, optional Value on the interval (-np.inf, times[0]). Default 0. If None, raises error outside bounds name : None or str, optional Name of symbolic expression to use. If None, a default is used. \*\*kw : keyword args, optional passed to ``interp1d`` Returns ------- f : sympy expression A Function of t. Examples -------- >>> s = linear_interp([0,4,5.],[2.,4,6]) >>> tval = np.array([-0.1,0.1,3.9,4.1,5.1]) >>> res = lambdify_t(s)(tval) 0 outside bounds by default >>> np.allclose(res, [0, 2.05, 3.95, 4.2, 0]) True """ kind = kw.get('kind') if kind is None: kw['kind'] = 'linear' elif kind != 'linear': raise ValueError('Only linear interpolation supported') return interp(times, values, fill, name, **kw)
[docs]def step_function(times, values, name=None, fill=0): """ Right-continuous step function of time t Function of t such that f(times[i]) = values[i] if t < times[0]: f(t) = fill Parameters ---------- times : (N,) sequence Increasing sequence of times values : (N,) sequence Values at the specified times fill : float Value on the interval (-np.inf, times[0]) name : str Name of symbolic expression to use. If None, a default is used. Returns ------- f_t : sympy expr Sympy expression f(t) where f is a sympy implemented anonymous function of time that implements the step function. To get the numerical version of the function, use ``lambdify_t(f_t)`` Examples -------- >>> s = step_function([0,4,5],[2,4,6]) >>> tval = np.array([-0.1,3.9,4.1,5.1]) >>> lam = lambdify_t(s) >>> lam(tval) array([ 0., 2., 4., 6.]) """ if name is None: name = 'step%d' % step_function.counter step_function.counter += 1 def _imp(x): x = np.asarray(x) f = np.zeros(x.shape) + fill for time, val in zip(times, values): f[x >= time] = val return f s = implemented_function(name, _imp) return s(T)
# Initialize counter for step function step_function.counter = 0
[docs]def events(times, amplitudes=None, f=DiracDelta, g=Symbol('a')): """ Return a sum of functions based on a sequence of times. Parameters ---------- times : sequence vector of onsets length $N$ amplitudes : None or sequence length $N$, optional Optional sequence of amplitudes. None (default) results in sequence length $N$ of 1s f : sympy.Function, optional Optional function. Defaults to DiracDelta, can be replaced with another function, f, in which case the result is the convolution with f. g : sympy.Basic, optional Optional sympy expression function of amplitudes. The amplitudes, should be represented by the symbol 'a', which will be substituted, by the corresponding value in `amplitudes`. Returns ------- sum_expression : Sympy.Add Sympy expression of time $t$, where onsets, as a function of $t$, have been symbolically convolved with function `f`, and any function `g` of corresponding amplitudes. Examples -------- We import some sympy stuff so we can test if we've got what we expected >>> from sympy import DiracDelta, Symbol, Function >>> from nipy.modalities.fmri.utils import T >>> evs = events([3,6,9]) >>> evs == DiracDelta(-9 + T) + DiracDelta(-6 + T) + DiracDelta(-3 + T) True >>> hrf = Function('hrf') >>> evs = events([3,6,9], f=hrf) >>> evs == hrf(-9 + T) + hrf(-6 + T) + hrf(-3 + T) True >>> evs = events([3,6,9], amplitudes=[2,1,-1]) >>> evs == -DiracDelta(-9 + T) + 2*DiracDelta(-3 + T) + DiracDelta(-6 + T) True """ e = 0 asymb = Symbol('a') if amplitudes is None: amplitudes = itertools.cycle([1]) for time, a in zip(times, amplitudes): e = e + g.subs(asymb, a) * f(T-time) return e
[docs]def blocks(intervals, amplitudes=None, name=None): """ Step function based on a sequence of intervals. Parameters ---------- intervals : (S,) sequence of (2,) sequences Sequence (S0, S1, ... S(N-1)) of sequences, where S0 (etc) are sequences of length 2, giving 'on' and 'off' times of block amplitudes : (S,) sequence of float, optional Optional amplitudes for each block. Defaults to 1. name : None or str, optional Name of the convolved function in the resulting expression. Defaults to one created by ``utils.interp``. Returns ------- b_of_t : sympy expr Sympy expression b(t) where b is a sympy anonymous function of time that implements the block step function Examples -------- >>> on_off = [[1,2],[3,4]] >>> tval = np.array([0.4,1.4,2.4,3.4]) >>> b = blocks(on_off) >>> lam = lambdify_t(b) >>> lam(tval) array([ 0., 1., 0., 1.]) >>> b = blocks(on_off, amplitudes=[3,5]) >>> lam = lambdify_t(b) >>> lam(tval) array([ 0., 3., 0., 5.]) """ t = [-np.inf] v = [0] if amplitudes is None: amplitudes = itertools.cycle([1]) for _t, a in zip(intervals, amplitudes): t += list(_t) v += [a, 0] t.append(np.inf) v.append(0) return step_function(t, v, name=name)
def _eval_for(f, interval, dt): """ Return x and y for function `f` over `interval` and delta `dt` """ real_f = lambdify_t(f) f_mn, f_mx = sorted(interval) time = np.arange(f_mn, f_mx, float(dt)) # time values with support for g vals = real_f(time).astype(float) return vals def _conv_fx_gx(f_vals, g_vals, dt, min_f, min_g): """ Numerical convolution given f(x), min(x) for two functions """ vals = np.convolve(f_vals, g_vals) * dt # Full by default # f and g have been implicitly translated by -f_mn and -g_mn respectively, # because in terms of array indices, they both now start at 0. # Translate by f and g offsets time = np.arange(len(vals)) * dt + min_f + min_g return time, vals
[docs]class TimeConvolver(object): """ Make a convolution kernel from a symbolic function of t A convolution kernel is a function with extra attributes to allow it to function as a kernel for numerical convolution (see :func:`convolve_functions`). Parameters ---------- expr : sympy expression An expression that is a function of t only. support : 2 sequence Sequence is ``(low, high)`` where expression is defined between ``low`` and ``high``, and can be assumed to be `fill` otherwise delta : float smallest change in domain of `expr` to use for numerical evaluation of `expr` """
[docs] def __init__(self, expr, support, delta, fill=0): self.expr = expr self.support = support self.delta = delta self.fill = fill self._vals = _eval_for(expr, self.support, self.delta)
[docs] def convolve(self, g, g_interval, name=None, **kwargs): """ Convolve sympy expression `g` with this kernel Parameters ---------- g : sympy expr An expression that is a function of t only. g_interval : (2,) sequence of floats Start and end of the interval of t over which to convolve g name : None or str, optional Name of the convolved function in the resulting expression. Defaults to one created by ``utils.interp``. \*\*kwargs : keyword args, optional Any other arguments to pass to the ``interp1d`` function in creating the numerical function for `fg`. Returns ------- fg : sympy expr An symbolic expression that is a function of t only, and that can be lambdified to produce a function returning the convolved series from an input array. """ g_vals = _eval_for(g, g_interval, self.delta) fg_time, fg_vals = _conv_fx_gx(self._vals, g_vals, self.delta, min(self.support), min(g_interval)) return interp(fg_time, fg_vals, fill=self.fill, name=name, **kwargs)
[docs]def convolve_functions(f, g, f_interval, g_interval, dt, fill=0, name=None, **kwargs): """ Expression containing numerical convolution of `fn1` with `fn2` Parameters ---------- f : sympy expr An expression that is a function of t only. g : sympy expr An expression that is a function of t only. f_interval : (2,) sequence of float The start and end of the interval of t over which to convolve values of f g_interval : (2,) sequence of floats Start and end of the interval of t over which to convolve g dt : float Time step for discretization. We use this for creating the interpolator to form the numerical implementation fill : None or float Value to return from sampling output `fg` function outside range. name : None or str, optional Name of the convolved function in the resulting expression. Defaults to one created by ``utils.interp``. \*\*kwargs : keyword args, optional Any other arguments to pass to the ``interp1d`` function in creating the numerical function for `fg`. Returns ------- fg : sympy expr An symbolic expression that is a function of t only, and that can be lambdified to produce a function returning the convolved series from an input array. Examples -------- >>> from nipy.algorithms.statistics.formula.formulae import Term >>> t = Term('t') This is a square wave on [0,1] >>> f1 = (t > 0) * (t < 1) The convolution of ``f1`` with itself is a triangular wave on [0, 2], peaking at 1 with height 1 >>> tri = convolve_functions(f1, f1, [0, 2], [0, 2], 1.0e-3, name='conv') The result is a symbolic function >>> print(tri) conv(t) Get the numerical values for a time vector >>> ftri = lambdify(t, tri) >>> x = np.arange(0, 2, 0.2) >>> y = ftri(x) The peak is at 1 >>> x[np.argmax(y)] 1.0 """ # Note that - from the doctest above - y is """ array([ -3.90255908e-16, 1.99000000e-01, 3.99000000e-01, 5.99000000e-01, 7.99000000e-01, 9.99000000e-01, 7.99000000e-01, 5.99000000e-01, 3.99000000e-01, 1.99000000e-01, 6.74679706e-16]) """ # - so the peak value is 1-dt - rather than 1 - but we get the same # result from using np.convolve - see tests. f_vals = _eval_for(f, f_interval, dt) g_vals = _eval_for(g, g_interval, dt) fg_time, fg_vals = _conv_fx_gx(f_vals, g_vals, dt, min(f_interval), min(g_interval)) return interp(fg_time, fg_vals, fill=fill, name=name, **kwargs)