Last active
June 8, 2018 05:26
-
-
Save amirkdv/26c4ed8bf89bb26e1530467187a46f9a to your computer and use it in GitHub Desktop.
Statistical Analysis of Coherence in LFP recordings
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import numpy as np | |
from numpy.fft import rfft, rfftfreq, irfft | |
from matplotlib import pyplot as plt | |
from scipy.signal import csd, welch | |
# install via `pip install git+https://github.com/aaren/wavelets` | |
from wavelets import WaveletAnalysis | |
# install via `pip install statsmodels` | |
from statsmodels.tsa.vector_ar.var_model import VAR | |
from statsmodels.tsa.ar_model import AR | |
plt.rcParams.update({'text.latex.preamble' : [r'\usepackage{amsmath}']}) | |
plt.rc('text', usetex=True) | |
#============================================================================== | |
# LFP Simulation | |
#============================================================================== | |
# returns a guassian noise signal of given length and standard deviation. | |
def noise(length, A): | |
return A * np.random.randn(length) | |
# reconstructs the time domain representation of a singal based on its power | |
# spectrum. The signal amplitude is rescaled such that its range (i.e max - | |
# min) is 2A. If phases are not given, they are chosen randomly from a Gaussian | |
# distribution centered at 0 with standard deviation 2pi. | |
def signal_from_spectrum(t, powers, A, phases=None): | |
if phases is not None: | |
assert len(phases) == len(powers) | |
else: | |
phases = 2 * np.pi * np.random.randn(len(powers)) | |
fourier = np.multiply( | |
np.sqrt(2 * powers), # amplitudes at each frequency | |
np.exp(1j * phases) # random phase | |
) | |
np.insert(fourier, 0, 0) | |
x = irfft(fourier, len(t)) | |
x *= 2 * A / (max(x) - min(x)) | |
return x | |
# returns a numpy array representing a function f(x) defined to be 1 (or | |
# amplitude) over the support (inclusive tuple of x range) and 0 elsewhere. | |
def bump_function(x, support, amplitude=1): | |
f = np.zeros(len(x)) | |
in_support = lambda _x: _x >= support[0] and _x <= support[1] | |
return np.array([amplitude if in_support(_x) else 0 for _x in x]) | |
# returns the power spectrum of a 1/f^alpha signal (optionally bandpassed). | |
def lfp_power(t, dt, band=None, alpha=1): | |
f_range = rfftfreq(t.size, d=dt) | |
if band: | |
in_band = lambda f: f >= band[0] and f <= band[1] | |
power = np.array([1. / f ** alpha if f > 0 and in_band(f) else 0 for f in f_range]) | |
else: | |
power = np.array([1. / f ** alpha if f > 0 else 0 for f in f_range]) | |
return power | |
# returns a pair of simulated LFP signals with 1/f^alpha power spectrum | |
# (optionally bandpassed; cf. lfp_power()). Additionally power bumps can be | |
# added to the default power spectrum leading to oscillationas with specified | |
# time and frequency range. A gaussian noise will be superimposed with standard | |
# deviation dictated by SNR (use inf to make noise vanish). | |
def two_channel_lfp(t, dt, A, snr=20, power_bumps=None, band=None): | |
powers = lfp_power(t, dt, band=band) | |
X = signal_from_spectrum(t, powers, A) | |
Y = signal_from_spectrum(t, powers, A) | |
f_range = rfftfreq(t.size, d=dt) | |
noise_amplitude = A / np.sqrt(snr) | |
if noise_amplitude: | |
X += noise(len(t), noise_amplitude) | |
Y += noise(len(t), noise_amplitude) | |
if not power_bumps: | |
return X, Y | |
for power_bump in power_bumps: | |
b_powers = bump_function(f_range, power_bump['freq']) | |
if 'phase_shift' in power_bump: | |
# phase lock the two signals with given shift | |
phases_x = 2 * np.pi * np.random.randn(len(f_range)) | |
phases_y = phases_x + power_bump['phase_shift'] | |
if 'phase_shift_sd' in power_bump: | |
# add noise to phases | |
phases_y += power_bump['phase_shift_sd'] * np.random.randn(len(f_range)) | |
bump_x = signal_from_spectrum(t, b_powers, power_bump['amplitude'], phases=phases_x) | |
bump_y = signal_from_spectrum(t, b_powers, power_bump['amplitude'], phases=phases_y) | |
else: | |
bump_x = signal_from_spectrum(t, b_powers, power_bump['amplitude']) | |
bump_y = signal_from_spectrum(t, b_powers, power_bump['amplitude']) | |
start, end = power_bump['time'] | |
if 'start_sd' in power_bump: | |
start += power_bump['start_sd'] * np.random.randn() | |
start_idx, end_idx = int(start / dt), int(end / dt) | |
in_t_range = lambda idx: idx >= start_idx and idx <= end_idx | |
X += np.array([b if in_t_range(idx) else 0 for idx, b in enumerate(bump_x)]) | |
Y += np.array([b if in_t_range(idx) else 0 for idx, b in enumerate(bump_y)]) | |
return X, Y | |
# returns multiple trials of single channel LFP recordings; all arguments are | |
# passed as is to two_channel_lfp (second channel always ignored) | |
def one_channel_lfp_n(n_trials, *args, **kwargs): | |
return np.array([two_channel_lfp(*args, **kwargs)[0] for _ in range(n_trials)]) | |
# returns multiple trials of two channel LFP recordings; all arguments are | |
# passed as is to two_channel_lfp. | |
def two_channel_lfp_n(n_trials, *args, **kwargs): | |
nX = [[]] * n_trials | |
nY = [[]] * n_trials | |
for i in range(n_trials): | |
nX[i], nY[i] = two_channel_lfp(*args, **kwargs) | |
return np.array(nX), np.array(nY) | |
#============================================================================== | |
# Time Domain Analyses | |
#============================================================================== | |
# Calculates the distance between C(ref, ref+s) and C(t, t+s) for all t. This | |
# serves as a measure of variation in autocovariance function over time. | |
def autocov_distance(ref, t, T, dt, nX, scatter=None, ctr_step=5e-3): | |
mX = np.mean(nX, axis=0) | |
ref_idx = int(ref / dt) | |
cov_ref = np.array([np.mean(nX[:,ref_idx] * nX[:,_t]) for _t in range(len(t))]) | |
ctrs = np.arange(ref + 20e-3, T - 20e-3, ctr_step) | |
dists = np.zeros(len(ctrs)) | |
for idx, ctr in enumerate(ctrs): | |
print ref, ctr | |
ctr_idx = int(ctr / dt) | |
cov_ctr = [np.mean(nX[:,ctr_idx] * nX[:,_t]) - mX[ctr_idx] * mX[_t] | |
for _t in range(len(t))] | |
_cov_ctr = cov_ctr[ctr_idx - ref_idx:] | |
_cov_ref = cov_ref[:len(t) - ctr_idx + ref_idx] | |
support = T - (ctr - ref) | |
dists[idx] = np.sum(np.abs(_cov_ref - _cov_ctr)) / support | |
if scatter: | |
c = np.random.rand(3, 1) | |
scatter.scatter(1000 * (t - ctr), cov_ctr, alpha=.2, facecolor=c, edgecolor=c, s=.2) | |
dists = (dists - min(dists)) / (max(dists) - min(dists)) | |
return ctrs, dists | |
# Fits a multivariate autoregressive model (where the multiple dimensions | |
# represent different trials) to the given single channel data over a rolling | |
# window. At each window center the log(min(root)) of the characteristic | |
# polynomial of the AR model is reported as a stability (stationarity) index. | |
def AR_root_test(nX, dt, T, ax, c, win_len=20e-3, ctr_step=5e-3): | |
win_ctr_ts = np.arange(win_len, T - 50e-3, ctr_step) | |
#win_ctr_ts = np.arange(70e-3, 210e-3, ctr_step) | |
sys.stderr.write('single trial AR #/%d: ' % len(nX)) | |
# single trial AR model | |
for idx, X in enumerate(nX): | |
stability = [] | |
sys.stderr.write('%d ' % (idx + 1)) | |
for ctr_t in win_ctr_ts: | |
ctr_t_idx = int((ctr_t - win_len / 2) / dt) | |
max_t_idx = int((ctr_t + win_len / 2) / dt) | |
ar_res = AR(X[ctr_t_idx:max_t_idx]).fit(5) | |
stability_idx = np.log(min(np.abs(ar_res.roots))) | |
stability.append(stability_idx) | |
ax.plot(1000 * win_ctr_ts, stability, alpha=.2, c=c) | |
sys.stderr.write('\n') | |
# multiple trial VAR model | |
sys.stderr.write('multiple trial VAR:\n') | |
stability = [] | |
for ctr_t in win_ctr_ts: | |
ctr_t_idx = int((ctr_t - win_len / 2) / dt) | |
max_t_idx = int((ctr_t + win_len / 2) / dt) | |
ar_res = VAR(nX[:,ctr_t_idx:max_t_idx]).fit(5) | |
stability_idx = np.log(min(np.abs(ar_res.roots))) | |
sys.stderr.write(' (%.0f, %.0f): %.2f \n' % (1000 * (ctr_t - win_len / 2.), 1000 * (ctr_t + win_len / 2.), stability_idx)) | |
stability.append(stability_idx) | |
sys.stderr.write('\n') | |
ax.plot(1000 * win_ctr_ts, stability, c=c, lw=4, alpha=.7) | |
#============================================================================== | |
# Frequency Domain Analyses | |
#============================================================================== | |
# Estimates the coherence by estimating spectral properties (cross-spectral | |
# density and individual spectral densities) by averaging over trials. | |
def coherence_with_spectral_avg(nX, nY, dt, ax, c): | |
assert len(nX) == len(nY) | |
n_trials = len(nX) | |
sxy = np.array([csd(nX[i], nY[i], fs=1./dt)[1] for i in range(n_trials)]) | |
sxx = np.array([welch(X, fs=1./dt)[1] for X in nX]) | |
syy = np.array([welch(Y, fs=1./dt)[1] for Y in nY]) | |
f, _ = csd(nX[0], nX[0], fs=1./dt) | |
for i in range(len(sxy)): | |
ax.plot(f, (np.abs(sxy[i]) ** 2) / (sxx[i] * syy[i]), c=c, alpha=.1, lw=1) | |
# |<S_xy>|^2 | |
coherence = (np.abs(np.mean(sxy, axis=0)) ** 2) / (np.mean(sxx, axis=0) * np.mean(syy, axis=0)) | |
#coherence = np.abs(np.mean(sxy / (np.sqrt(sxx) * np.sqrt(syy)), axis=0)) ** 2 | |
return f, coherence | |
# <|S_xy|^2> | |
# NOTE trial averaged coherence too depends on phase locking because the | |
# cross-spectrals are added (the correct way above) and if the phase | |
# difference is not constant; they cancel each other out. However, in a | |
# non-phase locked coherence across trials: | |
coherence = np.mean(np.abs(sxy) ** 2, axis=0) / (np.mean(sxx, axis=0) * np.mean(syy, axis=0)) | |
return f, coherence | |
def wavelet_plv(nX, nY, dt): | |
assert len(nX) == len(nY) | |
n_trials = len(nX) | |
phase_diffs = None | |
sys.stderr.write(' # ') | |
for i in range(n_trials): | |
sys.stderr.write('%d ' % (i + 1)) | |
wa_x = WaveletAnalysis(nX[i], dt=dt) | |
wa_y = WaveletAnalysis(nY[i], dt=dt) | |
diff = np.angle(wa_x.wavelet_transform) - np.angle(wa_y.wavelet_transform) | |
if phase_diffs is None: | |
phase_diffs = np.zeros((n_trials,) + diff.shape) | |
phase_diffs[i] = diff | |
sys.stderr.write('\n') | |
return phase_diffs, wa_x.time, 1. / wa_x.fourier_periods | |
def wavelet_coherence(nX, nY, dt): | |
assert len(nX) == len(nY) | |
n_trials = len(nX) | |
s_xx, s_yy, s_xy = None, None, None | |
sys.stderr.write(' # ') | |
for i in range(n_trials): | |
sys.stderr.write('%d ' % (i + 1)) | |
wa_x = WaveletAnalysis(nX[i], dt=dt) | |
wa_y = WaveletAnalysis(nY[i], dt=dt) | |
X_w, Y_w = wa_x.wavelet_transform, wa_y.wavelet_transform | |
if s_xx is None: | |
shape = (n_trials,) + X_w.shape | |
s_xx, s_yy = np.zeros(shape), np.zeros(shape) | |
# add 0j to make the array a complex object; cf. http://stackoverflow.com/a/11965466 | |
s_xy = np.zeros(shape) + 0j | |
s_xx[i] = np.abs(X_w) ** 2 | |
s_yy[i] = np.abs(Y_w) ** 2 | |
s_xy[i] = np.multiply(np.conj(X_w), Y_w) | |
sys.stderr.write('\n') | |
return s_xx, s_yy, s_xy, wa_x.time, 1. / wa_x.fourier_periods | |
#============================================================================== | |
# Plotting helpers | |
#============================================================================== | |
def setup_axis(ax, xlabel, ylabel): | |
ax.grid(True) | |
ax.xaxis.set_label_coords(.9, -.07) | |
ax.set_xlabel(xlabel) | |
ax.set_ylabel(ylabel, rotation='vertical') | |
return ax | |
def highlight_power_bumps(ax, power_bumps, c='b'): | |
for bump in power_bumps: | |
start, end = 1000 * bump['time'][0], 1000 * bump['time'][1] | |
ax.axvspan(start, end, alpha=0.1, color=c) | |
#============================================================================== | |
# Experiments | |
#============================================================================== | |
def plot_sample_lfp(path): | |
T = 400e-3 | |
dt = 10e-5 | |
A = 30 | |
t = np.arange(0, T, dt) | |
fig = plt.figure(figsize=(16, 12)) | |
def _plot(ax_lfp, ax_pow, bumps): | |
X, _ = two_channel_lfp(t, dt, A, snr=20, power_bumps=bumps) | |
ax_lfp.plot(1000 * t, X, c='k', alpha=.7) | |
highlight_power_bumps(ax_lfp, bumps) | |
f, power = rfftfreq(X.size, d=dt), np.abs(rfft(X)) ** 2 | |
ax_pow.plot(f, power, alpha=.7, c='k', lw=2) | |
ax_pow.set_xscale('log') | |
ax_pow.set_yscale('log') | |
if bumps: | |
ax_pow.axvspan(*bumps[0]['freq'], alpha=0.3, color='b') | |
f, power = welch(X, fs=1./dt) | |
ax_pow.plot(f, power, alpha=.7, c='r', lw=2) | |
bumps = [] | |
ax_lfp = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'LFP ($\mu V$)') | |
ax_pow = setup_axis(fig.add_subplot(2, 2, 3), 'frequency (Hz)', 'power density') | |
_plot(ax_lfp, ax_pow, bumps) | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 35), 'start_sd': 0, 'amplitude': 10}] | |
ax_lfp = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'LFP ($\mu V$)') | |
ax_pow = setup_axis(fig.add_subplot(2, 2, 4), 'frequency (Hz)', 'power density') | |
_plot(ax_lfp, ax_pow, bumps) | |
fig.savefig(path, dpi=500) | |
sys.stderr.write('saved plot to %s.\n' % path) | |
def plot_stationarity_measures(path): | |
T = 400e-3 | |
dt = 10e-5 | |
A = 30 | |
t = np.arange(0, T, dt) | |
# NOTE discovery: if bump amplitude is ~ 10 => way less reliable | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 35), 'start_sd': 5e-3, 'amplitude': 10}] | |
#bumps = [{'time': ( 50e-3, 150e-3), 'freq': (30, 35), 'start_sd': 0, 'amplitude': 30}, | |
#{'time': (200e-3, 300e-3), 'freq': (60, 65), 'start_sd': 10e-3, 'amplitude': 30}] | |
n_trials = 1000 | |
nX = one_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) | |
fig = plt.figure(figsize=(10, 16)) | |
mX = np.mean(nX, axis=0) | |
ax = setup_axis(fig.add_subplot(4, 1, 1), 'time (ms)', 'LFP ($\mu V$)') | |
ax.plot(1000 * t, nX[0], alpha=.7, c='k') | |
ax.plot(1000 * t, mX, alpha=.7, c='g', lw=3) | |
highlight_power_bumps(ax, bumps) | |
ax = setup_axis(fig.add_subplot(4, 1, 2), 'time (ms)', 'LFP Variance') | |
vX = np.var(nX, axis=0) | |
ax.plot(1000 * t, vX, alpha=.7, c='k') | |
ax = setup_axis(fig.add_subplot(4, 1, 4), | |
'lag $\\tau$ (ms)', '$C(t_\circ, t_\circ + \\tau)$') | |
ctrs, dists = autocov_distance(0, t, T, dt, nX, scatter=ax, ctr_step=5e-3) | |
ax = setup_axis(fig.add_subplot(4, 1, 3), | |
'$t_\circ$ (ms)', | |
'$\\big \\lVert C(0, \cdot) - C(t_\circ, \cdot) \\big \\rVert_1$') | |
ax.plot(1000 * ctrs, dists, c='k', alpha=.7, lw=2) | |
ax.set_xlim(0, 1000 * T) | |
ax.set_ylim(0, 1.1) | |
fig.savefig(path, dpi=500) | |
sys.stderr.write('saved plot to %s.\n' % path) | |
def plot_ar_stability_measure(path): | |
T = 400e-3 | |
dt = 25e-5 | |
A = 30 | |
t = np.arange(0, T, dt) | |
n_trials = 15 | |
fig = plt.figure(figsize=(20, 8)) | |
win_len = 30e-3 | |
ctr_step = 5e-3 | |
# NOTE discovery: if bump amplitude is ~ 10 => way less reliable | |
#bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 30}] | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 10}] | |
nX = one_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) | |
ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'LFP ($\mu V$)') | |
ax.plot(1000 * t, nX[0], c='k', alpha=.7) | |
highlight_power_bumps(ax, bumps) | |
ax = setup_axis(fig.add_subplot(2, 2, 3, sharex=ax), | |
'center of %.1f ms window' % (1000 * win_len), | |
'Stability Index ($\log \min |\lambda_i|$)') | |
AR_root_test(nX, dt, T, ax, 'k', win_len=win_len, ctr_step=ctr_step) | |
ax.set_ylim(min(0, ax.get_ylim()[0]) - .1, None) | |
ax.axhline(0, ax.get_xlim()[0], ax.get_xlim()[1], c='r', lw=4, alpha=.4) | |
# ----------- | |
# without power bumps | |
# ----------- | |
nX_stationary = one_channel_lfp_n(n_trials, t, dt, A) | |
ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'LFP ($\mu V$)') | |
ax.plot(1000 * t, nX_stationary[0], c='k', alpha=.7) | |
ax = setup_axis(fig.add_subplot(2, 2, 4, sharex=ax), | |
'center of %.1f ms window' % (1000 * win_len), | |
'Stability Index ($\log \min |\lambda_i|$)') | |
AR_root_test(nX_stationary, dt, T, ax, 'k', win_len=win_len, ctr_step=ctr_step) | |
ax.set_ylim(min(0, ax.get_ylim()[0]) - .1, None) | |
ax.axhline(0, ax.get_xlim()[0], ax.get_xlim()[1], c='r', lw=4, alpha=.4) | |
fig.savefig(path, dpi=500) | |
sys.stderr.write('saved plot to %s.\n' % path) | |
def plot_coherence_measures(path): | |
T = 400e-3 | |
dt = 10e-5 | |
A = 30 | |
t = np.arange(0, T, dt) | |
n_trials = 100 | |
fig = plt.figure(figsize=(16, 12)) | |
# NOTE assumes there is only one bump | |
def _plot(bumps, ax_traces, ax_coherence): | |
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) | |
ax_traces[0].plot(1000 * t, nX[0], c='k', alpha=.5) | |
ax_traces[0].plot(1000 * t, nY[0], c='m', alpha=.5) | |
highlight_power_bumps(ax_traces[0], bumps) | |
ax_traces[1].plot(1000 * t, nX[1], c='k', alpha=.5) | |
ax_traces[1].plot(1000 * t, nY[1], c='m', alpha=.5) | |
highlight_power_bumps(ax_traces[1], bumps) | |
ax_coherence.axvspan(*bumps[0]['freq'], alpha=0.3, color='b') | |
f, coh = coherence_with_spectral_avg(nX, nY, dt, ax_coherence, 'r') | |
ax_coherence.plot(f, coh, c='r', lw=5, alpha=.7) | |
# coherence only over bump | |
bump_t = bumps[0]['time'] | |
nX = np.array([X[int(bump_t[0] / dt):int(bump_t[1] / dt)] for X in nX]) | |
nY = np.array([Y[int(bump_t[0] / dt):int(bump_t[1] / dt)] for Y in nY]) | |
f, coh = coherence_with_spectral_avg(nX, nY, dt, ax_coherence, 'g') | |
# add some noise to coherence plots so both red and green coherence in | |
# non-phase locked example are visible | |
coh += np.max(np.array([np.zeros(len(coh)), .01 * np.random.randn(len(coh))])) | |
ax_coherence.plot(f, coh, c='g', lw=6, alpha=.7) | |
ax_coherence.set_xlim(0, 250) | |
ax_coherence.set_ylim(-.1, 1) | |
bump_A = 30 | |
#bump_A = 5 | |
# ----------------- | |
# with phase locking | |
# ----------------- | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': bump_A, 'phase_shift': np.pi / 2.}] | |
ax_traces = [None, None] | |
ax_traces[0] = setup_axis(fig.add_subplot(3, 2, 1), 'time (ms)', 'LFP ($\mu V$)') | |
ax_traces[1] = setup_axis(fig.add_subplot(3, 2, 3), 'time (ms)', 'LFP ($\mu V$)') | |
ax_coherence = setup_axis(fig.add_subplot(3, 2, 5), 'frequency (Hz)', '$|\widehat{\gamma}_{xy}|^2$') | |
_plot(bumps, ax_traces, ax_coherence) | |
# ----------------- | |
# without phase locking | |
# ----------------- | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': bump_A}] | |
ax_traces = [None, None] | |
ax_traces[0] = setup_axis(fig.add_subplot(3, 2, 2), 'time (ms)', 'LFP ($\mu V$)') | |
ax_traces[1] = setup_axis(fig.add_subplot(3, 2, 4), 'time (ms)', 'LFP ($\mu V$)') | |
ax_coherence = setup_axis(fig.add_subplot(3, 2, 6), 'frequency (Hz)', '$|\widehat{\gamma}_{xy}|^2$') | |
_plot(bumps, ax_traces, ax_coherence) | |
fig.savefig(path, dpi=500) | |
sys.stderr.write('saved plot to %s.\n' % path) | |
def plot_wavelet_plv(path): | |
T = 400e-3 | |
dt = 10e-5 | |
A = 30 | |
t = np.arange(0, T, dt) | |
fig = plt.figure(figsize=(12, 12)) | |
n_trials = 100 | |
# ----------------- | |
# with phase locking | |
# ----------------- | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5, 'phase_shift': 1}] | |
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) | |
sys.stderr.write('%d phase-locked trials' % n_trials) | |
phase_diffs, times, freqs = wavelet_plv(nX, nY, dt) | |
times, freqs = np.meshgrid(1000 * times, freqs) | |
# first 5 trials only | |
ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'frequency (Hz)') | |
plv = np.abs(np.mean(np.exp(1j * phase_diffs[:5]), axis=0)) | |
ax.contourf(times, freqs, plv, 100) | |
ax.set_ylim(0, 250) | |
# all trials | |
ax = setup_axis(fig.add_subplot(2, 2, 3), 'time (ms)', 'frequency (Hz)') | |
plv = np.abs(np.mean(np.exp(1j * phase_diffs), axis=0)) | |
ax.contourf(times, freqs, plv, 100) | |
ax.set_ylim(0, 250) | |
# ----------------- | |
# without phase locking | |
# ----------------- | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5}] | |
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) | |
sys.stderr.write('%d non-phase-locked trials' % n_trials) | |
phase_diffs, times, freqs = wavelet_plv(nX, nY, dt) | |
times, freqs = np.meshgrid(1000 * times, freqs) | |
# first 5 trials only | |
ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'frequency (Hz)') | |
plv = np.abs(np.mean(np.exp(1j * phase_diffs[:5]), axis=0)) | |
ax.contourf(times, freqs, plv, 100) | |
ax.set_ylim(0, 250) | |
# all trials | |
ax = setup_axis(fig.add_subplot(2, 2, 4), 'time (ms)', 'frequency (Hz)') | |
plv = np.abs(np.mean(np.exp(1j * phase_diffs), axis=0)) | |
contours = ax.contourf(times, freqs, plv, 100) | |
ax.set_ylim(0, 250) | |
fig.subplots_adjust(right=0.8) | |
fig.colorbar(contours, cax=fig.add_axes([0.85, 0.35, 0.01, 0.3])) | |
fig.savefig(path, dpi=500) | |
sys.stderr.write('saved plot to %s.\n' % path) | |
def plot_wavelet_coherence(path): | |
T = 400e-3 | |
dt = 10e-5 | |
A = 30 | |
t = np.arange(0, T, dt) | |
fig = plt.figure(figsize=(12, 12)) | |
n_trials = 100 | |
# ----------------- | |
# with phase locking | |
# ----------------- | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5, 'phase_shift': 1}] | |
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) | |
sys.stderr.write('%d phase-locked trials' % n_trials) | |
s_xx, s_yy, s_xy, times, freqs = wavelet_coherence(nX, nY, dt) | |
times, freqs = np.meshgrid(1000 * times, freqs) | |
# first 5 trials only | |
ax = setup_axis(fig.add_subplot(2, 2, 1), 'time (ms)', 'frequency (Hz)') | |
coh = (np.abs(np.mean(s_xy[:5], axis=0)) ** 2) / (np.mean(s_xx[:5], axis=0) * np.mean(s_yy[:5], axis=0)) | |
ax.contourf(times, freqs, coh, 100) | |
ax.set_ylim(0, 250) | |
# all trials | |
ax = setup_axis(fig.add_subplot(2, 2, 3), 'time (ms)', 'frequency (Hz)') | |
coh = (np.abs(np.mean(s_xy, axis=0)) ** 2) / (np.mean(s_xx, axis=0) * np.mean(s_yy, axis=0)) | |
ax.contourf(times, freqs, coh, 100) | |
ax.set_ylim(0, 250) | |
# ----------------- | |
# without phase locking | |
# ----------------- | |
bumps = [{'time': (100e-3, 200e-3), 'freq': (30, 32), 'start_sd': 5e-3, 'amplitude': 5}] | |
nX, nY = two_channel_lfp_n(n_trials, t, dt, A, power_bumps=bumps) | |
sys.stderr.write('%d non-phase-locked trials' % n_trials) | |
s_xx, s_yy, s_xy, times, freqs = wavelet_coherence(nX, nY, dt) | |
times, freqs = np.meshgrid(1000 * times, freqs) | |
# first 5 trials only | |
ax = setup_axis(fig.add_subplot(2, 2, 2), 'time (ms)', 'frequency (Hz)') | |
coh = (np.abs(np.mean(s_xy[:5], axis=0)) ** 2) / (np.mean(s_xx[:5], axis=0) * np.mean(s_yy[:5], axis=0)) | |
ax.contourf(times, freqs, coh, 100) | |
ax.set_ylim(0, 250) | |
# all trials | |
ax = setup_axis(fig.add_subplot(2, 2, 4), 'time (ms)', 'frequency (Hz)') | |
coh = (np.abs(np.mean(s_xy, axis=0)) ** 2) / (np.mean(s_xx, axis=0) * np.mean(s_yy, axis=0)) | |
contours = ax.contourf(times, freqs, coh, 100) | |
ax.set_ylim(0, 250) | |
fig.subplots_adjust(right=0.8) | |
fig.colorbar(contours, cax=fig.add_axes([0.85, 0.35, 0.01, 0.3])) | |
fig.savefig(path, dpi=500) | |
sys.stderr.write('saved plot to %s.\n' % path) | |
def plot_example_resultant(path): | |
def _plot_resultant(ax, n, phase_mean, phase_sd): | |
vectors = np.exp(1j * (phase_mean + phase_sd * np.random.randn(n))) | |
quiver_kw = { | |
'angles': 'xy', | |
'scale_units': 'xy', | |
'scale': 1 | |
} | |
ax.quiver(np.zeros(n), np.zeros(n), vectors.real, vectors.imag, | |
width=5e-3, alpha=.2, color='k', **quiver_kw) | |
resultant = np.mean(vectors) | |
ax.quiver(np.zeros(1), np.zeros(1), resultant.real, resultant.imag, | |
width=10e-3, alpha=.7, color='r', **quiver_kw) | |
ax.set_aspect('equal') | |
ax.set_xlim(-1.1, 1.1) | |
ax.set_ylim(-1.1, 1.1) | |
# draw the unit circle | |
t = np.linspace(0, 2 * np.pi, 1000) | |
ax.plot(np.cos(t), np.sin(t), c='b', lw=2, alpha=.2) | |
phase_mean = np.pi / 4. | |
n = 500 | |
fig = plt.figure(figsize=(12, 6)) | |
ax = setup_axis(fig.add_subplot(1, 2, 1), 'Re', 'Im') | |
_plot_resultant(ax, n, phase_mean, .06 * np.pi) | |
ax = setup_axis(fig.add_subplot(1, 2, 2), 'Re', 'Im') | |
_plot_resultant(ax, n, phase_mean, .6 * np.pi) | |
fig.savefig(path, dpi=300) | |
sys.stderr.write('saved plot to %s.\n' % path) | |
def plot_morlet_wavelets(path): | |
fig = plt.figure(figsize=(12, 8)) | |
ax_re = setup_axis(fig.add_subplot(2, 2, 1), 'time', 'Re\{$\psi(\\frac{t-\\tau}{s})$\}') | |
ax_im = setup_axis(fig.add_subplot(2, 2, 2), 'time', 'Im\{$\psi(\\frac{t-\\tau}{s})$\}') | |
t = np.linspace(-5, 5, 10000) | |
def wavelet(t, s, tau): | |
_t = (t - tau) / s | |
return np.pi ** -.25 * np.exp(-_t**2 / 2) * np.exp(-6j * _t) | |
w = wavelet(t, 1, 0) | |
ax_re.plot(t, w.real, 'k', lw=1, alpha=.9) | |
ax_im.plot(t, w.imag, 'k', lw=1, alpha=.9) | |
w = wavelet(t, 2, 0) | |
ax_re.plot(t, w.real, 'm', lw=4, alpha=.2) | |
ax_im.plot(t, w.imag, 'm', lw=4, alpha=.2) | |
ax_re = setup_axis(fig.add_subplot(2, 2, 3), 'time', 'Re\{$\psi(\\frac{t-\\tau}{s})$\}') | |
ax_im = setup_axis(fig.add_subplot(2, 2, 4), 'time', 'Im\{$\psi(\\frac{t-\\tau}{s})$\}') | |
w = wavelet(t, 1, 0) | |
ax_re.plot(t, w.real, 'k', lw=1, alpha=.9) | |
ax_im.plot(t, w.imag, 'k', lw=1, alpha=.9) | |
w = wavelet(t, 1, 2) | |
ax_re.plot(t, w.real, 'm', lw=4, alpha=.2) | |
ax_im.plot(t, w.imag, 'm', lw=4, alpha=.2) | |
fig.savefig(path, dpi=500) | |
if __name__ == '__main__': | |
plot_sample_lfp('lfp.png') | |
plot_stationarity_measures('stationarity.png') | |
plot_ar_stability_measure('AR-stability-index.png') | |
plot_coherence_measures('coherence.png') | |
plot_wavelet_plv('wavelet-plv.png') | |
plot_wavelet_coherence('wavelet-coherence.png') | |
plot_example_resultant('resultant.png') | |
plot_morlet('morlet.png') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment