Created
June 8, 2015 15:04
-
-
Save kingjr/1e7e59907b11f334b3d5 to your computer and use it in GitHub Desktop.
decoding_source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| ============================================================= | |
| Decoding source space data with ROI - sliding window approach | |
| ============================================================= | |
| Decoding, a.k.a MVPA or supervised machine learning applied to MEG | |
| data in source space on the left cortical surface. Here f-test feature | |
| selection is employed to confine the classification to the potentially | |
| relevant features. The classifier then is trained to selected features of | |
| epochs in source space. A different classifier is trained i) in each | |
| region of interest (ROI), based on Broadmann cytoarchitectonic mapping and | |
| ii) in each time point separately. The figure then show the score (and) | |
| not the weights of each classifiers across time and space | |
| """ | |
| # Author: Jean-Rémi King <jeanremi.king@gmail.com> | |
| print __doc__ | |
| # generic libraries | |
| import os | |
| import numpy as np | |
| import mne | |
| from mne import fiff | |
| from mne.datasets import sample | |
| from mne.minimum_norm import apply_inverse_epochs, read_inverse_operator | |
| # setup subject's details | |
| data_path = sample.data_path() | |
| fname_fwd = data_path + 'MEG/sample/sample_audvis-meg-oct-6-fwd.fif' | |
| fname_evoked = data_path + '/MEG/sample/sample_audvis-ave.fif' | |
| subjects_dir = data_path + '/subjects' | |
| subject = os.environ['SUBJECT'] = subjects_dir + '/sample' | |
| os.environ['SUBJECTS_DIR'] = subjects_dir | |
| raw_fname = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw.fif' | |
| event_fname = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw-eve.fif' | |
| fname_cov = data_path + '/MEG/sample/sample_audvis-cov.fif' | |
| label_names = 'Aud-rh', 'Vis-rh' | |
| fname_inv = data_path + '/MEG/sample/sample_audvis-meg-oct-6-meg-inv.fif' | |
| # load & preprocess MEG data | |
| tmin, tmax = -0.2, 0.5 | |
| event_id = dict(aud_r=2, vis_r=4) # load contra-lateral conditions | |
| raw = fiff.Raw(raw_fname, preload=True) # setup for reading the raw data | |
| raw.filter(2, None, method='iir') # replace baselining with high-pass | |
| events = mne.read_events(event_fname) | |
| raw.info['bads'] += ['MEG 2443'] # Set up pick list: MEG - bad channels | |
| picks = fiff.pick_types(raw.info, meg=True, eeg=False, stim=True, eog=True, | |
| exclude='bads') | |
| epochs = mne.Epochs(raw, events, event_id, tmin, tmax, proj=True, | |
| picks=picks, baseline=None, preload=True, | |
| reject=dict(grad=4000e-13, eog=150e-6)) # Read epochs | |
| epochs.equalize_event_counts(event_id.keys(), 'mintime', copy=False) | |
| epochs_list = [epochs[k] for k in event_id] | |
| # Compute inverse solution | |
| snr = 3.0 | |
| lambda2 = 1.0 / snr ** 2 | |
| method = "MNE" # check that it is not dSPM only??? | |
| n_times = len(epochs.times) | |
| n_vertices = 3732 | |
| n_epochs = len(epochs.events) | |
| noise_cov = mne.read_cov(fname_cov) # Load precomputed covariance data | |
| inverse_operator = read_inverse_operator(fname_inv) # compute inverse solution and stcs for each epoch. | |
| # prepare data for sklearn | |
| X = np.zeros([n_epochs, n_vertices, n_times]) | |
| for condition_count, ep in zip([0, n_epochs / 2], epochs_list): | |
| stcs = apply_inverse_epochs(ep, inverse_operator, lambda2, | |
| method, pick_ori="normal", # this saves memory | |
| return_generator=True) | |
| for jj, stc in enumerate(stcs): | |
| X[condition_count + jj] = stc.lh_data | |
| X = X.reshape(n_epochs, n_vertices, n_times) # separate the time dimension to apply a sliding window | |
| y = np.repeat([0, 1], n_epochs / 2) # define trials | |
| # define regions of interest | |
| import nibabel as nib | |
| labels, ctab, names = nib.freesurfer.read_annot(subject+'/label/lh.aparc.annot') | |
| areas = np.unique(labels) # get areas ids | |
| n_areas = len(areas) | |
| # setup classification pipeline | |
| from sklearn.svm import SVC | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.cross_validation import StratifiedKFold | |
| from sklearn.feature_selection import SelectKBest, f_classif | |
| from sklearn.pipeline import Pipeline | |
| scaler = StandardScaler() | |
| Kbest = 100 | |
| clf = SVC(C=1, kernel='linear') | |
| anova = SelectKBest(f_classif, k=Kbest) # homogeneize number of dipole per region to get comparable problem space | |
| # cross validation | |
| n_folds = 4 # for speed | |
| cv = StratifiedKFold(y, n_folds=n_folds) | |
| # initialize results | |
| scores = np.zeros([n_folds,n_times,n_areas]) | |
| feature_scores = np.ones([3732,n_times])*.5 | |
| # classification: loop across time and space | |
| for t in arange(n_times): | |
| print t | |
| for a in arange(n_areas): | |
| roi = labels==areas[a] # select region of interest | |
| roi = roi[stc.lh_vertno] # only computed on a subsampling of sources | |
| if sum(roi)>Kbest: | |
| svc = Pipeline([ | |
| ('scaler', scaler), | |
| ('feature_selection', anova), | |
| ('svc', clf)]) | |
| else: # this is a bit dirty; it'd be much better if we could add an option to allow SelectKBest to take Kbest if more than K features | |
| svc = Pipeline([ | |
| ('scaler', scaler), | |
| ('svc', clf)]) | |
| X_roi = X[:,roi,t] | |
| for ii, (train, test) in enumerate(cv): | |
| svc.fit(X_roi[train], y[train]) | |
| y_pred = svc.predict(X_roi[test]) | |
| y_test = y[test] | |
| scores[ii,t,a] = np.mean(y_pred == y_test) | |
| feature_scores[roi,t] = np.mean(scores[:,t,a]) | |
| # plot score per region | |
| vertices = [stc.lh_vertno, np.array([])] # empty array for right hemisphere | |
| stc_feat = mne.SourceEstimate(feature_scores*100, vertices=vertices, | |
| tmin=stc.tmin, tstep=stc.tstep, | |
| subject='sample') | |
| brain = stc_feat.plot(subject=subject, fmin=50, fmid=75, fmax=100, | |
| config_opts={"cortex": "low_contrast", "background": "ivory"}) | |
| # The color function does not accept float values?... | |
| brain.set_time(130) | |
| brain.show_view('ventral') | |
| fig = figure() | |
| imshow(np.mean(scores[:,:,:],axis=0).transpose(), aspect='auto',vmin=0,vmax=1) | |
| yticks(arange(n_areas),names) | |
| show() | |
| # Compare multivariate decoding with a univariate clustering approach | |
| # note that ideally you would use an F test across all sources with a | |
| # cluster permutation statistical analysis. However, the present | |
| # approach is more easily comparable with the decoding one. | |
| feature_uni = np.ones([3732,n_times])*.5 | |
| scores_uni = np.zeros([n_folds,n_times,n_areas]) | |
| for t in arange(n_times): | |
| print t | |
| for a in arange(n_areas): | |
| roi = labels==areas[a] # select region of interest | |
| roi = roi[stc.lh_vertno] # only computed on a subsampling of sources | |
| svc = Pipeline([ | |
| ('scaler', scaler), | |
| ('svc', clf)]) | |
| X_roi = np.mean(X[:,roi,t],axis=1).reshape([len(y),1]) | |
| for ii, (train, test) in enumerate(cv): | |
| svc.fit(X_roi[train], y[train]) | |
| y_pred = svc.predict(X_roi[test]) | |
| y_test = y[test] | |
| scores_uni[ii,t,a] = np.mean(y_pred == y_test) | |
| fig = figure() | |
| imshow(np.mean(scores_uni[:,:,:],axis=0).transpose(), aspect='auto',vmin=0,vmax=1) | |
| yticks(arange(n_areas),names) | |
| show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment