Last active
November 21, 2016 20:27
-
-
Save kylerbrown/b8c829b7abe9d22f6f818cfe0c4af4ca to your computer and use it in GitHub Desktop.
Linear time warping functions for labeled acoustic data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
from scipy.signal import fftconvolve | |
def warp_path(fid, test, sampling_rate, insertgaps=True, | |
function=True, remove_fid_offset=True, invert=False): | |
""" | |
fid : fiducial (true) labels. A pandas dataset with fields 'start', 'stop', and 'label'. | |
test : like fid, labels to be warped | |
sampling_rate : sampling rate of returned warp path | |
function : whether to return evenly sampled warp data or an interpolation function | |
insertgaps : if true, gaps are inserted. recomended unless there are no gaps in the original labels | |
remove_fid_offset : sets the fiducial offset to zero, otherwise set to the start of the fiducial labels | |
invert : if True, returns an inverted warp function, such that moments in fid time can be mapped to test times. | |
returns: warp_path : vector of length (fid_labels.stop.iloc[-1] - fid_labels.start.iloc[0]) * sampling_rate | |
if function is True, then instead a warping function is returned of the form: | |
warped_vector = warping_fun(original_vector) | |
""" | |
try: | |
assert np.alltrue(np.diff(fid.start) > 0) | |
assert np.alltrue(np.diff(fid.stop) > 0) | |
assert np.alltrue(fid.start <= fid.stop) | |
assert np.alltrue(np.diff(test.start) > 0) | |
assert np.alltrue(np.diff(test.stop) > 0) | |
assert np.alltrue(test.start <= test.stop) | |
except: | |
print(fid) | |
print(test) | |
raise | |
def addgaps(df): | |
"gaps between syllables get their own row, if not already labeled" | |
gaps = pd.DataFrame( | |
[{'label': 'gap', 'start': x, 'stop': y} for x,y in | |
zip(df.stop.iloc[:-1], df.start.iloc[1:]) if x < y ]) | |
tg = pd.concat((df, gaps)).sort_values('start') | |
tg.index = np.arange(len(tg), dtype=int) | |
return tg | |
if insertgaps: | |
fid = addgaps(fid) | |
test = addgaps(test) | |
try: | |
assert np.alltrue(fid.label == test.label) | |
except: | |
print(fid.label) | |
print(test.label) | |
raise | |
test_Ns = np.array((test.stop - test.start) * sampling_rate, | |
dtype=int)# duration of each test syllable in samples | |
warped_intervals = [] | |
for start, stop, N in zip(fid.start, fid.stop, test_Ns): | |
warped_intervals.append(np.linspace(start, stop, N, endpoint=False)) | |
warped_intervals.append(np.array(fid.stop.iloc[-1]).reshape(1)) | |
_warp_path = np.concatenate(warped_intervals) | |
assert np.alltrue(np.diff(_warp_path) > 0) | |
if remove_fid_offset: | |
_warp_path -= _warp_path[0] | |
orig_intervals = [] # create the time vector in the same way to ensure the | |
# same number of samples | |
for start, stop, N in zip(test.start, test.stop, test_Ns): | |
orig_intervals.append(np.linspace(start, stop, N, endpoint=False)) | |
orig_intervals.append(np.array(test.stop.iloc[-1]).reshape(1)) | |
t = np.concatenate(orig_intervals) | |
assert len(_warp_path) == len(t) | |
if function: | |
if invert: | |
warping_fun = lambda x: np.interp(x, xp=_warp_path, fp=t, | |
left=np.NaN, right=np.NaN) | |
else: | |
warping_fun = lambda x: np.interp(x, xp=t, fp=_warp_path, | |
left=np.NaN, right=np.NaN) | |
return warping_fun | |
else: | |
return t, _warp_path | |
def psth(spikes, bins, window, inv_warp_path, offset=0): | |
"""creates a peri-stimulus time histogram, warps AFTER window is applied | |
need to invert warping function | |
spikes : a pandas dataframe with a 'start' column containing spike times | |
bins : linearly spaced time bins, spikes outside the bin range will not be used | |
window : window to apply, a vector. See scipy.signal.gaussian | |
inv_warp_path : an interpolation function that maps from fiducial time to epoch time | |
offset : add an offset prior to warping | |
returns a numeric vector | |
""" | |
assert np.allclose(np.diff(bins), bins[1] - bins[0]) # check linear spacing | |
period = bins[1] - bins[0] | |
N = len(window) | |
counts = np.zeros_like(bins) | |
# select only relevant spikes | |
teststart = inv_warp_path(bins[0]) - period * N//2 # start of trial window + extra to prevent edge effects | |
teststop = inv_warp_path(bins[-2]) + period * (N//2) | |
spikes = spikes[ (spikes.start > teststart) | |
&(spikes.start < teststop - period)] | |
test_first_sample = int(teststart / period) | |
spike_samples = np.array(spikes.start / period, dtype=int) # bin spikes to sample | |
testbins = np.arange(teststart, teststop, period) # time bins for unwarped data | |
testcount = np.zeros_like(testbins) | |
for s in spike_samples: | |
testcount[s - test_first_sample] += 1 | |
# we now have an unwarped psth: | |
testpsth = fftconvolve(testcount, window, mode="same") | |
#psth look-up function, returns trial psth from fiducial time | |
psth_at_time = lambda x: np.interp(x, xp=testbins, fp=testpsth, left=np.NaN, right=np.NaN) | |
warped_trial_times = inv_warp_path(bins) | |
counts = psth_at_time(warped_trial_times) | |
return counts |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment