Skip to content

Instantly share code, notes, and snippets.

@kingjr
Created September 9, 2021 14:24
Show Gist options
  • Save kingjr/8738cb27e60398d49e7b1302a4b1fa10 to your computer and use it in GitHub Desktop.
Save kingjr/8738cb27e60398d49e7b1302a4b1fa10 to your computer and use it in GitHub Desktop.
svetlana.py
# To add a new cell, type '# %%'
# To add a new markdown cell, type '# %% [markdown]'
# %%
#%matplotlib inline
# pip install python-levenshtein
import mne
import pandas as pd
import numpy as np
from scipy.io import loadmat
import matplotlib.pyplot as plt
# %%
def get_struct(struct):
keys = list(struct.dtype.fields.keys())
out = dict()
for k in keys:
out[k] = struct[k].item()
return out
def get_df(struct):
keys = list(struct.dtype.fields.keys())
out = list()
for t in struct:
out.append(dict())
for i, k in enumerate(keys):
out[-1][k] = t[i]
return pd.DataFrame(out)
def ascii_to_letter(x):
if pd.isna(x):
return x
return chr(int(x))
def read_log(log_fname):
mat = loadmat(
log_fname,
squeeze_me=True,
struct_as_record=True,
chars_as_strings=True,
)
struct = get_struct(mat["pr_trials"])
df = list()
for event_type in ("key", "rsvp"):
for trial_id, trial in enumerate(struct[event_type]):
if trial is None:
continue
trial = get_df(trial)
trial["trial_id"] = trial_id
trial["event_type"] = "image" if event_type == "rsvp" else "key"
df.append(trial)
df = pd.concat(df, ignore_index=True)
# preproc
idx = df.query('event_type=="image"').index
df.loc[idx, "trigger"] = 10
df.loc[idx, "Time"] = df.loc[idx, "t"]
if "Keycode" in df.keys():
idx = df.query('event_type=="key"').index
df.loc[idx, "trigger"] = df.loc[idx, "Keycode"]
df.loc[idx, "key"] = df.loc[idx, "Keycode"].apply(ascii_to_letter)
else:
df["key"] = None
df["Pressed"] = None
df = df.sort_values("Time").reset_index()
df["time"] = df["Time"]
df["pressed"] = df["Pressed"].apply(bool)
df["is_image"] = df.event_type == "image"
df["is_key"] = df.event_type == "key"
df = df[
["trial_id", "time", "pressed", "key", "trigger", "is_image", "is_key"]
]
return df
# %%
log8 = read_log("S1108.mat")
log8["block_id"] = 0
log9 = read_log("S1109.mat")
log9["block_id"] = 1
log = pd.concat([log8, log9], ignore_index=True)
# %%
raw_fname = "sveta_test_fixed_allevents.fif"
raw = mne.io.read_raw_fif(raw_fname)
events = mne.find_events(raw, min_duration=0.005)
plt.plot(events[:, 2])
# %%
from Levenshtein import editops
def match_list(A, B, on_replace="delete"):
"""Match two lists of different sizes and return corresponding indice
Parameters
----------
A: list | array, shape (n,)
The values of the first list
B: list | array: shape (m, )
The values of the second list
Returns
-------
A_idx : array
The indices of the A list that match those of the B
B_idx : array
The indices of the B list that match those of the A
"""
unique = np.unique(np.r_[A, B])
label_encoder = dict((k, v) for v, k in enumerate(unique))
def int_to_unicode(array: np.ndarray) -> str:
return "".join([str(chr(label_encoder[ii])) for ii in array])
changes = editops(int_to_unicode(A), int_to_unicode(B))
B_sel = np.arange(len(B)).astype(float)
A_sel = np.arange(len(A)).astype(float)
for type_, val_a, val_b in changes:
if type_ == "insert":
B_sel[val_b] = np.nan
elif type_ == "delete":
A_sel[val_a] = np.nan
elif on_replace == "delete":
# print('delete replace')
A_sel[val_a] = np.nan
B_sel[val_b] = np.nan
elif on_replace == "keep":
# print('keep replace')
pass
else:
raise NotImplementedError
B_sel = B_sel[np.where(~np.isnan(B_sel))]
A_sel = A_sel[np.where(~np.isnan(A_sel))]
assert len(B_sel) == len(A_sel)
return A_sel.astype(int), B_sel.astype(int)
# %%
log = log.query("is_image or pressed")
len(events), len(log)
# %%
i, j = match_list(events[:, 2], log.trigger.values)
missed = np.setdiff1d(np.arange(len(log)), j)
print(f"{missed} missed events")
# %%
plt.plot(log.trigger.values, lw=1)
valid = np.zeros(len(log))
valid[j] = 1
plt.scatter(range(len(valid)), valid * log.trigger.values, color="r", s=10)
invalid = np.zeros(len(log))
invalid[missed] = 1
plt.scatter(range(len(invalid)), invalid * log.trigger.values, color="k", s=10)
# plt.xlim(0, 500)
# %%
raw.load_data()
raw.filter(1.0, 20.0, n_jobs=-1)
# %%
log
# %%
events
# %%
epochs = mne.Epochs(
raw, events=events, metadata=log, decim=10, tmin=-0.500, tmax=0.7
)
# %%
epochs['is_key and key=="K"'].get_data().shape
# %%
def clip_meg(epo, thresh=5.0):
epo.load_data()
epo = epo.pick_types(eeg=False, meg="mag")
X = epo.get_data()
# clip data
thresholds = np.percentile(
X.transpose(1, 0, 2).reshape(len(epo.ch_names), -1),
[thresh, 100.0 - thresh],
axis=1,
)
for ch, thresh in enumerate(thresholds.T):
X[:, ch, :] = np.clip(X[:, ch, :], *thresh)
epo._data[:] = X
return epo
# %%
epochs = clip_meg(epochs)
# %%
evoked = epochs["is_image"].average()
evoked.plot_joint(
times=np.linspace(0, 0.3, 5), topomap_args=dict(vmin=-100, vmax=100)
)
evoked = epochs["is_key"].average()
evoked.plot_joint(
times=np.linspace(0, 0.3, 5), topomap_args=dict(vmin=-10, vmax=10)
)
# %%
# %%
X = epo.get_data() * 1e13
X -= np.median(X, 0)
X /= np.std(X, 0)
# %%
is_left_key = epo.metadata.key.apply(lambda k: k.lower() in "qwertyasdfgzxcv")
y = is_left_key.values * 1.0
# %%
from sklearn.linear_model import LogisticRegression
from scipy.stats import pearsonr
from sklearn.model_selection import KFold, cross_val_predict, cross_val_score
from tqdm.notebook import trange
model = LogisticRegression()
n_times = X.shape[2]
auc = np.zeros(n_times)
for t in trange(n_times):
auc[t] = np.mean(cross_val_score(model, X[:, :, t], y, scoring="roc_auc"))
# %%
plt.plot(epo.times, auc)
# %%
var = "svetlana"
for i, c in enumerate(var):
print(i, c)
# %%
from sklearn.metrics import roc_auc_score
# %%
y = is_left_key.values * 1.0
# %%
y_before = np.roll(y, -1)
# %%
plt.plot(y_before[:10], label="before")
plt.plot(y[:10])
plt.legend()
# %%
# %%
from sklearn.linear_model import RidgeClassifier
model = RidgeClassifier()
cv = KFold(n_splits=4, shuffle=True)
n_times = X.shape[2]
auc = np.zeros((cv.n_splits, n_times, n_times))
for train_time in trange(n_times):
for split, (train_trial, test_trial) in enumerate(cv.split(X, y)):
# fit model
model.fit(X=X[train_trial, :, train_time], y=y[train_trial])
for test_time in range(n_times):
# make prediction on test set
y_pred = model.predict(X=X[test_trial, :, test_time])
# evaluate "accuracy"
score = roc_auc_score(y[test_trial], y_pred)
# store the result
auc[split, train_time, test_time] = score
# %%
plt.matshow(
auc.mean(0),
vmin=0.4,
vmax=0.6,
cmap="RdBu_r",
origin="lower",
extent=epo.times[[0, -1, 0, -1]],
)
plt.axhline(0, color="k", lw=1)
plt.axvline(0, color="k", lw=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment