spiketools.windowing

Windowed analyses on canonical spiketimes arrays.

Examples

Shared example setup used throughout the documentation:

import numpy as np

from spiketools import gamma_spikes, time_resolved
from spiketools.variability import cv2

np.random.seed(0)
rates = np.array([6.0] * 10 + [5.6, 6.3, 5.9, 6.5, 5.8, 6.1, 5.7, 6.4, 6.0, 5.5], dtype=float)
orders = np.array([0.2] * 10 + [1.0, 2.0, 2.0, 3.0, 1.0, 2.0, 3.0, 2.0, 1.0, 3.0], dtype=float)
spiketimes = gamma_spikes(rates=rates, order=orders, tlim=[0.0, 5000.0], dt=1.0)

values, window_time = time_resolved(
    spiketimes,
    window=1000.0,
    func=cv2,
    kwargs={"pool": True},
    tlim=[0.0, 5000.0],
    tstep=250.0,
)
  1"""Windowed analyses on canonical `spiketimes` arrays.
  2
  3Examples
  4--------
  5Shared example setup used throughout the documentation:
  6
  7```python
  8import numpy as np
  9
 10from spiketools import gamma_spikes, time_resolved
 11from spiketools.variability import cv2
 12
 13np.random.seed(0)
 14rates = np.array([6.0] * 10 + [5.6, 6.3, 5.9, 6.5, 5.8, 6.1, 5.7, 6.4, 6.0, 5.5], dtype=float)
 15orders = np.array([0.2] * 10 + [1.0, 2.0, 2.0, 3.0, 1.0, 2.0, 3.0, 2.0, 1.0, 3.0], dtype=float)
 16spiketimes = gamma_spikes(rates=rates, order=orders, tlim=[0.0, 5000.0], dt=1.0)
 17
 18values, window_time = time_resolved(
 19    spiketimes,
 20    window=1000.0,
 21    func=cv2,
 22    kwargs={"pool": True},
 23    tlim=[0.0, 5000.0],
 24    tstep=250.0,
 25)
 26```
 27"""
 28
 29from __future__ import annotations
 30
 31from typing import Callable, Optional, Sequence
 32
 33import pylab
 34
 35from .conversion import (
 36    binary_to_spiketimes,
 37    cut_spiketimes,
 38    get_time_limits,
 39    spiketimes_to_binary,
 40)
 41from .rate import gaussian_kernel, kernel_rate
 42from .transforms import time_warp
 43
 44__all__ = [
 45    "rate_warped_analysis",
 46    "time_resolved",
 47    "time_resolved_new",
 48]
 49
 50
 51def time_resolved(
 52    spiketimes,
 53    window,
 54    func: Callable,
 55    kwargs=None,
 56    tlim: Optional[Sequence[float]] = None,
 57    tstep=1.0,
 58):
 59    r"""
 60    Apply a function to successive windows of a spike train.
 61
 62    Parameters
 63    ----------
 64    spiketimes:
 65        Canonical spike representation.
 66    window:
 67        Window size in ms.
 68    func:
 69        Callable receiving the cropped `spiketimes` of each window.
 70    kwargs:
 71        Optional keyword arguments passed to `func`.
 72    tlim:
 73        Optional analysis interval `[tmin, tmax]` in ms.
 74    tstep:
 75        Window step size in ms.
 76
 77    Returns
 78    -------
 79    tuple[list, np.ndarray]
 80        Function outputs and window-center times.
 81
 82    Definition
 83    ----------
 84    If $S$ denotes the spike train and $f$ the analysis function, then
 85
 86    $$
 87    y_n = f\left(S \cap [t_n, t_n + W)\right),
 88    \qquad
 89    t_n^\mathrm{center} = t_n + \frac{W}{2},
 90    $$
 91
 92    where $W$ is the window width and consecutive windows are offset by
 93    `tstep`.
 94
 95    Examples
 96    --------
 97    >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
 98    >>> values, times = time_resolved(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
 99    >>> values
100    [1, 1, 1, 1]
101    >>> times.tolist()
102    [0.5, 1.5, 2.5, 3.5]
103    """
104    if kwargs is None:
105        kwargs = {}
106    if tlim is None:
107        tlim = get_time_limits(spiketimes)
108    cut_spikes = cut_spiketimes(spiketimes, tlim)
109
110    tmin = tlim[0]
111    tmax = tmin + window
112    tcenter = tmin + 0.5 * (tmax - tstep - tmin)
113
114    time = []
115    func_out = []
116    while tmax <= tlim[1]:
117        windowspikes = cut_spiketimes(cut_spikes, [tmin, tmax])
118        time.append(tcenter)
119        func_out.append(func(windowspikes, **kwargs))
120
121        tmin += tstep
122        tmax += tstep
123        tcenter += tstep
124
125    return func_out, pylab.array(time)
126
127
128def time_resolved_new(
129    spiketimes,
130    window,
131    func: Callable,
132    kwargs=None,
133    tlim: Optional[Sequence[float]] = None,
134    tstep=1.0,
135):
136    r"""
137    Windowed evaluation using an intermediate binary representation.
138
139    This variant can be faster for repeated window extraction because the spike
140    train is binned once before the loop.
141
142    Definition
143    ----------
144    This function evaluates the same windowed statistic as `time_resolved(...)`,
145    but it first bins the spike train into a count matrix $B$ and reconstructs
146    each window from the relevant slice:
147
148    Let $k_{n} = \lfloor t_{n} \rfloor$ be the left window index on the binned
149    grid and let $K = \lfloor W \rfloor$ be the corresponding window width in
150    bins. Let $B_{n}$ denote the corresponding slice of the count matrix and let
151    $\mathcal{C}$ denote the conversion from a binary window back to spike
152    times. The statistic is then
153
154    $$
155    y_n = f\left(\mathcal{C}(B_{n})\right).
156    $$
157
158    Examples
159    --------
160    >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
161    >>> values, times = time_resolved_new(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
162    >>> values
163    [1, 1, 1, 1]
164    """
165    if kwargs is None:
166        kwargs = {}
167    if tlim is None:
168        tlim = get_time_limits(spiketimes)
169
170    binary, btime = spiketimes_to_binary(spiketimes, tlim=tlim)
171
172    tmin = tlim[0]
173    tmax = tmin + window
174    tcenter = 0.5 * (tmax + tmin)
175
176    time = []
177    func_out = []
178    while tmax <= tlim[1]:
179        windowspikes = binary_to_spiketimes(
180            binary[:, int(tmin) : int(tmax)], btime[int(tmin) : int(tmax)]
181        )
182        time.append(tcenter)
183        func_out.append(func(windowspikes, **kwargs))
184
185        tmin += tstep
186        tmax += tstep
187        tcenter += tstep
188    return func_out, pylab.array(time)
189
190
191def rate_warped_analysis(
192    spiketimes,
193    window,
194    step=1.0,
195    tlim=None,
196    rate=None,
197    func=lambda x: x.shape[1] / x[1].max(),
198    kwargs=None,
199    rate_kernel=None,
200    dt=1.0,
201):
202    r"""
203    Apply a function after warping time according to instantaneous rate.
204
205    Parameters
206    ----------
207    spiketimes:
208        Canonical spike representation.
209    window:
210        Window size in warped time. Use `"full"` to analyze the complete warped
211        recording at once.
212    step:
213        Step size in warped time.
214    tlim:
215        Optional `[tmin, tmax]` interval in ms.
216    rate:
217        Optional precomputed `(rate, time)` tuple as returned by
218        `kernel_rate(...)`.
219    func:
220        Statistic to evaluate on the warped spike train.
221    kwargs:
222        Optional keyword arguments forwarded to `func`.
223    rate_kernel:
224        Kernel used when `rate` is not provided.
225    dt:
226        Sampling interval in ms used for the rate estimate.
227
228    Definition
229    ----------
230    The idea is to replace real time $t$ by warped time $\tau(t)$,
231
232    $$
233    \tau(t) = \int_{t_0}^{t} \frac{r(u)}{1000}\,du,
234    $$
235
236    where $r(u)$ is an estimated firing rate in spikes/s. Equal distances on
237    the warped axis therefore correspond to equal expected spike counts. High-
238    rate epochs are stretched, low-rate epochs are compressed, and the chosen
239    statistic is then evaluated on this transformed spike train. If windowed
240    output is requested, the resulting window centers are mapped back to real
241    time before returning.
242
243    Examples
244    --------
245    >>> spikes = pylab.array([[0.0, 3.0, 6.0], [0.0, 0.0, 0.0]])
246    >>> result, warped_duration = rate_warped_analysis(
247    ...     spikes,
248    ...     window="full",
249    ...     func=lambda s: s.shape[1],
250    ...     rate=(pylab.array([[500.0, 500.0, 500.0]]), pylab.array([0.0, 1.0, 2.0])),
251    ...     dt=1.0,
252    ... )
253    >>> int(result), round(float(warped_duration), 3)
254    (3, 1.5)
255    """
256    if kwargs is None:
257        kwargs = {}
258    if rate_kernel is None:
259        rate_kernel = gaussian_kernel(50.0)
260    if rate is None:
261        rate = kernel_rate(spiketimes, rate_kernel, tlim)
262
263    rate, trate = rate
264    rate_tlim = [trate.min(), trate.max() + 1]
265    spiketimes = cut_spiketimes(spiketimes, rate_tlim)
266
267    ot = pylab.cumsum(rate) / 1000.0 * dt
268    w_spiketimes = spiketimes.copy()
269    w_spiketimes[0, :] = time_warp(spiketimes[0, :], trate, ot)
270
271    if window == "full":
272        return func(w_spiketimes, **kwargs), ot.max()
273
274    result, tresult = time_resolved(
275        w_spiketimes, window, func, kwargs, tstep=step
276    )
277    tresult = time_warp(tresult, ot, trate)
278    return result, tresult
def rate_warped_analysis( spiketimes, window, step=1.0, tlim=None, rate=None, func=<function <lambda>>, kwargs=None, rate_kernel=None, dt=1.0):
192def rate_warped_analysis(
193    spiketimes,
194    window,
195    step=1.0,
196    tlim=None,
197    rate=None,
198    func=lambda x: x.shape[1] / x[1].max(),
199    kwargs=None,
200    rate_kernel=None,
201    dt=1.0,
202):
203    r"""
204    Apply a function after warping time according to instantaneous rate.
205
206    Parameters
207    ----------
208    spiketimes:
209        Canonical spike representation.
210    window:
211        Window size in warped time. Use `"full"` to analyze the complete warped
212        recording at once.
213    step:
214        Step size in warped time.
215    tlim:
216        Optional `[tmin, tmax]` interval in ms.
217    rate:
218        Optional precomputed `(rate, time)` tuple as returned by
219        `kernel_rate(...)`.
220    func:
221        Statistic to evaluate on the warped spike train.
222    kwargs:
223        Optional keyword arguments forwarded to `func`.
224    rate_kernel:
225        Kernel used when `rate` is not provided.
226    dt:
227        Sampling interval in ms used for the rate estimate.
228
229    Definition
230    ----------
231    The idea is to replace real time $t$ by warped time $\tau(t)$,
232
233    $$
234    \tau(t) = \int_{t_0}^{t} \frac{r(u)}{1000}\,du,
235    $$
236
237    where $r(u)$ is an estimated firing rate in spikes/s. Equal distances on
238    the warped axis therefore correspond to equal expected spike counts. High-
239    rate epochs are stretched, low-rate epochs are compressed, and the chosen
240    statistic is then evaluated on this transformed spike train. If windowed
241    output is requested, the resulting window centers are mapped back to real
242    time before returning.
243
244    Examples
245    --------
246    >>> spikes = pylab.array([[0.0, 3.0, 6.0], [0.0, 0.0, 0.0]])
247    >>> result, warped_duration = rate_warped_analysis(
248    ...     spikes,
249    ...     window="full",
250    ...     func=lambda s: s.shape[1],
251    ...     rate=(pylab.array([[500.0, 500.0, 500.0]]), pylab.array([0.0, 1.0, 2.0])),
252    ...     dt=1.0,
253    ... )
254    >>> int(result), round(float(warped_duration), 3)
255    (3, 1.5)
256    """
257    if kwargs is None:
258        kwargs = {}
259    if rate_kernel is None:
260        rate_kernel = gaussian_kernel(50.0)
261    if rate is None:
262        rate = kernel_rate(spiketimes, rate_kernel, tlim)
263
264    rate, trate = rate
265    rate_tlim = [trate.min(), trate.max() + 1]
266    spiketimes = cut_spiketimes(spiketimes, rate_tlim)
267
268    ot = pylab.cumsum(rate) / 1000.0 * dt
269    w_spiketimes = spiketimes.copy()
270    w_spiketimes[0, :] = time_warp(spiketimes[0, :], trate, ot)
271
272    if window == "full":
273        return func(w_spiketimes, **kwargs), ot.max()
274
275    result, tresult = time_resolved(
276        w_spiketimes, window, func, kwargs, tstep=step
277    )
278    tresult = time_warp(tresult, ot, trate)
279    return result, tresult

Apply a function after warping time according to instantaneous rate.

Parameters
  • spiketimes:: Canonical spike representation.
  • window:: Window size in warped time. Use "full" to analyze the complete warped recording at once.
  • step:: Step size in warped time.
  • tlim:: Optional [tmin, tmax] interval in ms.
  • rate:: Optional precomputed (rate, time) tuple as returned by kernel_rate(...).
  • func:: Statistic to evaluate on the warped spike train.
  • kwargs:: Optional keyword arguments forwarded to func.
  • rate_kernel:: Kernel used when rate is not provided.
  • dt:: Sampling interval in ms used for the rate estimate.
Definition

The idea is to replace real time $t$ by warped time $\tau(t)$,

$$ \tau(t) = \int_{t_0}^{t} \frac{r(u)}{1000}\,du, $$

where $r(u)$ is an estimated firing rate in spikes/s. Equal distances on the warped axis therefore correspond to equal expected spike counts. High- rate epochs are stretched, low-rate epochs are compressed, and the chosen statistic is then evaluated on this transformed spike train. If windowed output is requested, the resulting window centers are mapped back to real time before returning.

Examples
>>> spikes = pylab.array([[0.0, 3.0, 6.0], [0.0, 0.0, 0.0]])
>>> result, warped_duration = rate_warped_analysis(
...     spikes,
...     window="full",
...     func=lambda s: s.shape[1],
...     rate=(pylab.array([[500.0, 500.0, 500.0]]), pylab.array([0.0, 1.0, 2.0])),
...     dt=1.0,
... )
>>> int(result), round(float(warped_duration), 3)
(3, 1.5)
def time_resolved( spiketimes, window, func: Callable, kwargs=None, tlim: Optional[Sequence[float]] = None, tstep=1.0):
 52def time_resolved(
 53    spiketimes,
 54    window,
 55    func: Callable,
 56    kwargs=None,
 57    tlim: Optional[Sequence[float]] = None,
 58    tstep=1.0,
 59):
 60    r"""
 61    Apply a function to successive windows of a spike train.
 62
 63    Parameters
 64    ----------
 65    spiketimes:
 66        Canonical spike representation.
 67    window:
 68        Window size in ms.
 69    func:
 70        Callable receiving the cropped `spiketimes` of each window.
 71    kwargs:
 72        Optional keyword arguments passed to `func`.
 73    tlim:
 74        Optional analysis interval `[tmin, tmax]` in ms.
 75    tstep:
 76        Window step size in ms.
 77
 78    Returns
 79    -------
 80    tuple[list, np.ndarray]
 81        Function outputs and window-center times.
 82
 83    Definition
 84    ----------
 85    If $S$ denotes the spike train and $f$ the analysis function, then
 86
 87    $$
 88    y_n = f\left(S \cap [t_n, t_n + W)\right),
 89    \qquad
 90    t_n^\mathrm{center} = t_n + \frac{W}{2},
 91    $$
 92
 93    where $W$ is the window width and consecutive windows are offset by
 94    `tstep`.
 95
 96    Examples
 97    --------
 98    >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
 99    >>> values, times = time_resolved(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
100    >>> values
101    [1, 1, 1, 1]
102    >>> times.tolist()
103    [0.5, 1.5, 2.5, 3.5]
104    """
105    if kwargs is None:
106        kwargs = {}
107    if tlim is None:
108        tlim = get_time_limits(spiketimes)
109    cut_spikes = cut_spiketimes(spiketimes, tlim)
110
111    tmin = tlim[0]
112    tmax = tmin + window
113    tcenter = tmin + 0.5 * (tmax - tstep - tmin)
114
115    time = []
116    func_out = []
117    while tmax <= tlim[1]:
118        windowspikes = cut_spiketimes(cut_spikes, [tmin, tmax])
119        time.append(tcenter)
120        func_out.append(func(windowspikes, **kwargs))
121
122        tmin += tstep
123        tmax += tstep
124        tcenter += tstep
125
126    return func_out, pylab.array(time)

Apply a function to successive windows of a spike train.

Parameters
  • spiketimes:: Canonical spike representation.
  • window:: Window size in ms.
  • func:: Callable receiving the cropped spiketimes of each window.
  • kwargs:: Optional keyword arguments passed to func.
  • tlim:: Optional analysis interval [tmin, tmax] in ms.
  • tstep:: Window step size in ms.
Returns
  • tuple[list, np.ndarray]: Function outputs and window-center times.
Definition

If $S$ denotes the spike train and $f$ the analysis function, then

$$ y_n = f\left(S \cap [t_n, t_n + W)\right), \qquad t_n^\mathrm{center} = t_n + \frac{W}{2}, $$

where $W$ is the window width and consecutive windows are offset by tstep.

Examples
>>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
>>> values, times = time_resolved(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
>>> values
[1, 1, 1, 1]
>>> times.tolist()
[0.5, 1.5, 2.5, 3.5]
def time_resolved_new( spiketimes, window, func: Callable, kwargs=None, tlim: Optional[Sequence[float]] = None, tstep=1.0):
129def time_resolved_new(
130    spiketimes,
131    window,
132    func: Callable,
133    kwargs=None,
134    tlim: Optional[Sequence[float]] = None,
135    tstep=1.0,
136):
137    r"""
138    Windowed evaluation using an intermediate binary representation.
139
140    This variant can be faster for repeated window extraction because the spike
141    train is binned once before the loop.
142
143    Definition
144    ----------
145    This function evaluates the same windowed statistic as `time_resolved(...)`,
146    but it first bins the spike train into a count matrix $B$ and reconstructs
147    each window from the relevant slice:
148
149    Let $k_{n} = \lfloor t_{n} \rfloor$ be the left window index on the binned
150    grid and let $K = \lfloor W \rfloor$ be the corresponding window width in
151    bins. Let $B_{n}$ denote the corresponding slice of the count matrix and let
152    $\mathcal{C}$ denote the conversion from a binary window back to spike
153    times. The statistic is then
154
155    $$
156    y_n = f\left(\mathcal{C}(B_{n})\right).
157    $$
158
159    Examples
160    --------
161    >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
162    >>> values, times = time_resolved_new(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
163    >>> values
164    [1, 1, 1, 1]
165    """
166    if kwargs is None:
167        kwargs = {}
168    if tlim is None:
169        tlim = get_time_limits(spiketimes)
170
171    binary, btime = spiketimes_to_binary(spiketimes, tlim=tlim)
172
173    tmin = tlim[0]
174    tmax = tmin + window
175    tcenter = 0.5 * (tmax + tmin)
176
177    time = []
178    func_out = []
179    while tmax <= tlim[1]:
180        windowspikes = binary_to_spiketimes(
181            binary[:, int(tmin) : int(tmax)], btime[int(tmin) : int(tmax)]
182        )
183        time.append(tcenter)
184        func_out.append(func(windowspikes, **kwargs))
185
186        tmin += tstep
187        tmax += tstep
188        tcenter += tstep
189    return func_out, pylab.array(time)

Windowed evaluation using an intermediate binary representation.

This variant can be faster for repeated window extraction because the spike train is binned once before the loop.

Definition

This function evaluates the same windowed statistic as time_resolved(...), but it first bins the spike train into a count matrix $B$ and reconstructs each window from the relevant slice:

Let $k_{n} = \lfloor t_{n} \rfloor$ be the left window index on the binned grid and let $K = \lfloor W \rfloor$ be the corresponding window width in bins. Let $B_{n}$ denote the corresponding slice of the count matrix and let $\mathcal{C}$ denote the conversion from a binary window back to spike times. The statistic is then

$$ y_n = f\left(\mathcal{C}(B_{n})\right). $$

Examples
>>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
>>> values, times = time_resolved_new(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
>>> values
[1, 1, 1, 1]