Created
September 9, 2021 14:24
-
-
Save kingjr/8738cb27e60398d49e7b1302a4b1fa10 to your computer and use it in GitHub Desktop.
svetlana.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# To add a new cell, type '# %%' | |
# To add a new markdown cell, type '# %% [markdown]' | |
# %% | |
#%matplotlib inline | |
# pip install python-levenshtein | |
import mne | |
import pandas as pd | |
import numpy as np | |
from scipy.io import loadmat | |
import matplotlib.pyplot as plt | |
# %% | |
def get_struct(struct): | |
keys = list(struct.dtype.fields.keys()) | |
out = dict() | |
for k in keys: | |
out[k] = struct[k].item() | |
return out | |
def get_df(struct): | |
keys = list(struct.dtype.fields.keys()) | |
out = list() | |
for t in struct: | |
out.append(dict()) | |
for i, k in enumerate(keys): | |
out[-1][k] = t[i] | |
return pd.DataFrame(out) | |
def ascii_to_letter(x): | |
if pd.isna(x): | |
return x | |
return chr(int(x)) | |
def read_log(log_fname): | |
mat = loadmat( | |
log_fname, | |
squeeze_me=True, | |
struct_as_record=True, | |
chars_as_strings=True, | |
) | |
struct = get_struct(mat["pr_trials"]) | |
df = list() | |
for event_type in ("key", "rsvp"): | |
for trial_id, trial in enumerate(struct[event_type]): | |
if trial is None: | |
continue | |
trial = get_df(trial) | |
trial["trial_id"] = trial_id | |
trial["event_type"] = "image" if event_type == "rsvp" else "key" | |
df.append(trial) | |
df = pd.concat(df, ignore_index=True) | |
# preproc | |
idx = df.query('event_type=="image"').index | |
df.loc[idx, "trigger"] = 10 | |
df.loc[idx, "Time"] = df.loc[idx, "t"] | |
if "Keycode" in df.keys(): | |
idx = df.query('event_type=="key"').index | |
df.loc[idx, "trigger"] = df.loc[idx, "Keycode"] | |
df.loc[idx, "key"] = df.loc[idx, "Keycode"].apply(ascii_to_letter) | |
else: | |
df["key"] = None | |
df["Pressed"] = None | |
df = df.sort_values("Time").reset_index() | |
df["time"] = df["Time"] | |
df["pressed"] = df["Pressed"].apply(bool) | |
df["is_image"] = df.event_type == "image" | |
df["is_key"] = df.event_type == "key" | |
df = df[ | |
["trial_id", "time", "pressed", "key", "trigger", "is_image", "is_key"] | |
] | |
return df | |
# %% | |
log8 = read_log("S1108.mat") | |
log8["block_id"] = 0 | |
log9 = read_log("S1109.mat") | |
log9["block_id"] = 1 | |
log = pd.concat([log8, log9], ignore_index=True) | |
# %% | |
raw_fname = "sveta_test_fixed_allevents.fif" | |
raw = mne.io.read_raw_fif(raw_fname) | |
events = mne.find_events(raw, min_duration=0.005) | |
plt.plot(events[:, 2]) | |
# %% | |
from Levenshtein import editops | |
def match_list(A, B, on_replace="delete"): | |
"""Match two lists of different sizes and return corresponding indice | |
Parameters | |
---------- | |
A: list | array, shape (n,) | |
The values of the first list | |
B: list | array: shape (m, ) | |
The values of the second list | |
Returns | |
------- | |
A_idx : array | |
The indices of the A list that match those of the B | |
B_idx : array | |
The indices of the B list that match those of the A | |
""" | |
unique = np.unique(np.r_[A, B]) | |
label_encoder = dict((k, v) for v, k in enumerate(unique)) | |
def int_to_unicode(array: np.ndarray) -> str: | |
return "".join([str(chr(label_encoder[ii])) for ii in array]) | |
changes = editops(int_to_unicode(A), int_to_unicode(B)) | |
B_sel = np.arange(len(B)).astype(float) | |
A_sel = np.arange(len(A)).astype(float) | |
for type_, val_a, val_b in changes: | |
if type_ == "insert": | |
B_sel[val_b] = np.nan | |
elif type_ == "delete": | |
A_sel[val_a] = np.nan | |
elif on_replace == "delete": | |
# print('delete replace') | |
A_sel[val_a] = np.nan | |
B_sel[val_b] = np.nan | |
elif on_replace == "keep": | |
# print('keep replace') | |
pass | |
else: | |
raise NotImplementedError | |
B_sel = B_sel[np.where(~np.isnan(B_sel))] | |
A_sel = A_sel[np.where(~np.isnan(A_sel))] | |
assert len(B_sel) == len(A_sel) | |
return A_sel.astype(int), B_sel.astype(int) | |
# %% | |
log = log.query("is_image or pressed") | |
len(events), len(log) | |
# %% | |
i, j = match_list(events[:, 2], log.trigger.values) | |
missed = np.setdiff1d(np.arange(len(log)), j) | |
print(f"{missed} missed events") | |
# %% | |
plt.plot(log.trigger.values, lw=1) | |
valid = np.zeros(len(log)) | |
valid[j] = 1 | |
plt.scatter(range(len(valid)), valid * log.trigger.values, color="r", s=10) | |
invalid = np.zeros(len(log)) | |
invalid[missed] = 1 | |
plt.scatter(range(len(invalid)), invalid * log.trigger.values, color="k", s=10) | |
# plt.xlim(0, 500) | |
# %% | |
raw.load_data() | |
raw.filter(1.0, 20.0, n_jobs=-1) | |
# %% | |
log | |
# %% | |
events | |
# %% | |
epochs = mne.Epochs( | |
raw, events=events, metadata=log, decim=10, tmin=-0.500, tmax=0.7 | |
) | |
# %% | |
epochs['is_key and key=="K"'].get_data().shape | |
# %% | |
def clip_meg(epo, thresh=5.0): | |
epo.load_data() | |
epo = epo.pick_types(eeg=False, meg="mag") | |
X = epo.get_data() | |
# clip data | |
thresholds = np.percentile( | |
X.transpose(1, 0, 2).reshape(len(epo.ch_names), -1), | |
[thresh, 100.0 - thresh], | |
axis=1, | |
) | |
for ch, thresh in enumerate(thresholds.T): | |
X[:, ch, :] = np.clip(X[:, ch, :], *thresh) | |
epo._data[:] = X | |
return epo | |
# %% | |
epochs = clip_meg(epochs) | |
# %% | |
evoked = epochs["is_image"].average() | |
evoked.plot_joint( | |
times=np.linspace(0, 0.3, 5), topomap_args=dict(vmin=-100, vmax=100) | |
) | |
evoked = epochs["is_key"].average() | |
evoked.plot_joint( | |
times=np.linspace(0, 0.3, 5), topomap_args=dict(vmin=-10, vmax=10) | |
) | |
# %% | |
# %% | |
X = epo.get_data() * 1e13 | |
X -= np.median(X, 0) | |
X /= np.std(X, 0) | |
# %% | |
is_left_key = epo.metadata.key.apply(lambda k: k.lower() in "qwertyasdfgzxcv") | |
y = is_left_key.values * 1.0 | |
# %% | |
from sklearn.linear_model import LogisticRegression | |
from scipy.stats import pearsonr | |
from sklearn.model_selection import KFold, cross_val_predict, cross_val_score | |
from tqdm.notebook import trange | |
model = LogisticRegression() | |
n_times = X.shape[2] | |
auc = np.zeros(n_times) | |
for t in trange(n_times): | |
auc[t] = np.mean(cross_val_score(model, X[:, :, t], y, scoring="roc_auc")) | |
# %% | |
plt.plot(epo.times, auc) | |
# %% | |
var = "svetlana" | |
for i, c in enumerate(var): | |
print(i, c) | |
# %% | |
from sklearn.metrics import roc_auc_score | |
# %% | |
y = is_left_key.values * 1.0 | |
# %% | |
y_before = np.roll(y, -1) | |
# %% | |
plt.plot(y_before[:10], label="before") | |
plt.plot(y[:10]) | |
plt.legend() | |
# %% | |
# %% | |
from sklearn.linear_model import RidgeClassifier | |
model = RidgeClassifier() | |
cv = KFold(n_splits=4, shuffle=True) | |
n_times = X.shape[2] | |
auc = np.zeros((cv.n_splits, n_times, n_times)) | |
for train_time in trange(n_times): | |
for split, (train_trial, test_trial) in enumerate(cv.split(X, y)): | |
# fit model | |
model.fit(X=X[train_trial, :, train_time], y=y[train_trial]) | |
for test_time in range(n_times): | |
# make prediction on test set | |
y_pred = model.predict(X=X[test_trial, :, test_time]) | |
# evaluate "accuracy" | |
score = roc_auc_score(y[test_trial], y_pred) | |
# store the result | |
auc[split, train_time, test_time] = score | |
# %% | |
plt.matshow( | |
auc.mean(0), | |
vmin=0.4, | |
vmax=0.6, | |
cmap="RdBu_r", | |
origin="lower", | |
extent=epo.times[[0, -1, 0, -1]], | |
) | |
plt.axhline(0, color="k", lw=1) | |
plt.axvline(0, color="k", lw=1) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment