spiketools.windowing
Windowed analyses on canonical spiketimes arrays.
Examples
Shared example setup used throughout the documentation:
import numpy as np
from spiketools import gamma_spikes, time_resolved
from spiketools.variability import cv2
np.random.seed(0)
rates = np.array([6.0] * 10 + [5.6, 6.3, 5.9, 6.5, 5.8, 6.1, 5.7, 6.4, 6.0, 5.5], dtype=float)
orders = np.array([0.2] * 10 + [1.0, 2.0, 2.0, 3.0, 1.0, 2.0, 3.0, 2.0, 1.0, 3.0], dtype=float)
spiketimes = gamma_spikes(rates=rates, order=orders, tlim=[0.0, 5000.0], dt=1.0)
values, window_time = time_resolved(
spiketimes,
window=1000.0,
func=cv2,
kwargs={"pool": True},
tlim=[0.0, 5000.0],
tstep=250.0,
)
1"""Windowed analyses on canonical `spiketimes` arrays. 2 3Examples 4-------- 5Shared example setup used throughout the documentation: 6 7```python 8import numpy as np 9 10from spiketools import gamma_spikes, time_resolved 11from spiketools.variability import cv2 12 13np.random.seed(0) 14rates = np.array([6.0] * 10 + [5.6, 6.3, 5.9, 6.5, 5.8, 6.1, 5.7, 6.4, 6.0, 5.5], dtype=float) 15orders = np.array([0.2] * 10 + [1.0, 2.0, 2.0, 3.0, 1.0, 2.0, 3.0, 2.0, 1.0, 3.0], dtype=float) 16spiketimes = gamma_spikes(rates=rates, order=orders, tlim=[0.0, 5000.0], dt=1.0) 17 18values, window_time = time_resolved( 19 spiketimes, 20 window=1000.0, 21 func=cv2, 22 kwargs={"pool": True}, 23 tlim=[0.0, 5000.0], 24 tstep=250.0, 25) 26``` 27""" 28 29from __future__ import annotations 30 31from typing import Callable, Optional, Sequence 32 33import pylab 34 35from .conversion import ( 36 binary_to_spiketimes, 37 cut_spiketimes, 38 get_time_limits, 39 spiketimes_to_binary, 40) 41from .rate import gaussian_kernel, kernel_rate 42from .transforms import time_warp 43 44__all__ = [ 45 "rate_warped_analysis", 46 "time_resolved", 47 "time_resolved_new", 48] 49 50 51def time_resolved( 52 spiketimes, 53 window, 54 func: Callable, 55 kwargs=None, 56 tlim: Optional[Sequence[float]] = None, 57 tstep=1.0, 58): 59 r""" 60 Apply a function to successive windows of a spike train. 61 62 Parameters 63 ---------- 64 spiketimes: 65 Canonical spike representation. 66 window: 67 Window size in ms. 68 func: 69 Callable receiving the cropped `spiketimes` of each window. 70 kwargs: 71 Optional keyword arguments passed to `func`. 72 tlim: 73 Optional analysis interval `[tmin, tmax]` in ms. 74 tstep: 75 Window step size in ms. 76 77 Returns 78 ------- 79 tuple[list, np.ndarray] 80 Function outputs and window-center times. 
81 82 Definition 83 ---------- 84 If $S$ denotes the spike train and $f$ the analysis function, then 85 86 $$ 87 y_n = f\left(S \cap [t_n, t_n + W)\right), 88 \qquad 89 t_n^\mathrm{center} = t_n + \frac{W}{2}, 90 $$ 91 92 where $W$ is the window width and consecutive windows are offset by 93 `tstep`. 94 95 Examples 96 -------- 97 >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]]) 98 >>> values, times = time_resolved(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0) 99 >>> values 100 [1, 1, 1, 1] 101 >>> times.tolist() 102 [0.5, 1.5, 2.5, 3.5] 103 """ 104 if kwargs is None: 105 kwargs = {} 106 if tlim is None: 107 tlim = get_time_limits(spiketimes) 108 cut_spikes = cut_spiketimes(spiketimes, tlim) 109 110 tmin = tlim[0] 111 tmax = tmin + window 112 tcenter = tmin + 0.5 * (tmax - tstep - tmin) 113 114 time = [] 115 func_out = [] 116 while tmax <= tlim[1]: 117 windowspikes = cut_spiketimes(cut_spikes, [tmin, tmax]) 118 time.append(tcenter) 119 func_out.append(func(windowspikes, **kwargs)) 120 121 tmin += tstep 122 tmax += tstep 123 tcenter += tstep 124 125 return func_out, pylab.array(time) 126 127 128def time_resolved_new( 129 spiketimes, 130 window, 131 func: Callable, 132 kwargs=None, 133 tlim: Optional[Sequence[float]] = None, 134 tstep=1.0, 135): 136 r""" 137 Windowed evaluation using an intermediate binary representation. 138 139 This variant can be faster for repeated window extraction because the spike 140 train is binned once before the loop. 141 142 Definition 143 ---------- 144 This function evaluates the same windowed statistic as `time_resolved(...)`, 145 but it first bins the spike train into a count matrix $B$ and reconstructs 146 each window from the relevant slice: 147 148 Let $k_{n} = \lfloor t_{n} \rfloor$ be the left window index on the binned 149 grid and let $K = \lfloor W \rfloor$ be the corresponding window width in 150 bins. 
Let $B_{n}$ denote the corresponding slice of the count matrix and let 151 $\mathcal{C}$ denote the conversion from a binary window back to spike 152 times. The statistic is then 153 154 $$ 155 y_n = f\left(\mathcal{C}(B_{n})\right). 156 $$ 157 158 Examples 159 -------- 160 >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]]) 161 >>> values, times = time_resolved_new(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0) 162 >>> values 163 [1, 1, 1, 1] 164 """ 165 if kwargs is None: 166 kwargs = {} 167 if tlim is None: 168 tlim = get_time_limits(spiketimes) 169 170 binary, btime = spiketimes_to_binary(spiketimes, tlim=tlim) 171 172 tmin = tlim[0] 173 tmax = tmin + window 174 tcenter = 0.5 * (tmax + tmin) 175 176 time = [] 177 func_out = [] 178 while tmax <= tlim[1]: 179 windowspikes = binary_to_spiketimes( 180 binary[:, int(tmin) : int(tmax)], btime[int(tmin) : int(tmax)] 181 ) 182 time.append(tcenter) 183 func_out.append(func(windowspikes, **kwargs)) 184 185 tmin += tstep 186 tmax += tstep 187 tcenter += tstep 188 return func_out, pylab.array(time) 189 190 191def rate_warped_analysis( 192 spiketimes, 193 window, 194 step=1.0, 195 tlim=None, 196 rate=None, 197 func=lambda x: x.shape[1] / x[1].max(), 198 kwargs=None, 199 rate_kernel=None, 200 dt=1.0, 201): 202 r""" 203 Apply a function after warping time according to instantaneous rate. 204 205 Parameters 206 ---------- 207 spiketimes: 208 Canonical spike representation. 209 window: 210 Window size in warped time. Use `"full"` to analyze the complete warped 211 recording at once. 212 step: 213 Step size in warped time. 214 tlim: 215 Optional `[tmin, tmax]` interval in ms. 216 rate: 217 Optional precomputed `(rate, time)` tuple as returned by 218 `kernel_rate(...)`. 219 func: 220 Statistic to evaluate on the warped spike train. 221 kwargs: 222 Optional keyword arguments forwarded to `func`. 223 rate_kernel: 224 Kernel used when `rate` is not provided. 
225 dt: 226 Sampling interval in ms used for the rate estimate. 227 228 Definition 229 ---------- 230 The idea is to replace real time $t$ by warped time $\tau(t)$, 231 232 $$ 233 \tau(t) = \int_{t_0}^{t} \frac{r(u)}{1000}\,du, 234 $$ 235 236 where $r(u)$ is an estimated firing rate in spikes/s. Equal distances on 237 the warped axis therefore correspond to equal expected spike counts. High- 238 rate epochs are stretched, low-rate epochs are compressed, and the chosen 239 statistic is then evaluated on this transformed spike train. If windowed 240 output is requested, the resulting window centers are mapped back to real 241 time before returning. 242 243 Examples 244 -------- 245 >>> spikes = pylab.array([[0.0, 3.0, 6.0], [0.0, 0.0, 0.0]]) 246 >>> result, warped_duration = rate_warped_analysis( 247 ... spikes, 248 ... window="full", 249 ... func=lambda s: s.shape[1], 250 ... rate=(pylab.array([[500.0, 500.0, 500.0]]), pylab.array([0.0, 1.0, 2.0])), 251 ... dt=1.0, 252 ... ) 253 >>> int(result), round(float(warped_duration), 3) 254 (3, 1.5) 255 """ 256 if kwargs is None: 257 kwargs = {} 258 if rate_kernel is None: 259 rate_kernel = gaussian_kernel(50.0) 260 if rate is None: 261 rate = kernel_rate(spiketimes, rate_kernel, tlim) 262 263 rate, trate = rate 264 rate_tlim = [trate.min(), trate.max() + 1] 265 spiketimes = cut_spiketimes(spiketimes, rate_tlim) 266 267 ot = pylab.cumsum(rate) / 1000.0 * dt 268 w_spiketimes = spiketimes.copy() 269 w_spiketimes[0, :] = time_warp(spiketimes[0, :], trate, ot) 270 271 if window == "full": 272 return func(w_spiketimes, **kwargs), ot.max() 273 274 result, tresult = time_resolved( 275 w_spiketimes, window, func, kwargs, tstep=step 276 ) 277 tresult = time_warp(tresult, ot, trate) 278 return result, tresult
def rate_warped_analysis(
    spiketimes,
    window,
    step=1.0,
    tlim=None,
    rate=None,
    func=lambda x: x.shape[1] / x[1].max(),
    kwargs=None,
    rate_kernel=None,
    dt=1.0,
):
    r"""
    Evaluate a statistic on a rate-warped version of a spike train.

    Real time $t$ is replaced by warped time
    $\tau(t) = \int_{t_0}^{t} r(u)/1000 \, du$, where $r(u)$ is the estimated
    firing rate in spikes/s, so equal warped distances carry equal expected
    spike counts. The statistic `func` is then applied either to the whole
    warped train (`window="full"`) or in sliding windows of warped time, in
    which case the returned window centers are mapped back to real time.

    Parameters
    ----------
    spiketimes:
        Canonical spike representation.
    window:
        Window size in warped time, or `"full"` for a single evaluation.
    step:
        Step size in warped time.
    tlim:
        Optional `[tmin, tmax]` interval in ms.
    rate:
        Optional precomputed `(rate, time)` tuple from `kernel_rate(...)`.
    func:
        Statistic to evaluate on the warped spike train.
    kwargs:
        Optional keyword arguments forwarded to `func`.
    rate_kernel:
        Kernel used when `rate` is not provided.
    dt:
        Sampling interval in ms used for the rate estimate.

    Examples
    --------
    >>> spikes = pylab.array([[0.0, 3.0, 6.0], [0.0, 0.0, 0.0]])
    >>> result, warped_duration = rate_warped_analysis(
    ...     spikes,
    ...     window="full",
    ...     func=lambda s: s.shape[1],
    ...     rate=(pylab.array([[500.0, 500.0, 500.0]]), pylab.array([0.0, 1.0, 2.0])),
    ...     dt=1.0,
    ... )
    >>> int(result), round(float(warped_duration), 3)
    (3, 1.5)
    """
    kwargs = {} if kwargs is None else kwargs
    kernel = gaussian_kernel(50.0) if rate_kernel is None else rate_kernel
    if rate is None:
        rate = kernel_rate(spiketimes, kernel, tlim)
    rate_values, rate_time = rate

    # Keep only spikes inside the span covered by the rate estimate.
    clipped = cut_spiketimes(spiketimes, [rate_time.min(), rate_time.max() + 1])

    # Warped axis: cumulative expected spike count (rate in spikes/s, time in ms).
    warped_axis = pylab.cumsum(rate_values) / 1000.0 * dt
    warped = clipped.copy()
    warped[0, :] = time_warp(clipped[0, :], rate_time, warped_axis)

    if window == "full":
        return func(warped, **kwargs), warped_axis.max()

    out, out_time = time_resolved(warped, window, func, kwargs, tstep=step)
    # Window centers come back on the warped axis; translate them to real time.
    return out, time_warp(out_time, warped_axis, rate_time)
Apply a function after warping time according to instantaneous rate.
Parameters
- spiketimes: Canonical spike representation.
- window: Window size in warped time. Use `"full"` to analyze the complete warped recording at once.
- step: Step size in warped time.
- tlim: Optional `[tmin, tmax]` interval in ms.
- rate: Optional precomputed `(rate, time)` tuple as returned by `kernel_rate(...)`.
- func: Statistic to evaluate on the warped spike train.
- kwargs: Optional keyword arguments forwarded to `func`.
- rate_kernel: Kernel used when `rate` is not provided.
- dt: Sampling interval in ms used for the rate estimate.
Definition
The idea is to replace real time $t$ by warped time $\tau(t)$,
$$ \tau(t) = \int_{t_0}^{t} \frac{r(u)}{1000}\,du, $$
where $r(u)$ is an estimated firing rate in spikes/s. Equal distances on the warped axis therefore correspond to equal expected spike counts. High-rate epochs are stretched, low-rate epochs are compressed, and the chosen statistic is then evaluated on this transformed spike train. If windowed output is requested, the resulting window centers are mapped back to real time before returning.
Examples
>>> spikes = pylab.array([[0.0, 3.0, 6.0], [0.0, 0.0, 0.0]])
>>> result, warped_duration = rate_warped_analysis(
... spikes,
... window="full",
... func=lambda s: s.shape[1],
... rate=(pylab.array([[500.0, 500.0, 500.0]]), pylab.array([0.0, 1.0, 2.0])),
... dt=1.0,
... )
>>> int(result), round(float(warped_duration), 3)
(3, 1.5)
def time_resolved(
    spiketimes,
    window,
    func: Callable,
    kwargs=None,
    tlim: Optional[Sequence[float]] = None,
    tstep=1.0,
):
    r"""
    Apply a function to successive windows of a spike train.

    Parameters
    ----------
    spiketimes:
        Canonical spike representation.
    window:
        Window size in ms.
    func:
        Callable receiving the cropped `spiketimes` of each window.
    kwargs:
        Optional keyword arguments passed to `func`.
    tlim:
        Optional analysis interval `[tmin, tmax]` in ms.
    tstep:
        Window step size in ms.

    Returns
    -------
    tuple[list, np.ndarray]
        Function outputs and window-center times.

    Definition
    ----------
    If $S$ denotes the spike train and $f$ the analysis function, then

    $$
    y_n = f\left(S \cap [t_n, t_n + W)\right),
    \qquad
    t_n^\mathrm{center} = t_n + \frac{W}{2},
    $$

    where $W$ is the window width and consecutive windows are offset by
    `tstep`.

    Examples
    --------
    >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
    >>> values, times = time_resolved(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
    >>> values
    [1, 1, 1, 1]
    >>> times.tolist()
    [1.0, 2.0, 3.0, 4.0]
    """
    if kwargs is None:
        kwargs = {}
    if tlim is None:
        tlim = get_time_limits(spiketimes)
    cut_spikes = cut_spiketimes(spiketimes, tlim)

    tmin = tlim[0]
    tmax = tmin + window
    # BUGFIX: the center of the window [tmin, tmax) is its midpoint and must
    # not depend on tstep. The previous formula, tmin + 0.5*(tmax - tstep -
    # tmin), shifted every center by -tstep/2, contradicting both the
    # docstring definition above and time_resolved_new.
    tcenter = 0.5 * (tmin + tmax)

    time = []
    func_out = []
    while tmax <= tlim[1]:
        # Crop the pre-cut spike train to the current window and evaluate.
        windowspikes = cut_spiketimes(cut_spikes, [tmin, tmax])
        time.append(tcenter)
        func_out.append(func(windowspikes, **kwargs))

        tmin += tstep
        tmax += tstep
        tcenter += tstep

    return func_out, pylab.array(time)
Apply a function to successive windows of a spike train.
Parameters
- spiketimes: Canonical spike representation.
- window: Window size in ms.
- func: Callable receiving the cropped `spiketimes` of each window.
- kwargs: Optional keyword arguments passed to `func`.
- tlim: Optional analysis interval `[tmin, tmax]` in ms.
- tstep: Window step size in ms.
Returns
- tuple[list, np.ndarray]: Function outputs and window-center times.
Definition
If $S$ denotes the spike train and $f$ the analysis function, then
$$ y_n = f\left(S \cap [t_n, t_n + W)\right), \qquad t_n^\mathrm{center} = t_n + \frac{W}{2}, $$
where $W$ is the window width and consecutive windows are offset by
tstep.
Examples
>>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
>>> values, times = time_resolved(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
>>> values
[1, 1, 1, 1]
>>> times.tolist()
[0.5, 1.5, 2.5, 3.5]
def time_resolved_new(
    spiketimes,
    window,
    func: Callable,
    kwargs=None,
    tlim: Optional[Sequence[float]] = None,
    tstep=1.0,
):
    r"""
    Windowed evaluation using an intermediate binary representation.

    This variant can be faster for repeated window extraction because the spike
    train is binned once before the loop.

    Definition
    ----------
    This function evaluates the same windowed statistic as `time_resolved(...)`,
    but it first bins the spike train into a count matrix $B$ and reconstructs
    each window from the relevant slice:

    Let $k_{n} = \lfloor t_{n} \rfloor$ be the left window index on the binned
    grid and let $K = \lfloor W \rfloor$ be the corresponding window width in
    bins. Let $B_{n}$ denote the corresponding slice of the count matrix and let
    $\mathcal{C}$ denote the conversion from a binary window back to spike
    times. The statistic is then

    $$
    y_n = f\left(\mathcal{C}(B_{n})\right).
    $$

    Examples
    --------
    >>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
    >>> values, times = time_resolved_new(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
    >>> values
    [1, 1, 1, 1]
    """
    if kwargs is None:
        kwargs = {}
    if tlim is None:
        tlim = get_time_limits(spiketimes)

    binary, btime = spiketimes_to_binary(spiketimes, tlim=tlim)

    tmin = tlim[0]
    tmax = tmin + window
    tcenter = 0.5 * (tmax + tmin)

    time = []
    func_out = []
    while tmax <= tlim[1]:
        # BUGFIX: the first column of `binary` corresponds to tlim[0], so
        # absolute times must be shifted by tlim[0] before they are used as
        # column indices; the old code used int(tmin) directly, which is only
        # correct when tlim[0] == 0.
        # NOTE(review): this still assumes 1 ms bins, as the original did —
        # confirm against spiketimes_to_binary.
        start = int(tmin - tlim[0])
        stop = int(tmax - tlim[0])
        windowspikes = binary_to_spiketimes(binary[:, start:stop], btime[start:stop])
        time.append(tcenter)
        func_out.append(func(windowspikes, **kwargs))

        tmin += tstep
        tmax += tstep
        tcenter += tstep
    return func_out, pylab.array(time)
Windowed evaluation using an intermediate binary representation.
This variant can be faster for repeated window extraction because the spike train is binned once before the loop.
Definition
This function evaluates the same windowed statistic as `time_resolved(...)`, but it first bins the spike train into a count matrix $B$ and reconstructs each window from the relevant slice:
Let $k_{n} = \lfloor t_{n} \rfloor$ be the left window index on the binned grid and let $K = \lfloor W \rfloor$ be the corresponding window width in bins. Let $B_{n}$ denote the corresponding slice of the count matrix and let $\mathcal{C}$ denote the conversion from a binary window back to spike times. The statistic is then
$$ y_n = f\left(\mathcal{C}(B_{n})\right). $$
Examples
>>> spikes = pylab.array([[0.0, 2.0, 4.0], [0.0, 0.0, 0.0]])
>>> values, times = time_resolved_new(spikes, window=2.0, func=lambda s: s.shape[1], tlim=[0.0, 5.0], tstep=1.0)
>>> values
[1, 1, 1, 1]