Skip to content

Instantly share code, notes, and snippets.

@larsoner
Last active November 16, 2020 16:38
Show Gist options
  • Select an option

  • Save larsoner/400b8b07cc89ad96fc2c02d0c3fa254b to your computer and use it in GitHub Desktop.

Select an option

Save larsoner/400b8b07cc89ad96fc2c02d0c3fa254b to your computer and use it in GitHub Desktop.
import numpy as np
from scipy import stats
import mne
import matplotlib.pyplot as plt
# 1. Load data
raw = mne.io.read_raw_fif('CC_CP004_PN_L_RUN01_tsss.fif')
other = mne.io.read_raw_egi('CC_CP004_PN_L_Run01_20201016_110457.mff')
# 2. Get times for both instances that should match
t_raw = (mne.find_events(raw)[:, 0] - raw.first_samp) / raw.info['sfreq']
t_other = (mne.find_events(other)[:, 0] - other.first_samp) / other.info['sfreq']
assert len(t_raw) == len(t_other)
# 3. Compute correction factors
r = np.corrcoef(t_other, t_raw)[0, 1]
coef = np.polyfit(t_other, t_raw, deg=1)
r_, p = stats.pearsonr(t_other, t_raw)
assert p < 1e-6 # very good match
assert np.isclose(r, r_) # same computations
dr_ms_s = 1000 * abs(1 - coef[0])
print(f'Drift rate computed to be {1000 * dr_ms_s:0.1f} μs/sec '
f'(total drift over {raw.times[-1]:0.1f} sec recording: '
f'{raw.times[-1] * dr_ms_s:0.1f} ms)')
# 4. Crop start of recordings to match using the zero-order term
if coef[1] > 0: # need to crop start of raw to match other
raw.crop(coef[1], None)
t_raw -= coef[1]
else: # need to crop start of other to match raw
other.crop(-coef[1], None)
t_other += coef[1]
assert np.isclose(np.polyfit(t_other, t_raw, deg=1)[1], 0., atol=2e-3)
# 5. Resample data using the first-order term
coef = coef[0]
t_pred = t_other * coef
sfreq_new = raw.info['sfreq'] * coef
other.load_data().resample(sfreq_new, verbose=True)
other._times = np.arange(len(other.times)) / raw.info['sfreq']
other.info['sfreq'] = raw.info['sfreq']
t_fixed = (mne.find_events(other)[:, 0] - other.first_samp) / other.info['sfreq']
# 6. Check that everything aligns properly
fig, ax = plt.subplots()
ax.plot(t_raw, 1000 * (t_raw - t_other), 'k', zorder=4, label='orig')
ax.set(title='Drift', xlabel='raw time (s)', ylabel='Delta (ms)')
ax.plot(t_raw, 1000 * (t_raw - t_pred), 'r:', lw=2, zorder=3, label='pred')
ax.plot(t_raw, 1000 * (t_raw - t_fixed), 'r:', lw=2, zorder=3, label='fixed')
if raw.times[-1] > other.times[-1]:
raw.crop(0, other.times[-1])
elif raw.times[-1] < other.times[-1]:
other.crop(0, raw.times[-1])
assert raw.times[-1] == other.times[-1] and len(raw.times) == len(other.times)
# 7. Concatenate (if we want) and look at stim channel alignments
raw_both = raw.copy().load_data().add_channels([other], force_update_info=True)
stim_picks = mne.pick_types(raw_both.info, stim=True)
raw_both.plot(order=stim_picks, start=t_raw[0] - 1.) # should line up
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment