Last active
November 7, 2019 21:12
-
-
Save alexrockhill/eda1562e8ed22b933cab730c9e9a5851 to your computer and use it in GitHub Desktop.
mne_bids.write with convert
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test the MNE BIDS converter. | |
For each supported file format, implement a test. | |
""" | |
# Authors: Mainak Jas <[email protected]> | |
# Teon L Brooks <[email protected]> | |
# Chris Holdgraf <[email protected]> | |
# Stefan Appelhoff <[email protected]> | |
# Matt Sanderson <[email protected]> | |
# | |
# License: BSD (3-clause) | |
import os | |
import os.path as op | |
import pytest | |
from glob import glob | |
from datetime import datetime | |
import platform | |
import shutil as sh | |
import json | |
from distutils.version import LooseVersion | |
import numpy as np | |
from numpy.testing import assert_array_equal, assert_array_almost_equal | |
import mne | |
from mne.datasets import testing | |
from mne.utils import (_TempDir, run_subprocess, check_version, | |
requires_nibabel, requires_version) | |
from mne.io.constants import FIFF | |
from mne.io.kit.kit import get_kit_info | |
from mne_bids import (write_raw_bids, read_raw_bids, make_bids_basename, | |
make_bids_folders, write_anat) | |
from mne_bids.tsv_handler import _from_tsv, _to_tsv | |
from mne_bids.utils import _find_matching_sidecar | |
from mne_bids.pick import coil_type | |
base_path = op.join(op.dirname(mne.__file__), 'io') | |
subject_id = '01' | |
subject_id2 = '02' | |
session_id = '01' | |
run = '01' | |
acq = '01' | |
run2 = '02' | |
task = 'testing' | |
bids_basename = make_bids_basename( | |
subject=subject_id, session=session_id, run=run, acquisition=acq, | |
task=task) | |
bids_basename_minimal = make_bids_basename(subject=subject_id, task=task) | |
# WINDOWS issues: | |
# the bids-validator development version does not work properly on Windows as | |
# of 2019-06-25 --> https://github.com/bids-standard/bids-validator/issues/790 | |
# As a workaround, we try to get the path to the executable from an environment | |
# variable VALIDATOR_EXECUTABLE ... if this is not possible we assume to be | |
# using the stable bids-validator and make a direct call of bids-validator | |
# also: for windows, shell = True is needed to call npm, bids-validator etc. | |
# see: https://stackoverflow.com/q/28891053/5201771 | |
@pytest.fixture(scope="session") | |
def _bids_validate(): | |
"""Fixture to run BIDS validator.""" | |
shell = False | |
bids_validator_exe = ['bids-validator', '--config.error=41', | |
'--config.error=41'] | |
if platform.system() == 'Windows': | |
shell = True | |
exe = os.getenv('VALIDATOR_EXECUTABLE', 'n/a') | |
if 'VALIDATOR_EXECUTABLE' != 'n/a': | |
bids_validator_exe = ['node', exe] | |
def _validate(output_path): | |
cmd = bids_validator_exe + [output_path] | |
run_subprocess(cmd, shell=shell) | |
return _validate | |
@requires_version('pybv', '0.2.0') | |
def _test_anonymize(raw, bids_basename, events_fname=None, event_id=None): | |
output_path = _TempDir() | |
write_raw_bids(raw, bids_basename, output_path, | |
events_data=events_fname, | |
event_id=event_id, anonymize=dict(daysback=600), | |
overwrite=False) | |
scans_tsv = make_bids_basename( | |
subject=subject_id, session=session_id, suffix='scans.tsv', | |
prefix=op.join(output_path, 'sub-01', 'ses-01')) | |
data = _from_tsv(scans_tsv) | |
assert datetime.strptime(data['acq_time'][0], | |
'%Y-%m-%dT%H:%M:%S').year < 1925 | |
return output_path | |
@requires_version('pybv', '0.2.0') | |
def _test_convert(raw, bids_basename, events_fname=None, event_id=None): | |
output_path = _TempDir() | |
with pytest.warns(UserWarning, match='Converting data files to ' + | |
'BrainVision format'): | |
write_raw_bids(raw, bids_basename, output_path, | |
events_data=events_fname, | |
event_id=event_id, convert=True, | |
overwrite=False, verbose=True) | |
return output_path | |
@requires_version('pybv', '0.2.0') | |
def test_fif(_bids_validate): | |
"""Test functionality of the write_raw_bids conversion for fif.""" | |
output_path = _TempDir() | |
data_path = testing.data_path() | |
raw_fname = op.join(data_path, 'MEG', 'sample', | |
'sample_audvis_trunc_raw.fif') | |
event_id = {'Auditory/Left': 1, 'Auditory/Right': 2, 'Visual/Left': 3, | |
'Visual/Right': 4, 'Smiley': 5, 'Button': 32} | |
events_fname = op.join(data_path, 'MEG', 'sample', | |
'sample_audvis_trunc_raw-eve.fif') | |
raw = mne.io.read_raw_fif(raw_fname) | |
write_raw_bids(raw, bids_basename, output_path, events_data=events_fname, | |
event_id=event_id, overwrite=False) | |
# Read the file back in to check that the data has come through cleanly. | |
# Events and bad channel information was read through JSON sidecar files. | |
raw2 = read_raw_bids(bids_basename + '_meg.fif', output_path) | |
assert set(raw.info['bads']) == set(raw2.info['bads']) | |
events, _ = mne.events_from_annotations(raw2) | |
events2 = mne.read_events(events_fname) | |
events2 = events2[events2[:, 2] != 0] | |
assert_array_equal(events2[:, 0], events[:, 0]) | |
# check if write_raw_bids works when there is no stim channel | |
raw.set_channel_types({raw.ch_names[i]: 'misc' | |
for i in | |
mne.pick_types(raw.info, stim=True, meg=False)}) | |
output_path = _TempDir() | |
with pytest.warns(UserWarning, match='No events found or provided.'): | |
write_raw_bids(raw, bids_basename, output_path, overwrite=False) | |
_bids_validate(output_path) | |
# try with eeg data only (conversion to bv) | |
output_path = _TempDir() | |
raw = mne.io.read_raw_fif(raw_fname) | |
raw.load_data() | |
raw2 = raw.pick_types(meg=False, eeg=True, stim=True, eog=True, ecg=True) | |
raw2.save(op.join(output_path, 'test-raw.fif'), overwrite=True) | |
raw2 = mne.io.Raw(op.join(output_path, 'test-raw.fif'), preload=False) | |
with pytest.warns(UserWarning, | |
match='Converting data files to BrainVision format'): | |
write_raw_bids(raw2, bids_basename, output_path, | |
events_data=events_fname, event_id=event_id, | |
verbose=True, overwrite=False) | |
os.remove(op.join(output_path, 'test-raw.fif')) | |
bids_dir = op.join(output_path, 'sub-%s' % subject_id, | |
'ses-%s' % session_id, 'eeg') | |
for sidecar in ['channels.tsv', 'eeg.eeg', 'eeg.json', 'eeg.vhdr', | |
'eeg.vmrk', 'events.tsv']: | |
assert op.isfile(op.join(bids_dir, bids_basename + '_' + sidecar)) | |
raw2 = mne.io.read_raw_brainvision(op.join(bids_dir, | |
bids_basename + '_eeg.vhdr')) | |
assert_array_almost_equal(raw.get_data(), raw2.get_data()) | |
_bids_validate(output_path) | |
# write the same data but pretend it is empty room data: | |
raw = mne.io.read_raw_fif(raw_fname) | |
er_date = datetime.fromtimestamp( | |
raw.info['meas_date'][0]).strftime('%Y%m%d') | |
er_bids_basename = 'sub-emptyroom_ses-{0}_task-noise'.format(str(er_date)) | |
write_raw_bids(raw, er_bids_basename, output_path, overwrite=False) | |
assert op.exists(op.join( | |
output_path, 'sub-emptyroom', 'ses-{0}'.format(er_date), 'meg', | |
'sub-emptyroom_ses-{0}_task-noise_meg.json'.format(er_date))) | |
# test that an incorrect date raises an error. | |
er_bids_basename_bad = 'sub-emptyroom_ses-19000101_task-noise' | |
with pytest.raises(ValueError, match='Date provided'): | |
write_raw_bids(raw, er_bids_basename_bad, output_path, overwrite=False) | |
# give the raw object some fake participant data (potentially overwriting) | |
raw = mne.io.read_raw_fif(raw_fname) | |
raw.info['subject_info'] = {'his_id': subject_id2, | |
'birthday': (1993, 1, 26), 'sex': 1} | |
write_raw_bids(raw, bids_basename, output_path, events_data=events_fname, | |
event_id=event_id, overwrite=True) | |
# assert age of participant is correct | |
participants_tsv = op.join(output_path, 'participants.tsv') | |
data = _from_tsv(participants_tsv) | |
assert data['age'][data['participant_id'].index('sub-01')] == '9' | |
# try and write preloaded data | |
raw = mne.io.read_raw_fif(raw_fname, preload=True) | |
with pytest.raises(ValueError, match='preloaded'): | |
write_raw_bids(raw, bids_basename, output_path, | |
events_data=events_fname, event_id=event_id, | |
overwrite=False) | |
# test anonymize | |
raw = mne.io.read_raw_fif(raw_fname) | |
raw.anonymize() | |
data_path2 = _TempDir() | |
raw_fname2 = op.join(data_path2, 'sample_audvis_raw.fif') | |
raw.save(raw_fname2) | |
bids_basename2 = bids_basename.replace(subject_id, subject_id2) | |
raw = mne.io.read_raw_fif(raw_fname2) | |
bids_output_path = write_raw_bids(raw, bids_basename2, output_path, | |
events_data=events_fname, | |
event_id=event_id, overwrite=False) | |
# check that the overwrite parameters work correctly for the participant | |
# data | |
# change the gender but don't force overwrite. | |
raw.info['subject_info'] = {'his_id': subject_id2, | |
'birthday': (1994, 1, 26), 'sex': 2} | |
with pytest.raises(FileExistsError, match="already exists"): # noqa: F821 | |
write_raw_bids(raw, bids_basename2, output_path, | |
events_data=events_fname, event_id=event_id, | |
overwrite=False) | |
# now force the overwrite | |
write_raw_bids(raw, bids_basename2, output_path, events_data=events_fname, | |
event_id=event_id, overwrite=True) | |
with pytest.raises(ValueError, match='raw_file must be'): | |
write_raw_bids('blah', bids_basename, output_path) | |
bids_basename2 = 'sub-01_ses-01_xyz-01_run-01' | |
with pytest.raises(KeyError, match='Unexpected entity'): | |
write_raw_bids(raw, bids_basename2, output_path) | |
bids_basename2 = 'sub-01_run-01_task-auditory' | |
with pytest.raises(ValueError, match='ordered correctly'): | |
write_raw_bids(raw, bids_basename2, output_path, overwrite=True) | |
del raw._filenames | |
with pytest.raises(ValueError, match='raw.filenames is missing'): | |
write_raw_bids(raw, bids_basename2, output_path) | |
_bids_validate(output_path) | |
assert op.exists(op.join(output_path, 'participants.tsv')) | |
# asserting that single fif files do not include the part key | |
files = glob(op.join(bids_output_path, 'sub-' + subject_id2, | |
'ses-' + subject_id2, 'meg', '*.fif')) | |
for ii, FILE in enumerate(files): | |
assert 'part' not in FILE | |
assert ii < 1 | |
# test keyword mne-bids anonymize | |
raw = mne.io.read_raw_fif(raw_fname) | |
with pytest.raises(ValueError, match='`daysback` must be a positive'): | |
write_raw_bids(raw, bids_basename, output_path, | |
events_data=events_fname, | |
event_id=event_id, | |
anonymize=dict(daysback=-10), | |
overwrite=True) | |
output_path = _TempDir() | |
raw = mne.io.read_raw_fif(raw_fname) | |
with pytest.raises(ValueError, match='`daysback` is too large'): | |
write_raw_bids(raw, bids_basename, output_path, | |
events_data=events_fname, | |
event_id=event_id, | |
anonymize=dict(daysback=10000), | |
overwrite=False) | |
output_path = _TempDir() | |
raw = mne.io.read_raw_fif(raw_fname) | |
write_raw_bids(raw, bids_basename, output_path, | |
events_data=events_fname, | |
event_id=event_id, | |
anonymize=dict(daysback=600, keep_his=True), | |
overwrite=False) | |
scans_tsv = make_bids_basename( | |
subject=subject_id, session=session_id, suffix='scans.tsv', | |
prefix=op.join(output_path, 'sub-01', 'ses-01')) | |
data = _from_tsv(scans_tsv) | |
assert datetime.strptime(data['acq_time'][0], | |
'%Y-%m-%dT%H:%M:%S').year < 1925 | |
_bids_validate(output_path) | |
# check that split files have part key | |
raw = mne.io.read_raw_fif(raw_fname) | |
data_path3 = _TempDir() | |
raw_fname3 = op.join(data_path3, 'sample_audvis_raw.fif') | |
raw.save(raw_fname3, buffer_size_sec=1.0, split_size='10MB', | |
split_naming='neuromag', overwrite=True) | |
raw = mne.io.read_raw_fif(raw_fname3) | |
subject_id3 = '03' | |
bids_basename3 = bids_basename.replace(subject_id, subject_id3) | |
bids_output_path = write_raw_bids(raw, bids_basename3, output_path, | |
overwrite=False) | |
files = glob(op.join(bids_output_path, 'sub-' + subject_id3, | |
'ses-' + subject_id3, 'meg', '*.fif')) | |
for FILE in files: | |
assert 'part' in FILE | |
def test_kit(_bids_validate): | |
"""Test functionality of the write_raw_bids conversion for KIT data.""" | |
output_path = _TempDir() | |
data_path = op.join(base_path, 'kit', 'tests', 'data') | |
raw_fname = op.join(data_path, 'test.sqd') | |
events_fname = op.join(data_path, 'test-eve.txt') | |
hpi_fname = op.join(data_path, 'test_mrk.sqd') | |
hpi_pre_fname = op.join(data_path, 'test_mrk_pre.sqd') | |
hpi_post_fname = op.join(data_path, 'test_mrk_post.sqd') | |
electrode_fname = op.join(data_path, 'test_elp.txt') | |
headshape_fname = op.join(data_path, 'test_hsp.txt') | |
event_id = dict(cond=1) | |
kit_bids_basename = bids_basename.replace('_acq-01', '') | |
raw = mne.io.read_raw_kit( | |
raw_fname, mrk=hpi_fname, elp=electrode_fname, | |
hsp=headshape_fname) | |
write_raw_bids(raw, kit_bids_basename, output_path, | |
events_data=events_fname, | |
event_id=event_id, overwrite=False) | |
_bids_validate(output_path) | |
assert op.exists(op.join(output_path, 'participants.tsv')) | |
read_raw_bids(kit_bids_basename + '_meg.sqd', output_path) | |
# test anonymize | |
output_path = _test_anonymize(raw, kit_bids_basename, | |
events_fname, event_id) | |
_bids_validate(output_path) | |
# ensure the channels file has no STI 014 channel: | |
channels_tsv = make_bids_basename( | |
subject=subject_id, session=session_id, task=task, run=run, | |
suffix='channels.tsv', | |
prefix=op.join(output_path, 'sub-01', 'ses-01', 'meg')) | |
data = _from_tsv(channels_tsv) | |
assert 'STI 014' not in data['name'] | |
# ensure the marker file is produced in the right place | |
marker_fname = make_bids_basename( | |
subject=subject_id, session=session_id, task=task, run=run, | |
suffix='markers.sqd', | |
prefix=op.join(output_path, 'sub-01', 'ses-01', 'meg')) | |
assert op.exists(marker_fname) | |
# test attempts at writing invalid event data | |
event_data = np.loadtxt(events_fname) | |
# make the data the wrong number of dimensions | |
event_data_3d = np.atleast_3d(event_data) | |
other_output_path = _TempDir() | |
with pytest.raises(ValueError, match='two dimensions'): | |
write_raw_bids(raw, bids_basename, other_output_path, | |
events_data=event_data_3d, event_id=event_id, | |
overwrite=True) | |
# remove 3rd column | |
event_data = event_data[:, :2] | |
with pytest.raises(ValueError, match='second dimension'): | |
write_raw_bids(raw, bids_basename, other_output_path, | |
events_data=event_data, event_id=event_id, | |
overwrite=True) | |
# test correct naming of marker files | |
raw = mne.io.read_raw_kit( | |
raw_fname, mrk=[hpi_pre_fname, hpi_post_fname], elp=electrode_fname, | |
hsp=headshape_fname) | |
write_raw_bids(raw, | |
kit_bids_basename.replace('sub-01', 'sub-%s' % subject_id2), | |
output_path, events_data=events_fname, event_id=event_id, | |
overwrite=False) | |
_bids_validate(output_path) | |
# ensure the marker files are renamed correctly | |
marker_fname = make_bids_basename( | |
subject=subject_id2, session=session_id, task=task, run=run, | |
suffix='markers.sqd', acquisition='pre', | |
prefix=os.path.join(output_path, 'sub-02', 'ses-01', 'meg')) | |
info = get_kit_info(marker_fname, False)[0] | |
assert info['meas_date'] == get_kit_info(hpi_pre_fname, | |
False)[0]['meas_date'] | |
marker_fname = marker_fname.replace('acq-pre', 'acq-post') | |
info = get_kit_info(marker_fname, False)[0] | |
assert info['meas_date'] == get_kit_info(hpi_post_fname, | |
False)[0]['meas_date'] | |
# check that providing markers in the wrong order raises an error | |
raw = mne.io.read_raw_kit( | |
raw_fname, mrk=[hpi_post_fname, hpi_pre_fname], elp=electrode_fname, | |
hsp=headshape_fname) | |
with pytest.raises(ValueError, match='Markers'): | |
write_raw_bids( | |
raw, | |
kit_bids_basename.replace('sub-01', 'sub-%s' % subject_id2), | |
output_path, events_data=events_fname, event_id=event_id, | |
overwrite=True) | |
def test_ctf(_bids_validate): | |
"""Test functionality of the write_raw_bids conversion for CTF data.""" | |
output_path = _TempDir() | |
data_path = op.join(testing.data_path(download=False), 'CTF') | |
raw_fname = op.join(data_path, 'testdata_ctf.ds') | |
raw = mne.io.read_raw_ctf(raw_fname) | |
with pytest.warns(UserWarning, match='No line frequency'): | |
write_raw_bids(raw, bids_basename, output_path=output_path) | |
_bids_validate(output_path) | |
with pytest.warns(UserWarning, match='Did not find any events'): | |
raw = read_raw_bids(bids_basename + '_meg.ds', output_path) | |
# test to check that running again with overwrite == False raises an error | |
with pytest.raises(FileExistsError, match="already exists"): # noqa: F821 | |
write_raw_bids(raw, bids_basename, output_path=output_path) | |
assert op.exists(op.join(output_path, 'participants.tsv')) | |
# test anonymize | |
raw = mne.io.read_raw_ctf(raw_fname) | |
with pytest.warns(UserWarning, | |
match='Converting to FIF for anonymization'): | |
output_path = _test_anonymize(raw, bids_basename) | |
_bids_validate(output_path) | |
def test_bti(_bids_validate): | |
"""Test functionality of the write_raw_bids conversion for BTi data.""" | |
output_path = _TempDir() | |
data_path = op.join(base_path, 'bti', 'tests', 'data') | |
raw_fname = op.join(data_path, 'test_pdf_linux') | |
config_fname = op.join(data_path, 'test_config_linux') | |
headshape_fname = op.join(data_path, 'test_hs_linux') | |
raw = mne.io.read_raw_bti(raw_fname, config_fname=config_fname, | |
head_shape_fname=headshape_fname) | |
write_raw_bids(raw, bids_basename, output_path, verbose=True) | |
assert op.exists(op.join(output_path, 'participants.tsv')) | |
_bids_validate(output_path) | |
raw = read_raw_bids(bids_basename + '_meg', output_path) | |
# test anonymize | |
raw = mne.io.read_raw_bti(raw_fname, config_fname=config_fname, | |
head_shape_fname=headshape_fname) | |
with pytest.warns(UserWarning, | |
match='Converting to FIF for anonymization'): | |
output_path = _test_anonymize(raw, bids_basename) | |
_bids_validate(output_path) | |
# XXX: vhdr test currently passes only on MNE master. Skip until next release. | |
# see: https://github.com/mne-tools/mne-python/pull/6558 | |
@pytest.mark.skipif(LooseVersion(mne.__version__) < LooseVersion('0.19'), | |
reason="requires mne 0.19.dev0 or higher") | |
def test_vhdr(_bids_validate): | |
"""Test write_raw_bids conversion for BrainVision data.""" | |
output_path = _TempDir() | |
data_path = op.join(base_path, 'brainvision', 'tests', 'data') | |
raw_fname = op.join(data_path, 'test.vhdr') | |
raw = mne.io.read_raw_brainvision(raw_fname) | |
# inject a bad channel | |
assert not raw.info['bads'] | |
injected_bad = ['FP1'] | |
raw.info['bads'] = injected_bad | |
# write with injected bad channels | |
write_raw_bids(raw, bids_basename_minimal, output_path, overwrite=False) | |
_bids_validate(output_path) | |
# read and also get the bad channels | |
raw = read_raw_bids(bids_basename_minimal + '_eeg.vhdr', output_path) | |
# Check that injected bad channel shows up in raw after reading | |
np.testing.assert_array_equal(np.asarray(raw.info['bads']), | |
np.asarray(injected_bad)) | |
# Test that correct channel units are written ... and that bad channel | |
# is in channels.tsv | |
channels_tsv_name = op.join(output_path, 'sub-{}'.format(subject_id), | |
'eeg', bids_basename_minimal + '_channels.tsv') | |
data = _from_tsv(channels_tsv_name) | |
assert data['units'][data['name'].index('FP1')] == 'µV' | |
assert data['units'][data['name'].index('CP5')] == 'n/a' | |
assert data['status'][data['name'].index(injected_bad[0])] == 'bad' | |
# check events.tsv is written | |
events_tsv_fname = channels_tsv_name.replace('channels', 'events') | |
assert op.exists(events_tsv_fname) | |
# create another bids folder with the overwrite command and check | |
# no files are in the folder | |
data_path = make_bids_folders(subject=subject_id, kind='eeg', | |
output_path=output_path, overwrite=True) | |
assert len([f for f in os.listdir(data_path) if op.isfile(f)]) == 0 | |
# test anonymize and convert | |
raw = mne.io.read_raw_brainvision(raw_fname) | |
_test_anonymize(raw, bids_basename) | |
_test_convert(raw, bids_basename) | |
# Also cover iEEG | |
# We use the same data and pretend that eeg channels are ecog | |
raw = mne.io.read_raw_brainvision(raw_fname) | |
raw.set_channel_types({raw.ch_names[i]: 'ecog' | |
for i in mne.pick_types(raw.info, eeg=True)}) | |
output_path = _TempDir() | |
write_raw_bids(raw, bids_basename, output_path, overwrite=False) | |
_bids_validate(output_path) | |
def test_edf(_bids_validate): | |
"""Test write_raw_bids conversion for European Data Format data.""" | |
output_path = _TempDir() | |
data_path = op.join(testing.data_path(), 'EDF') | |
raw_fname = op.join(data_path, 'test_reduced.edf') | |
raw = mne.io.read_raw_edf(raw_fname, preload=True) | |
# XXX: hack that should be fixed later. Annotation reading is | |
# broken for this file with preload=False and read_annotations_edf | |
raw.preload = False | |
raw.rename_channels({raw.info['ch_names'][0]: 'EOG'}) | |
raw.info['chs'][0]['coil_type'] = FIFF.FIFFV_COIL_EEG_BIPOLAR | |
raw.rename_channels({raw.info['ch_names'][1]: 'EMG'}) | |
raw.set_channel_types({'EMG': 'emg'}) | |
write_raw_bids(raw, bids_basename, output_path) | |
# Reading the file back should raise an error, because we renamed channels | |
# in `raw` and used that information to write a channels.tsv. Yet, we | |
# saved the unchanged `raw` in the BIDS folder, so channels in the TSV and | |
# in raw clash | |
with pytest.raises(RuntimeError, match='Channels do not correspond'): | |
read_raw_bids(bids_basename + '_eeg.edf', output_path) | |
bids_fname = bids_basename.replace('run-01', 'run-%s' % run2) | |
write_raw_bids(raw, bids_fname, output_path, overwrite=True) | |
_bids_validate(output_path) | |
# ensure there is an EMG channel in the channels.tsv: | |
channels_tsv = make_bids_basename( | |
subject=subject_id, session=session_id, task=task, run=run, | |
suffix='channels.tsv', acquisition=acq, | |
prefix=op.join(output_path, 'sub-01', 'ses-01', 'eeg')) | |
data = _from_tsv(channels_tsv) | |
assert 'ElectroMyoGram' in data['description'] | |
# check that the scans list contains two scans | |
scans_tsv = make_bids_basename( | |
subject=subject_id, session=session_id, suffix='scans.tsv', | |
prefix=op.join(output_path, 'sub-01', 'ses-01')) | |
data = _from_tsv(scans_tsv) | |
assert len(list(data.values())[0]) == 2 | |
# Also cover iEEG | |
# We use the same data and pretend that eeg channels are ecog | |
raw.set_channel_types({raw.ch_names[i]: 'ecog' | |
for i in mne.pick_types(raw.info, eeg=True)}) | |
output_path = _TempDir() | |
write_raw_bids(raw, bids_basename, output_path) | |
_bids_validate(output_path) | |
# test anonymize and convert | |
raw = mne.io.read_raw_edf(raw_fname, preload=True) | |
raw.preload = False | |
output_path = _test_anonymize(raw, bids_basename) | |
_bids_validate(output_path) | |
output_path = _test_convert(raw, bids_basename) | |
_bids_validate(output_path) | |
def test_bdf(_bids_validate): | |
"""Test write_raw_bids conversion for Biosemi data.""" | |
output_path = _TempDir() | |
data_path = op.join(base_path, 'edf', 'tests', 'data') | |
raw_fname = op.join(data_path, 'test.bdf') | |
raw = mne.io.read_raw_bdf(raw_fname) | |
with pytest.warns(UserWarning, match='No line frequency found'): | |
write_raw_bids(raw, bids_basename, output_path, overwrite=False) | |
_bids_validate(output_path) | |
# Test also the reading of channel types from channels.tsv | |
# the first channel in the raw data is not MISC right now | |
test_ch_idx = 0 | |
assert coil_type(raw.info, test_ch_idx) != 'misc' | |
# we will change the channel type to MISC and overwrite the channels file | |
bids_fname = bids_basename + '_eeg.bdf' | |
channels_fname = _find_matching_sidecar(bids_fname, output_path, | |
'channels.tsv') | |
channels_dict = _from_tsv(channels_fname) | |
channels_dict['type'][test_ch_idx] = 'MISC' | |
_to_tsv(channels_dict, channels_fname) | |
# Now read the raw data back from BIDS, with the tampered TSV, to show | |
# that the channels.tsv truly influences how read_raw_bids sets ch_types | |
# in the raw data object | |
raw = read_raw_bids(bids_fname, output_path) | |
assert coil_type(raw.info, test_ch_idx) == 'misc' | |
# Test cropped assertion error | |
raw = mne.io.read_raw_bdf(raw_fname) | |
raw.crop(0, raw.times[-2]) | |
with pytest.raises(AssertionError, match='cropped'): | |
write_raw_bids(raw, bids_basename, output_path) | |
# test anonymize and convert | |
raw = mne.io.read_raw_bdf(raw_fname) | |
output_path = _test_anonymize(raw, bids_basename) | |
_bids_validate(output_path) | |
output_path = _test_convert(raw, bids_basename) | |
_bids_validate(output_path) | |
def test_set(_bids_validate): | |
"""Test write_raw_bids conversion for EEGLAB data.""" | |
# standalone .set file | |
output_path = _TempDir() | |
data_path = op.join(testing.data_path(), 'EEGLAB') | |
# .set with associated .fdt | |
output_path = _TempDir() | |
data_path = op.join(testing.data_path(), 'EEGLAB') | |
raw_fname = op.join(data_path, 'test_raw.set') | |
raw = mne.io.read_raw_eeglab(raw_fname) | |
# embedded - test mne-version assertion | |
tmp_version = mne.__version__ | |
mne.__version__ = '0.16' | |
with pytest.raises(ValueError, match='Your version of MNE is too old.'): | |
write_raw_bids(raw, bids_basename, output_path) | |
mne.__version__ = tmp_version | |
# proceed with the actual test for EEGLAB data | |
write_raw_bids(raw, bids_basename, output_path, overwrite=False) | |
read_raw_bids(bids_basename + '_eeg.set', output_path) | |
with pytest.raises(FileExistsError, match="already exists"): # noqa: F821 | |
write_raw_bids(raw, bids_basename, output_path=output_path, | |
overwrite=False) | |
_bids_validate(output_path) | |
# check events.tsv is written | |
# XXX: only from 0.18 onwards because events_from_annotations | |
# is broken for earlier versions | |
events_tsv_fname = op.join(output_path, 'sub-' + subject_id, | |
'ses-' + session_id, 'eeg', | |
bids_basename + '_events.tsv') | |
if check_version('mne', '0.18'): | |
assert op.exists(events_tsv_fname) | |
# Also cover iEEG | |
# We use the same data and pretend that eeg channels are ecog | |
raw.set_channel_types({raw.ch_names[i]: 'ecog' | |
for i in mne.pick_types(raw.info, eeg=True)}) | |
output_path = _TempDir() | |
write_raw_bids(raw, bids_basename, output_path) | |
_bids_validate(output_path) | |
# test anonymize and convert | |
output_path = _test_anonymize(raw, bids_basename) | |
_bids_validate(output_path) | |
output_path = _test_convert(raw, bids_basename) | |
_bids_validate(output_path) | |
@requires_nibabel() | |
def test_write_anat(_bids_validate): | |
"""Test writing anatomical data.""" | |
# Get the MNE testing sample data | |
import nibabel as nib | |
output_path = _TempDir() | |
data_path = testing.data_path() | |
raw_fname = op.join(data_path, 'MEG', 'sample', | |
'sample_audvis_trunc_raw.fif') | |
event_id = {'Auditory/Left': 1, 'Auditory/Right': 2, 'Visual/Left': 3, | |
'Visual/Right': 4, 'Smiley': 5, 'Button': 32} | |
events_fname = op.join(data_path, 'MEG', 'sample', | |
'sample_audvis_trunc_raw-eve.fif') | |
raw = mne.io.read_raw_fif(raw_fname) | |
write_raw_bids(raw, bids_basename, output_path, events_data=events_fname, | |
event_id=event_id, overwrite=False) | |
# Write some MRI data and supply a `trans` | |
trans_fname = raw_fname.replace('_raw.fif', '-trans.fif') | |
trans = mne.read_trans(trans_fname) | |
# Get the T1 weighted MRI data file | |
# Needs to be converted to Nifti because we only have mgh in our test base | |
t1w_mgh = op.join(data_path, 'subjects', 'sample', 'mri', 'T1.mgz') | |
anat_dir = write_anat(output_path, subject_id, t1w_mgh, session_id, acq, | |
raw=raw, trans=trans, deface=True, verbose=True, | |
overwrite=True) | |
_bids_validate(output_path) | |
# Validate that files are as expected | |
t1w_json_path = op.join(anat_dir, 'sub-01_ses-01_acq-01_T1w.json') | |
assert op.exists(t1w_json_path) | |
assert op.exists(op.join(anat_dir, 'sub-01_ses-01_acq-01_T1w.nii.gz')) | |
with open(t1w_json_path, 'r') as f: | |
t1w_json = json.load(f) | |
print(t1w_json) | |
# We only should have AnatomicalLandmarkCoordinates as key | |
np.testing.assert_array_equal(list(t1w_json.keys()), | |
['AnatomicalLandmarkCoordinates']) | |
# And within AnatomicalLandmarkCoordinates only LPA, NAS, RPA in that order | |
anat_dict = t1w_json['AnatomicalLandmarkCoordinates'] | |
point_list = ['LPA', 'NAS', 'RPA'] | |
np.testing.assert_array_equal(list(anat_dict.keys()), | |
point_list) | |
# test the actual values of the voxels (no floating points) | |
for i, point in enumerate([(66, 51, 46), (41, 32, 74), (17, 53, 47)]): | |
coords = anat_dict[point_list[i]] | |
np.testing.assert_array_equal(np.asarray(coords, dtype=int), | |
point) | |
# BONUS: test also that we can find the matching sidecar | |
side_fname = _find_matching_sidecar('sub-01_ses-01_acq-01_T1w.nii.gz', | |
output_path, 'T1w.json') | |
assert op.split(side_fname)[-1] == 'sub-01_ses-01_acq-01_T1w.json' | |
# Now try some anat writing that will fail | |
# We already have some MRI data there | |
with pytest.raises(IOError, match='`overwrite` is set to False'): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, acq, | |
raw=raw, trans=trans, verbose=True, deface=False, | |
overwrite=False) | |
# pass some invalid type as T1 MRI | |
with pytest.raises(ValueError, match='must be a path to a T1 weighted'): | |
write_anat(output_path, subject_id, 9999999999999, session_id, raw=raw, | |
trans=trans, verbose=True, deface=False, overwrite=True) | |
# Return without writing sidecar | |
sh.rmtree(anat_dir) | |
write_anat(output_path, subject_id, t1w_mgh, session_id) | |
# Assert that we truly cannot find a sidecar | |
with pytest.raises(RuntimeError, match='Did not find any'): | |
_find_matching_sidecar('sub-01_ses-01_acq-01_T1w.nii.gz', | |
output_path, 'T1w.json') | |
# trans has a wrong type | |
wrong_type = 1 | |
match = 'transform type {} not known, must be'.format(type(wrong_type)) | |
with pytest.raises(ValueError, match=match): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=wrong_type, verbose=True, deface=False, | |
overwrite=True) | |
# trans is a str, but file does not exist | |
wrong_fname = 'not_a_trans' | |
match = 'trans file "{}" not found'.format(wrong_fname) | |
with pytest.raises(IOError, match=match): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=wrong_fname, verbose=True, overwrite=True) | |
# However, reading trans if it is a string pointing to trans is fine | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=trans_fname, verbose=True, deface=False, | |
overwrite=True) | |
# Writing without a session does NOT yield "ses-None" anywhere | |
anat_dir2 = write_anat(output_path, subject_id, t1w_mgh, None) | |
assert 'ses-None' not in anat_dir2 | |
assert op.exists(op.join(anat_dir2, 'sub-01_T1w.nii.gz')) | |
# specify trans but not raw | |
with pytest.raises(ValueError, match='must be specified if `trans`'): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=None, | |
trans=trans, verbose=True, deface=False, overwrite=True) | |
# test deface | |
anat_dir = write_anat(output_path, subject_id, t1w_mgh, | |
session_id, raw=raw, trans=trans_fname, | |
verbose=True, deface=True, overwrite=True) | |
t1w = nib.load(op.join(anat_dir, 'sub-01_ses-01_T1w.nii.gz')) | |
vox_sum = t1w.get_data().sum() | |
anat_dir2 = write_anat(output_path, subject_id, t1w_mgh, | |
session_id, raw=raw, trans=trans_fname, | |
verbose=True, deface=dict(inset=25.), | |
overwrite=True) | |
t1w2 = nib.load(op.join(anat_dir2, 'sub-01_ses-01_T1w.nii.gz')) | |
vox_sum2 = t1w2.get_data().sum() | |
assert vox_sum > vox_sum2 | |
anat_dir3 = write_anat(output_path, subject_id, t1w_mgh, | |
session_id, raw=raw, trans=trans_fname, | |
verbose=True, deface=dict(theta=25), | |
overwrite=True) | |
t1w3 = nib.load(op.join(anat_dir3, 'sub-01_ses-01_T1w.nii.gz')) | |
vox_sum3 = t1w3.get_data().sum() | |
assert vox_sum > vox_sum3 | |
with pytest.raises(ValueError, | |
match='The raw object, trans and raw must be provided'): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=None, verbose=True, deface=True, | |
overwrite=True) | |
with pytest.raises(ValueError, match='inset must be numeric'): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=trans, verbose=True, deface=dict(inset='small'), | |
overwrite=True) | |
with pytest.raises(ValueError, match='inset should be positive'): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=trans, verbose=True, deface=dict(inset=-2.), | |
overwrite=True) | |
with pytest.raises(ValueError, match='theta must be numeric'): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=trans, verbose=True, deface=dict(theta='big'), | |
overwrite=True) | |
with pytest.raises(ValueError, | |
match='theta should be between 0 and 90 degrees'): | |
write_anat(output_path, subject_id, t1w_mgh, session_id, raw=raw, | |
trans=trans, verbose=True, deface=dict(theta=100), | |
overwrite=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Make BIDS compatible directory structures and infer meta data from MNE.""" | |
# Authors: Mainak Jas <[email protected]> | |
# Alexandre Gramfort <[email protected]> | |
# Teon Brooks <[email protected]> | |
# Chris Holdgraf <[email protected]> | |
# Stefan Appelhoff <[email protected]> | |
# Matt Sanderson <[email protected]> | |
# | |
# License: BSD (3-clause) | |
import os | |
import os.path as op | |
from datetime import datetime, date, timedelta | |
from warnings import warn | |
import shutil as sh | |
from collections import defaultdict, OrderedDict | |
import numpy as np | |
from numpy.testing import assert_array_equal | |
from mne.transforms import (_get_trans, apply_trans, get_ras_to_neuromag_trans, | |
rotation, translation) | |
from mne import Epochs, events_from_annotations | |
from mne.io.constants import FIFF | |
from mne.io.pick import channel_type | |
from mne.io import BaseRaw, anonymize_info, _stamp_to_dt | |
from mne.channels.channels import _unit2human | |
from mne.utils import check_version, has_nibabel | |
from mne_bids.pick import coil_type | |
from mne_bids.utils import (_write_json, _write_tsv, _read_events, _mkdir_p, | |
_age_on_date, _infer_eeg_placement_scheme, | |
_check_key_val, | |
_parse_bids_filename, _handle_kind, _check_types, | |
_get_mrk_meas_date, _extract_landmarks, _parse_ext, | |
_get_ch_type_mapping) | |
from mne_bids.copyfiles import (copyfile_brainvision, copyfile_eeglab, | |
copyfile_ctf, copyfile_bti) | |
from mne_bids.read import reader | |
from mne_bids.tsv_handler import _from_tsv, _combine, _drop, _contains_row | |
from mne_bids.config import (ORIENTATION, UNITS, MANUFACTURERS, | |
IGNORED_CHANNELS, ALLOWED_EXTENSIONS, | |
BIDS_VERSION) | |
def _is_numeric(n): | |
return isinstance(n, (np.integer, np.floating, int, float)) | |
def _channels_tsv(raw, fname, overwrite=False, verbose=True): | |
"""Create a channels.tsv file and save it. | |
Parameters | |
---------- | |
raw : instance of Raw | |
The data as MNE-Python Raw object. | |
fname : str | |
Filename to save the channels.tsv to. | |
overwrite : bool | |
Whether to overwrite the existing file. | |
Defaults to False. | |
verbose : bool | |
Set verbose output to true or false. | |
""" | |
# Get channel type mappings between BIDS and MNE nomenclatures | |
map_chs = _get_ch_type_mapping(fro='mne', to='bids') | |
# Prepare the descriptions for each channel type | |
map_desc = defaultdict(lambda: 'Other type of channel') | |
map_desc.update(meggradaxial='Axial Gradiometer', | |
megrefgradaxial='Axial Gradiometer Reference', | |
meggradplanar='Planar Gradiometer', | |
megmag='Magnetometer', | |
megrefmag='Magnetometer Reference', | |
stim='Trigger', | |
eeg='ElectroEncephaloGram', | |
ecog='Electrocorticography', | |
seeg='StereoEEG', | |
ecg='ElectroCardioGram', | |
eog='ElectroOculoGram', | |
emg='ElectroMyoGram', | |
misc='Miscellaneous') | |
get_specific = ('mag', 'ref_meg', 'grad') | |
# get the manufacturer from the file in the Raw object | |
manufacturer = None | |
_, ext = _parse_ext(raw.filenames[0], verbose=verbose) | |
manufacturer = MANUFACTURERS[ext] | |
ignored_channels = IGNORED_CHANNELS.get(manufacturer, list()) | |
status, ch_type, description = list(), list(), list() | |
for idx, ch in enumerate(raw.info['ch_names']): | |
status.append('bad' if ch in raw.info['bads'] else 'good') | |
_channel_type = channel_type(raw.info, idx) | |
if _channel_type in get_specific: | |
_channel_type = coil_type(raw.info, idx, _channel_type) | |
ch_type.append(map_chs[_channel_type]) | |
description.append(map_desc[_channel_type]) | |
low_cutoff, high_cutoff = (raw.info['highpass'], raw.info['lowpass']) | |
if raw._orig_units: | |
units = [raw._orig_units.get(ch, 'n/a') for ch in raw.ch_names] | |
else: | |
units = [_unit2human.get(ch_i['unit'], 'n/a') | |
for ch_i in raw.info['chs']] | |
units = [u if u not in ['NA'] else 'n/a' for u in units] | |
n_channels = raw.info['nchan'] | |
sfreq = raw.info['sfreq'] | |
ch_data = OrderedDict([ | |
('name', raw.info['ch_names']), | |
('type', ch_type), | |
('units', units), | |
('low_cutoff', np.full((n_channels), low_cutoff)), | |
('high_cutoff', np.full((n_channels), high_cutoff)), | |
('description', description), | |
('sampling_frequency', np.full((n_channels), sfreq)), | |
('status', status)]) | |
ch_data = _drop(ch_data, ignored_channels, 'name') | |
_write_tsv(fname, ch_data, overwrite, verbose) | |
return fname | |
def _events_tsv(events, raw, fname, trial_type, overwrite=False, | |
verbose=True): | |
"""Create an events.tsv file and save it. | |
This function will write the mandatory 'onset', and 'duration' columns as | |
well as the optional 'value' and 'sample'. The 'value' | |
corresponds to the marker value as found in the TRIG channel of the | |
recording. In addition, the 'trial_type' field can be written. | |
Parameters | |
---------- | |
events : array, shape = (n_events, 3) | |
The first column contains the event time in samples and the third | |
column contains the event id. The second column is ignored for now but | |
typically contains the value of the trigger channel either immediately | |
before the event or immediately after. | |
raw : instance of Raw | |
The data as MNE-Python Raw object. | |
fname : str | |
Filename to save the events.tsv to. | |
trial_type : dict | None | |
Dictionary mapping a brief description key to an event id (value). For | |
example {'Go': 1, 'No Go': 2}. | |
overwrite : bool | |
Whether to overwrite the existing file. | |
Defaults to False. | |
verbose : bool | |
Set verbose output to true or false. | |
Notes | |
----- | |
The function writes durations of zero for each event. | |
""" | |
# Start by filling all data that we know into an ordered dictionary | |
first_samp = raw.first_samp | |
sfreq = raw.info['sfreq'] | |
events[:, 0] -= first_samp | |
# Onset column needs to be specified in seconds | |
data = OrderedDict([('onset', events[:, 0] / sfreq), | |
('duration', np.zeros(events.shape[0])), | |
('trial_type', None), | |
('value', events[:, 2]), | |
('sample', events[:, 0])]) | |
# Now check if trial_type is specified or should be removed | |
if trial_type: | |
trial_type_map = {v: k for k, v in trial_type.items()} | |
data['trial_type'] = [trial_type_map.get(i, 'n/a') for | |
i in events[:, 2]] | |
else: | |
del data['trial_type'] | |
_write_tsv(fname, data, overwrite, verbose) | |
return fname | |
def _participants_tsv(raw, subject_id, fname, overwrite=False, | |
verbose=True): | |
"""Create a participants.tsv file and save it. | |
This will append any new participant data to the current list if it | |
exists. Otherwise a new file will be created with the provided information. | |
Parameters | |
---------- | |
raw : instance of Raw | |
The data as MNE-Python Raw object. | |
subject_id : str | |
The subject name in BIDS compatible format ('01', '02', etc.) | |
fname : str | |
Filename to save the participants.tsv to. | |
overwrite : bool | |
Whether to overwrite the existing file. | |
Defaults to False. | |
If there is already data for the given `subject_id` and overwrite is | |
False, an error will be raised. | |
verbose : bool | |
Set verbose output to true or false. | |
""" | |
subject_id = 'sub-' + subject_id | |
data = OrderedDict(participant_id=[subject_id]) | |
subject_age = "n/a" | |
sex = "n/a" | |
subject_info = raw.info['subject_info'] | |
if subject_info is not None: | |
sexes = {0: 'n/a', 1: 'M', 2: 'F'} | |
sex = sexes[subject_info.get('sex', 0)] | |
# determine the age of the participant | |
age = subject_info.get('birthday', None) | |
meas_date = raw.info.get('meas_date', None) | |
if isinstance(meas_date, (tuple, list, np.ndarray)): | |
meas_date = meas_date[0] | |
if meas_date is not None and age is not None: | |
bday = datetime(age[0], age[1], age[2]) | |
meas_datetime = datetime.fromtimestamp(meas_date) | |
subject_age = _age_on_date(bday, meas_datetime) | |
else: | |
subject_age = "n/a" | |
data.update({'age': [subject_age], 'sex': [sex]}) | |
if os.path.exists(fname): | |
orig_data = _from_tsv(fname) | |
# whether the new data exists identically in the previous data | |
exact_included = _contains_row(orig_data, | |
{'participant_id': subject_id, | |
'age': subject_age, | |
'sex': sex}) | |
# whether the subject id is in the previous data | |
sid_included = subject_id in orig_data['participant_id'] | |
# if the subject data provided is different to the currently existing | |
# data and overwrite is not True raise an error | |
if (sid_included and not exact_included) and not overwrite: | |
raise FileExistsError('"%s" already exists in the participant ' # noqa: E501 F821 | |
'list. Please set overwrite to ' | |
'True.' % subject_id) | |
# otherwise add the new data | |
data = _combine(orig_data, data, 'participant_id') | |
# overwrite is forced to True as all issues with overwrite == False have | |
# been handled by this point | |
_write_tsv(fname, data, True, verbose) | |
return fname | |
def _participants_json(fname, overwrite=False, verbose=True): | |
"""Create participants.json for non-default columns in accompanying TSV. | |
Parameters | |
---------- | |
fname : str | |
Filename to save the scans.tsv to. | |
overwrite : bool | |
Defaults to False. | |
Whether to overwrite the existing data in the file. | |
If there is already data for the given `fname` and overwrite is False, | |
an error will be raised. | |
verbose : bool | |
Set verbose output to true or false. | |
""" | |
cols = OrderedDict() | |
cols['participant_id'] = {'Description': 'Unique participant identifier'} | |
cols['age'] = {'Description': 'Age of the participant at time of testing', | |
'Units': 'years'} | |
cols['sex'] = {'Description': 'Biological sex of the participant', | |
'Levels': {'F': 'female', 'M': 'male'}} | |
_write_json(fname, cols, overwrite, verbose) | |
return fname | |
def _scans_tsv(raw, raw_fname, fname, overwrite=False, verbose=True): | |
"""Create a scans.tsv file and save it. | |
Parameters | |
---------- | |
raw : instance of Raw | |
The data as MNE-Python Raw object. | |
raw_fname : str | |
Relative path to the raw data file. | |
fname : str | |
Filename to save the scans.tsv to. | |
overwrite : bool | |
Defaults to False. | |
Whether to overwrite the existing data in the file. | |
If there is already data for the given `fname` and overwrite is False, | |
an error will be raised. | |
verbose : bool | |
Set verbose output to true or false. | |
""" | |
# get measurement date from the data info | |
meas_date = raw.info['meas_date'] | |
if isinstance(meas_date, (tuple, list, np.ndarray)): | |
meas_date = meas_date[0] | |
# windows datetime bug for timestamp < 0 | |
# OSError [Errno 22] Invalid Argument | |
acq_time = \ | |
(datetime.fromtimestamp(0) + | |
timedelta(seconds=int(meas_date))).strftime( | |
'%Y-%m-%dT%H:%M:%S') | |
else: | |
acq_time = 'n/a' | |
data = OrderedDict([('filename', ['%s' % raw_fname.replace(os.sep, '/')]), | |
('acq_time', [acq_time])]) | |
if os.path.exists(fname): | |
orig_data = _from_tsv(fname) | |
# if the file name is already in the file raise an error | |
if raw_fname in orig_data['filename'] and not overwrite: | |
raise FileExistsError('"%s" already exists in the scans list. ' # noqa: E501 F821 | |
'Please set overwrite to True.' % raw_fname) | |
# otherwise add the new data | |
data = _combine(orig_data, data, 'filename') | |
# overwrite is forced to True as all issues with overwrite == False have | |
# been handled by this point | |
_write_tsv(fname, data, True, verbose) | |
return fname | |
def _coordsystem_json(raw, unit, orient, manufacturer, fname, | |
overwrite=False, verbose=True): | |
"""Create a coordsystem.json file and save it. | |
Parameters | |
---------- | |
raw : instance of Raw | |
The data as MNE-Python Raw object. | |
unit : str | |
Units to be used in the coordsystem specification. | |
orient : str | |
Used to define the coordinate system for the head coils. | |
manufacturer : str | |
Used to define the coordinate system for the MEG sensors. | |
fname : str | |
Filename to save the coordsystem.json to. | |
overwrite : bool | |
Whether to overwrite the existing file. | |
Defaults to False. | |
verbose : bool | |
Set verbose output to true or false. | |
""" | |
dig = raw.info['dig'] | |
coords = _extract_landmarks(dig) | |
hpi = {d['ident']: d for d in dig if d['kind'] == FIFF.FIFFV_POINT_HPI} | |
if hpi: | |
for ident in hpi.keys(): | |
coords['coil%d' % ident] = hpi[ident]['r'].tolist() | |
coord_frame = set([dig[ii]['coord_frame'] for ii in range(len(dig))]) | |
if len(coord_frame) > 1: | |
err = 'All HPI and Fiducials must be in the same coordinate frame.' | |
raise ValueError(err) | |
fid_json = {'MEGCoordinateSystem': manufacturer, | |
'MEGCoordinateUnits': unit, # XXX validate this | |
'HeadCoilCoordinates': coords, | |
'HeadCoilCoordinateSystem': orient, | |
'HeadCoilCoordinateUnits': unit # XXX validate this | |
} | |
_write_json(fname, fid_json, overwrite, verbose) | |
return fname | |
def _sidecar_json(raw, task, manufacturer, fname, kind, overwrite=False, | |
verbose=True): | |
"""Create a sidecar json file depending on the kind and save it. | |
The sidecar json file provides meta data about the data of a certain kind. | |
Parameters | |
---------- | |
raw : instance of Raw | |
The data as MNE-Python Raw object. | |
task : str | |
Name of the task the data is based on. | |
manufacturer : str | |
Manufacturer of the acquisition system. For MEG also used to define the | |
coordinate system for the MEG sensors. | |
fname : str | |
Filename to save the sidecar json to. | |
kind : str | |
Type of the data as in ALLOWED_KINDS. | |
overwrite : bool | |
Whether to overwrite the existing file. | |
Defaults to False. | |
verbose : bool | |
Set verbose output to true or false. Defaults to true. | |
""" | |
sfreq = raw.info['sfreq'] | |
powerlinefrequency = raw.info.get('line_freq', None) | |
if powerlinefrequency is None: | |
warn('No line frequency found, defaulting to 50 Hz') | |
powerlinefrequency = 50 | |
if isinstance(raw, BaseRaw): | |
rec_type = 'continuous' | |
elif isinstance(raw, Epochs): | |
rec_type = 'epoched' | |
else: | |
rec_type = 'n/a' | |
# determine whether any channels have to be ignored: | |
n_ignored = len([ch_name for ch_name in | |
IGNORED_CHANNELS.get(manufacturer, list()) if | |
ch_name in raw.ch_names]) | |
# all ignored channels are trigger channels at the moment... | |
n_megchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_MEG_CH]) | |
n_megrefchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_REF_MEG_CH]) | |
n_eegchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_EEG_CH]) | |
n_ecogchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_ECOG_CH]) | |
n_seegchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_SEEG_CH]) | |
n_eogchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_EOG_CH]) | |
n_ecgchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_ECG_CH]) | |
n_emgchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_EMG_CH]) | |
n_miscchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_MISC_CH]) | |
n_stimchan = len([ch for ch in raw.info['chs'] | |
if ch['kind'] == FIFF.FIFFV_STIM_CH]) - n_ignored | |
# Define modality-specific JSON dictionaries | |
ch_info_json_common = [ | |
('TaskName', task), | |
('Manufacturer', manufacturer), | |
('PowerLineFrequency', powerlinefrequency), | |
('SamplingFrequency', sfreq), | |
('SoftwareFilters', 'n/a'), | |
('RecordingDuration', raw.times[-1]), | |
('RecordingType', rec_type)] | |
ch_info_json_meg = [ | |
('DewarPosition', 'n/a'), | |
('DigitizedLandmarks', False), | |
('DigitizedHeadPoints', False), | |
('MEGChannelCount', n_megchan), | |
('MEGREFChannelCount', n_megrefchan)] | |
ch_info_json_eeg = [ | |
('EEGReference', 'n/a'), | |
('EEGGround', 'n/a'), | |
('EEGPlacementScheme', _infer_eeg_placement_scheme(raw)), | |
('Manufacturer', manufacturer)] | |
ch_info_json_ieeg = [ | |
('iEEGReference', 'n/a'), | |
('ECOGChannelCount', n_ecogchan), | |
('SEEGChannelCount', n_seegchan)] | |
ch_info_ch_counts = [ | |
('EEGChannelCount', n_eegchan), | |
('EOGChannelCount', n_eogchan), | |
('ECGChannelCount', n_ecgchan), | |
('EMGChannelCount', n_emgchan), | |
('MiscChannelCount', n_miscchan), | |
('TriggerChannelCount', n_stimchan)] | |
# Stitch together the complete JSON dictionary | |
ch_info_json = ch_info_json_common | |
if kind == 'meg': | |
append_kind_json = ch_info_json_meg | |
elif kind == 'eeg': | |
append_kind_json = ch_info_json_eeg | |
elif kind == 'ieeg': | |
append_kind_json = ch_info_json_ieeg | |
ch_info_json += append_kind_json | |
ch_info_json += ch_info_ch_counts | |
ch_info_json = OrderedDict(ch_info_json) | |
_write_json(fname, ch_info_json, overwrite, verbose) | |
return fname | |
def _deface(t1w, mri_landmarks, deface, trans, raw): | |
if not has_nibabel(): # pragma: no cover | |
raise ImportError('This function requires nibabel.') | |
import nibabel as nib | |
inset, theta = (20, 35.) | |
if isinstance(deface, dict): | |
if 'inset' in deface: | |
inset = deface['inset'] | |
if 'theta' in deface: | |
theta = deface['theta'] | |
if not _is_numeric(inset): | |
raise ValueError('inset must be numeric (float, int). ' | |
'Got %s' % type(inset)) | |
if not _is_numeric(theta): | |
raise ValueError('theta must be numeric (float, int). ' | |
'Got %s' % type(theta)) | |
if inset < 0: | |
raise ValueError('inset should be positive, ' | |
'Got %s' % inset) | |
if not 0 < theta < 90: | |
raise ValueError('theta should be between 0 and 90 ' | |
'degrees. Got %s' % theta) | |
# x: L/R L+, y: S/I I+, z: A/P A+ | |
t1w_data = t1w.get_data().copy() | |
idxs_vox = np.meshgrid(np.arange(t1w_data.shape[0]), | |
np.arange(t1w_data.shape[1]), | |
np.arange(t1w_data.shape[2]), | |
indexing='ij') | |
idxs_vox = np.array(idxs_vox) # (3, *t1w_data.shape) | |
idxs_vox = np.transpose(idxs_vox, | |
[1, 2, 3, 0]) # (*t1w_data.shape, 3) | |
idxs_vox = idxs_vox.reshape(-1, 3) # (n_voxels, 3) | |
mri_landmarks_ras = apply_trans(t1w.affine, mri_landmarks) | |
ras_meg_t = \ | |
get_ras_to_neuromag_trans(*mri_landmarks_ras[[1, 0, 2]]) | |
idxs_ras = apply_trans(t1w.affine, idxs_vox) | |
idxs_meg = apply_trans(ras_meg_t, idxs_ras) | |
# now comes the actual defacing | |
# 1. move center of voxels to (nasion - inset) | |
# 2. rotate the head by theta from the normal to the plane passing | |
# through anatomical coordinates | |
trans_y = -mri_landmarks_ras[1, 1] + inset | |
idxs_meg = apply_trans(translation(y=trans_y), idxs_meg) | |
idxs_meg = apply_trans(rotation(x=-np.deg2rad(theta)), idxs_meg) | |
coords = idxs_meg.reshape(t1w.shape + (3,)) # (*t1w_data.shape, 3) | |
mask = (coords[..., 2] < 0) # z < 0 | |
t1w_data[mask] = 0. | |
# smooth decided against for potential lack of anonymizaton | |
# https://gist.github.com/alexrockhill/15043928b716a432db3a84a050b241ae | |
t1w = nib.Nifti1Image(t1w_data, t1w.affine, t1w.header) | |
return t1w | |
def _write_raw_fif(raw, bids_fname): | |
"""Save out the raw file in FIF. | |
Parameters | |
---------- | |
raw : mne.io.Raw | |
Raw file to save out. | |
bids_fname : str | |
The name of the BIDS-specified file where the raw object | |
should be saved. | |
""" | |
n_rawfiles = len(raw.filenames) | |
if n_rawfiles > 1: | |
split_naming = 'bids' | |
raw.save(bids_fname, split_naming=split_naming, overwrite=True) | |
else: | |
# This ensures that single FIF files do not have the part param | |
raw.save(bids_fname, split_naming='neuromag', overwrite=True) | |
def _write_raw_brainvision(raw, bids_fname): | |
"""Save out the raw file in BrainVision format. | |
Parameters | |
---------- | |
raw : mne.io.Raw | |
Raw file to save out. | |
bids_fname : str | |
The name of the BIDS-specified file where the raw object | |
should be saved. | |
""" | |
if not check_version('pybv', '0.2'): | |
raise ImportError('pybv >=0.2.0 is required for converting ' + | |
'file to Brainvision format') | |
from pybv import write_brainvision | |
events, event_id = events_from_annotations(raw) | |
write_brainvision(raw.get_data(), raw.info['sfreq'], | |
raw.ch_names, | |
op.splitext(op.basename(bids_fname))[0], | |
op.dirname(bids_fname), events[:, [0, 2]], | |
resolution=1e-6) | |
def make_bids_basename(subject=None, session=None, task=None, | |
acquisition=None, run=None, processing=None, | |
recording=None, space=None, prefix=None, suffix=None): | |
"""Create a partial/full BIDS filename from its component parts. | |
BIDS filename prefixes have one or more pieces of metadata in them. They | |
must follow a particular order, which is followed by this function. This | |
will generate the *prefix* for a BIDS filename that can be used with many | |
subsequent files, or you may also give a suffix that will then complete | |
the file name. | |
Note that all parameters are not applicable to each kind of data. For | |
example, electrode location TSV files do not need a task field. | |
Parameters | |
---------- | |
subject : str | None | |
The subject ID. Corresponds to "sub". | |
session : str | None | |
The session for a item. Corresponds to "ses". | |
task : str | None | |
The task for a item. Corresponds to "task". | |
acquisition: str | None | |
The acquisition parameters for the item. Corresponds to "acq". | |
run : int | None | |
The run number for this item. Corresponds to "run". | |
processing : str | None | |
The processing label for this item. Corresponds to "proc". | |
recording : str | None | |
The recording name for this item. Corresponds to "recording". | |
space : str | None | |
The coordinate space for an anatomical file. Corresponds to "space". | |
prefix : str | None | |
The prefix for the filename to be created. E.g., a path to the folder | |
in which you wish to create a file with this name. | |
suffix : str | None | |
The suffix for the filename to be created. E.g., 'audio.wav'. | |
Returns | |
------- | |
filename : str | |
The BIDS filename you wish to create. | |
Examples | |
-------- | |
>>> print(make_bids_basename(subject='test', session='two', task='mytask', suffix='data.csv')) # noqa: E501 | |
sub-test_ses-two_task-mytask_data.csv | |
""" | |
order = OrderedDict([('sub', subject), | |
('ses', session), | |
('task', task), | |
('acq', acquisition), | |
('run', run), | |
('proc', processing), | |
('space', space), | |
('recording', recording)]) | |
if order['run'] is not None and not isinstance(order['run'], str): | |
# Ensure that run is a string | |
order['run'] = '{:02}'.format(order['run']) | |
_check_types(order.values()) | |
if (all(ii is None for ii in order.values()) and suffix is None and | |
prefix is None): | |
raise ValueError("At least one parameter must be given.") | |
filename = [] | |
for key, val in order.items(): | |
if val is not None: | |
_check_key_val(key, val) | |
filename.append('%s-%s' % (key, val)) | |
if isinstance(suffix, str): | |
filename.append(suffix) | |
filename = '_'.join(filename) | |
if isinstance(prefix, str): | |
filename = op.join(prefix, filename) | |
return filename | |
def make_bids_folders(subject, session=None, kind=None, output_path=None, | |
make_dir=True, overwrite=False, verbose=False): | |
"""Create a BIDS folder hierarchy. | |
This creates a hierarchy of folders *within* a BIDS dataset. You should | |
plan to create these folders *inside* the output_path folder of the dataset. | |
Parameters | |
---------- | |
subject : str | |
The subject ID. Corresponds to "sub". | |
kind : str | |
The kind of folder being created at the end of the hierarchy. E.g., | |
"anat", "func", etc. | |
session : str | None | |
The session for a item. Corresponds to "ses". | |
output_path : str | None | |
The output_path for the folders to be created. If None, folders will be | |
created in the current working directory. | |
make_dir : bool | |
Whether to actually create the folders specified. If False, only a | |
path will be generated but no folders will be created. | |
overwrite : bool | |
How to handle overwriting previously generated data. | |
If overwrite == False then no existing folders will be removed, however | |
if overwrite == True then any existing folders at the session level | |
or lower will be removed, including any contained data. | |
verbose : bool | |
If verbose is True, print status updates | |
as folders are created. | |
Returns | |
------- | |
path : str | |
The (relative) path to the folder that was created. | |
Examples | |
-------- | |
>>> print(make_bids_folders('sub_01', session='my_session', | |
kind='meg', output_path='path/to/project', | |
make_dir=False)) # noqa | |
path/to/project/sub-sub_01/ses-my_session/meg | |
""" | |
_check_types((subject, kind, session, output_path)) | |
if session is not None: | |
_check_key_val('ses', session) | |
path = ['sub-%s' % subject] | |
if isinstance(session, str): | |
path.append('ses-%s' % session) | |
if isinstance(kind, str): | |
path.append(kind) | |
path = op.join(*path) | |
if isinstance(output_path, str): | |
path = op.join(output_path, path) | |
if make_dir is True: | |
_mkdir_p(path, overwrite=overwrite, verbose=verbose) | |
return path | |
def make_dataset_description(path, name=None, data_license=None, | |
authors=None, acknowledgements=None, | |
how_to_acknowledge=None, funding=None, | |
references_and_links=None, doi=None, | |
verbose=False): | |
"""Create json for a dataset description. | |
BIDS datasets may have one or more fields, this function allows you to | |
specify which you wish to include in the description. See the BIDS | |
documentation for information about what each field means. | |
Parameters | |
---------- | |
path : str | |
A path to a folder where the description will be created. | |
name : str | None | |
The name of this BIDS dataset. | |
data_license : str | None | |
The license under which this datset is published. | |
authors : list | str | None | |
List of individuals who contributed to the creation/curation of the | |
dataset. Must be a list of strings or a single comma separated string | |
like ['a', 'b', 'c']. | |
acknowledgements : list | str | None | |
Either a str acknowledging individuals who contributed to the | |
creation/curation of this dataset OR a list of the individuals' | |
names as str. | |
how_to_acknowledge : list | str | None | |
Either a str describing how to acknowledge this dataset OR a list of | |
publications that should be cited. | |
funding : list | str | None | |
List of sources of funding (e.g., grant numbers). Must be a list of | |
strings or a single comma separated string like ['a', 'b', 'c']. | |
references_and_links : list | str | None | |
List of references to publication that contain information on the | |
dataset, or links. Must be a list of strings or a single comma | |
separated string like ['a', 'b', 'c']. | |
doi : str | None | |
The DOI for the dataset. | |
Notes | |
----- | |
The required field BIDSVersion will be automatically filled by mne_bids. | |
""" | |
# Put potential string input into list of strings | |
if isinstance(authors, str): | |
authors = authors.split(', ') | |
if isinstance(funding, str): | |
funding = funding.split(', ') | |
if isinstance(references_and_links, str): | |
references_and_links = references_and_links.split(', ') | |
fname = op.join(path, 'dataset_description.json') | |
description = OrderedDict([('Name', name), | |
('BIDSVersion', BIDS_VERSION), | |
('License', data_license), | |
('Authors', authors), | |
('Acknowledgements', acknowledgements), | |
('HowToAcknowledge', how_to_acknowledge), | |
('Funding', funding), | |
('ReferencesAndLinks', references_and_links), | |
('DatasetDOI', doi)]) | |
pop_keys = [key for key, val in description.items() if val is None] | |
for key in pop_keys: | |
description.pop(key) | |
_write_json(fname, description, overwrite=True, verbose=verbose) | |
def write_raw_bids(raw, bids_basename, output_path, events_data=None, | |
event_id=None, anonymize=None, convert=None, | |
overwrite=False, verbose=True): | |
"""Walk over a folder of files and create BIDS compatible folder. | |
.. warning:: The original files are simply copied over if the original | |
file format is BIDS-supported for that modality. Otherwise, | |
this function will convert to a BIDS-supported file format | |
while warning the user. For EEG and iEEG data, conversion will | |
be to BrainVision format, for MEG conversion will be to FIF. | |
Parameters | |
---------- | |
raw : instance of mne.io.Raw | |
The raw data. It must be an instance of mne.Raw. The data should not be | |
loaded on disk, i.e., raw.preload must be False. | |
bids_basename : str | |
The base filename of the BIDS compatible files. Typically, this can be | |
generated using make_bids_basename. | |
Example: `sub-01_ses-01_task-testing_acq-01_run-01`. | |
This will write the following files in the correct subfolder of the | |
output_path:: | |
sub-01_ses-01_task-testing_acq-01_run-01_meg.fif | |
sub-01_ses-01_task-testing_acq-01_run-01_meg.json | |
sub-01_ses-01_task-testing_acq-01_run-01_channels.tsv | |
sub-01_ses-01_task-testing_acq-01_run-01_coordsystem.json | |
and the following one if events_data is not None:: | |
sub-01_ses-01_task-testing_acq-01_run-01_events.tsv | |
and add a line to the following files:: | |
participants.tsv | |
scans.tsv | |
Note that the modality 'meg' is automatically inferred from the raw | |
object and extension '.fif' is copied from raw.filenames. | |
output_path : str | |
The path of the root of the BIDS compatible folder. The session and | |
subject specific folders will be populated automatically by parsing | |
bids_basename. | |
events_data : str | array | None | |
The events file. If a string, a path to the events file. If an array, | |
the MNE events array (shape n_events, 3). If None, events will be | |
inferred from the stim channel using `mne.find_events`. | |
event_id : dict | None | |
The event id dict used to create a 'trial_type' column in events.tsv | |
anonymize : dict | None | |
If a dictionary is passed, data will be anonymized; identifying data | |
structures such as study date and time will be changed. | |
`daysback` is a required argument and `keep_his` is optional, these | |
arguments are passed to :func:`mne.io.anonymize_info`. | |
`daysback` : int | |
Number of days to move back the date. To keep relative dates | |
for a subject use the same `daysback`. According to BIDS | |
specificiations, the number of days back must be great enough | |
that the date is before 1925 thus `daysback` will be subtracted | |
from Dec 31, 1924. | |
`keep_his` : bool | |
If True his_id of subject_info will NOT be overwritten. | |
Defaults to False. | |
convert : bool | None | |
Whether or not to save in BrainVision for EEG or iEEG data and FIF | |
for MEG data if the data file is not already in that format. | |
Defaults to None in which case conversion will be done only if | |
the file type is not compatible (e.g. an EEG only FIF file). If | |
anonymize is used, the file must be converted. | |
overwrite : bool | |
Whether to overwrite existing files or data in files. | |
Defaults to False. | |
If overwrite is True, any existing files with the same BIDS parameters | |
will be overwritten with the exception of the `participants.tsv` and | |
`scans.tsv` files. For these files, parts of pre-existing data that | |
match the current data will be replaced. | |
If overwrite is False, no existing data will be overwritten or | |
replaced. | |
verbose : bool | |
If verbose is True, this will print a snippet of the sidecar files. If | |
False, no content will be printed. | |
Returns | |
------- | |
output_path : str | |
The path of the root of the BIDS compatible folder. | |
Notes | |
----- | |
For the participants.tsv file, the raw.info['subjects_info'] should be | |
updated and raw.info['meas_date'] should not be None to compute the age | |
of the participant correctly. | |
""" | |
if not check_version('mne', '0.17'): | |
raise ValueError('Your version of MNE is too old. ' | |
'Please update to 0.17 or newer.') | |
if not isinstance(raw, BaseRaw): | |
raise ValueError('raw_file must be an instance of BaseRaw, ' | |
'got %s' % type(raw)) | |
if not hasattr(raw, 'filenames') or raw.filenames[0] is None: | |
raise ValueError('raw.filenames is missing. Please set raw.filenames' | |
'as a list with the full path of original raw file.') | |
if raw.preload is not False: | |
raise ValueError('The data should not be preloaded.') | |
raw = raw.copy() | |
raw_fname = raw.filenames[0] | |
if '.ds' in op.dirname(raw.filenames[0]): | |
raw_fname = op.dirname(raw.filenames[0]) | |
# point to file containing header info for multifile systems | |
raw_fname = raw_fname.replace('.eeg', '.vhdr') | |
raw_fname = raw_fname.replace('.fdt', '.set') | |
_, ext = _parse_ext(raw_fname, verbose=verbose) | |
raw_orig = reader[ext](**raw._init_kwargs) | |
assert_array_equal(raw.times, raw_orig.times, | |
"raw.times should not have changed since reading" | |
" in from the file. It may have been cropped.") | |
params = _parse_bids_filename(bids_basename, verbose) | |
subject_id, session_id = params['sub'], params['ses'] | |
acquisition, task, run = params['acq'], params['task'], params['run'] | |
kind = _handle_kind(raw) | |
bids_fname = bids_basename + '_%s%s' % (kind, ext) | |
# check whether the info provided indicates that the data is emptyroom | |
# data | |
emptyroom = False | |
if subject_id == 'emptyroom' and task == 'noise': | |
emptyroom = True | |
# check the session date provided is consistent with the value in raw | |
meas_date = raw.info.get('meas_date', None) | |
if meas_date is not None: | |
er_date = datetime.fromtimestamp( | |
raw.info['meas_date'][0]).strftime('%Y%m%d') | |
if er_date != session_id: | |
raise ValueError("Date provided for session doesn't match " | |
"session date.") | |
data_path = make_bids_folders(subject=subject_id, session=session_id, | |
kind=kind, output_path=output_path, | |
overwrite=False, verbose=verbose) | |
if session_id is None: | |
ses_path = os.sep.join(data_path.split(os.sep)[:-1]) | |
else: | |
ses_path = make_bids_folders(subject=subject_id, session=session_id, | |
output_path=output_path, make_dir=False, | |
overwrite=False, verbose=verbose) | |
# create filenames | |
scans_fname = make_bids_basename( | |
subject=subject_id, session=session_id, suffix='scans.tsv', | |
prefix=ses_path) | |
participants_tsv_fname = make_bids_basename(prefix=output_path, | |
suffix='participants.tsv') | |
participants_json_fname = make_bids_basename(prefix=output_path, | |
suffix='participants.json') | |
coordsystem_fname = make_bids_basename( | |
subject=subject_id, session=session_id, acquisition=acquisition, | |
suffix='coordsystem.json', prefix=data_path) | |
sidecar_fname = make_bids_basename( | |
subject=subject_id, session=session_id, task=task, run=run, | |
acquisition=acquisition, suffix='%s.json' % kind, prefix=data_path) | |
events_fname = make_bids_basename( | |
subject=subject_id, session=session_id, task=task, | |
acquisition=acquisition, run=run, suffix='events.tsv', | |
prefix=data_path) | |
channels_fname = make_bids_basename( | |
subject=subject_id, session=session_id, task=task, run=run, | |
acquisition=acquisition, suffix='channels.tsv', prefix=data_path) | |
if ext not in ['.fif', '.ds', '.vhdr', '.edf', '.bdf', '.set', '.con', | |
'.sqd']: | |
bids_raw_folder = bids_fname.split('.')[0] | |
bids_fname = op.join(bids_raw_folder, bids_fname) | |
# Anonymize | |
if anonymize: | |
if 'daysback' not in anonymize: | |
raise ValueError('`daysback` argument required to anonymize.') | |
daysback = anonymize['daysback'] | |
if daysback < 0: | |
raise ValueError('`daysback` must be a positive number') | |
# this is for ctf and set which have meas_date = None in testing data | |
if raw.info['meas_date'] is None: | |
raw.info['meas_date'] = (np.int32(0), np.int32(0)) | |
min_secondsback = (_stamp_to_dt(raw.info['meas_date']).date() - | |
date(year=1924, month=12, day=31)).total_seconds() | |
# 86400 == seconds in a day | |
min_daysback = np.ceil(min_secondsback / 86400) | |
daysback += min_daysback | |
new_date = (_stamp_to_dt(raw.info['meas_date']).date() - | |
timedelta(days=daysback)) | |
seconds_from_0 = (datetime.fromtimestamp(0).date() - new_date | |
).total_seconds() | |
if abs(seconds_from_0) > np.iinfo('>i4').max: | |
max_val = np.ceil((int(raw.info['meas_date'][0]) - | |
np.iinfo('>i4').min) / 86400) - min_daysback | |
raise ValueError('`daysback` is too large, maximum value: %i' % | |
max_val) | |
keep_his = anonymize['keep_his'] if 'keep_his' in anonymize else False | |
raw.info = anonymize_info(raw.info, daysback=daysback, | |
keep_his=keep_his) | |
if kind == 'meg' and ext != 'fif': | |
if verbose: | |
warn('Converting to FIF for anonymization') | |
bids_fname = bids_fname.replace(ext, '.fif') | |
# Read in Raw object and extract metadata from Raw object if needed | |
orient = ORIENTATION.get(ext, 'n/a') | |
unit = UNITS.get(ext, 'n/a') | |
manufacturer = MANUFACTURERS.get(ext, 'n/a') | |
# save all meta data | |
_participants_tsv(raw, subject_id, participants_tsv_fname, overwrite, | |
verbose) | |
_participants_json(participants_json_fname, True, verbose) | |
_scans_tsv(raw, op.join(kind, bids_fname), scans_fname, overwrite, verbose) | |
# TODO: Implement coordystem.json and electrodes.tsv for EEG and iEEG | |
if kind == 'meg' and not emptyroom: | |
_coordsystem_json(raw, unit, orient, manufacturer, coordsystem_fname, | |
overwrite, verbose) | |
events, event_id = _read_events(events_data, event_id, raw, ext) | |
if events is not None and len(events) > 0 and not emptyroom: | |
_events_tsv(events, raw, events_fname, event_id, overwrite, verbose) | |
make_dataset_description(output_path, name=" ", verbose=verbose) | |
_sidecar_json(raw, task, manufacturer, sidecar_fname, kind, overwrite, | |
verbose) | |
_channels_tsv(raw, channels_fname, overwrite, verbose) | |
# set the raw file name to now be the absolute path to ensure the files | |
# are placed in the right location | |
bids_fname = op.join(data_path, bids_fname) | |
if os.path.exists(bids_fname) and not overwrite: | |
raise FileExistsError('"%s" already exists. Please set ' # noqa: F821 | |
'overwrite to True.' % bids_fname) | |
_mkdir_p(os.path.dirname(bids_fname)) | |
if convert is None: | |
convert = ext not in ALLOWED_EXTENSIONS[kind] | |
if not (convert or anonymize) and verbose: | |
print('Copying data files to %s' % op.splitext(bids_fname)[0]) | |
# File saving branching logic | |
if convert or anonymize: | |
if kind == 'meg': | |
if ext == '.pdf': | |
bids_fname = op.join(data_path, op.basename(bids_fname)) | |
if anonymize: | |
# delete to avoid 32-bit errors | |
# https://github.com/bids-standard/bids-specification/issues/360 | |
raw.info['meas_date'] = (np.int32(0), np.int32(0)) | |
_write_raw_fif(raw, bids_fname) | |
else: | |
if verbose: | |
warn('Converting data files to BrainVision format') | |
_write_raw_brainvision(raw, bids_fname) | |
elif ext == '.fif': | |
_write_raw_fif(raw, bids_fname) | |
# CTF data is saved and renamed in a directory | |
elif ext == '.ds': | |
copyfile_ctf(raw_fname, bids_fname) | |
# BrainVision is multifile, copy over all of them and fix pointers | |
elif ext == '.vhdr': | |
copyfile_brainvision(raw_fname, bids_fname) | |
# EEGLAB .set might be accompanied by a .fdt - find out and copy it too | |
elif ext == '.set': | |
copyfile_eeglab(raw_fname, bids_fname) | |
elif ext == '.pdf': | |
copyfile_bti(raw_orig, op.join(data_path, bids_raw_folder)) | |
else: | |
sh.copyfile(raw_fname, bids_fname) | |
# KIT data requires the marker file to be copied over too | |
if 'mrk' in raw._init_kwargs: | |
hpi = raw._init_kwargs['mrk'] | |
acq_map = dict() | |
if isinstance(hpi, list): | |
if _get_mrk_meas_date(hpi[0]) > _get_mrk_meas_date(hpi[1]): | |
raise ValueError('Markers provided in incorrect order.') | |
_, marker_ext = _parse_ext(hpi[0]) | |
acq_map = dict(zip(['pre', 'post'], hpi)) | |
else: | |
_, marker_ext = _parse_ext(hpi) | |
acq_map[None] = hpi | |
for key, value in acq_map.items(): | |
marker_fname = make_bids_basename( | |
subject=subject_id, session=session_id, task=task, run=run, | |
acquisition=key, suffix='markers%s' % marker_ext, | |
prefix=data_path) | |
sh.copyfile(value, marker_fname) | |
return output_path | |
def write_anat(bids_root, subject, t1w, session=None, acquisition=None, | |
raw=None, trans=None, deface=False, overwrite=False, | |
verbose=False): | |
"""Put anatomical MRI data into a BIDS format. | |
Given a BIDS directory and a T1 weighted MRI scan for a certain subject, | |
format the MRI scan to be in BIDS format and put it into the correct | |
location in the bids_dir. If a transformation matrix is supplied, a | |
sidecar JSON file will be written for the T1 weighted data. | |
Parameters | |
---------- | |
bids_root : str | |
Path to root of the BIDS folder | |
subject : str | |
Subject label as in 'sub-<label>', for example: '01' | |
t1w : str | nibabel image object | |
Path to a T1 weighted MRI scan of the subject. Can be in any format | |
readable by nibabel. Can also be a nibabel image object of a T1 | |
weighted MRI scan. Will be written as a .nii.gz file. | |
session : str | None | |
The session for `t1w`. Corresponds to "ses" | |
acquisition: str | None | |
The acquisition parameters for `t1w`. Corresponds to "acq" | |
raw : instance of Raw | None | |
The raw data of `subject` corresponding to `t1w`. If `raw` is None, | |
`trans` has to be None as well | |
trans : instance of mne.transforms.Transform | str | None | |
The transformation matrix from head coordinates to MRI coordinates. Can | |
also be a string pointing to a .trans file containing the | |
transformation matrix. If None, no sidecar JSON file will be written | |
for `t1w` | |
deface : bool | dict | |
If False, no defacing is performed. | |
If True, deface with default parameters. | |
`trans` and `raw` must not be `None` if True. | |
If dict, accepts the following keys: | |
`inset`: how far back in millimeters to start defacing | |
relative to the nasion (default 20) | |
`theta`: is the angle of the defacing shear in degrees relative | |
to the normal to the plane passing through the anatomical | |
landmarks (default 35). | |
overwrite : bool | |
Whether to overwrite existing files or data in files. | |
Defaults to False. | |
If overwrite is True, any existing files with the same BIDS parameters | |
will be overwritten with the exception of the `participants.tsv` and | |
`scans.tsv` files. For these files, parts of pre-existing data that | |
match the current data will be replaced. | |
If overwrite is False, no existing data will be overwritten or | |
replaced. | |
verbose : bool | |
If verbose is True, this will print a snippet of the sidecar files. If | |
False, no content will be printed. | |
Returns | |
------- | |
anat_dir : str | |
Path to the anatomical scan in the `bids_dir` | |
""" | |
if not has_nibabel(): # pragma: no cover | |
raise ImportError('This function requires nibabel.') | |
import nibabel as nib | |
if deface and (trans is None or raw is None): | |
raise ValueError('The raw object, trans and raw must be provided to ' | |
'deface the T1') | |
# Make directory for anatomical data | |
anat_dir = op.join(bids_root, 'sub-{}'.format(subject)) | |
# Session is optional | |
if session is not None: | |
anat_dir = op.join(anat_dir, 'ses-{}'.format(session)) | |
anat_dir = op.join(anat_dir, 'anat') | |
if not op.exists(anat_dir): | |
os.makedirs(anat_dir) | |
# Try to read our T1 file and convert to MGH representation | |
if isinstance(t1w, str): | |
t1w = nib.load(t1w) | |
elif type(t1w) not in nib.all_image_classes: | |
raise ValueError('`t1w` must be a path to a T1 weighted MRI data file ' | |
', or a nibabel image object, but it is of type ' | |
'"{}"'.format(type(t1w))) | |
t1w = nib.Nifti1Image(t1w.dataobj, t1w.affine) | |
# XYZT_UNITS = NIFT_UNITS_MM (10 in binary or 2 in decimal) | |
# seems to be the default for Nifti files | |
# https://nifti.nimh.nih.gov/nifti-1/documentation/nifti1fields/nifti1fields_pages/xyzt_units.html | |
if t1w.header['xyzt_units'] == 0: | |
t1w.header['xyzt_units'] = np.array(10, dtype='uint8') | |
# Now give the NIfTI file a BIDS name and write it to the BIDS location | |
t1w_basename = make_bids_basename(subject=subject, session=session, | |
acquisition=acquisition, prefix=anat_dir, | |
suffix='T1w.nii.gz') | |
# Check if we have necessary conditions for writing a sidecar JSON | |
if trans is not None: | |
# get trans and ensure it is from head to MRI | |
trans, _ = _get_trans(trans, fro='head', to='mri') | |
if not isinstance(raw, BaseRaw): | |
raise ValueError('`raw` must be specified if `trans` is not None') | |
# Prepare to write the sidecar JSON | |
# extract MEG landmarks | |
coords_dict = _extract_landmarks(raw.info['dig']) | |
meg_landmarks = np.asarray((coords_dict['LPA'], | |
coords_dict['NAS'], | |
coords_dict['RPA'])) | |
# Transform MEG landmarks into MRI space, adjust units by * 1e3 | |
mri_landmarks = apply_trans(trans, meg_landmarks, move=True) * 1e3 | |
# Get landmarks in voxel space, using the mgh version of our T1 data | |
t1_mgh = nib.MGHImage(t1w.dataobj, t1w.affine) | |
vox2ras_tkr = t1_mgh.header.get_vox2ras_tkr() | |
ras2vox_tkr = np.linalg.inv(vox2ras_tkr) | |
mri_landmarks = apply_trans(ras2vox_tkr, mri_landmarks) # in vox | |
# Write sidecar.json | |
t1w_json = dict() | |
t1w_json['AnatomicalLandmarkCoordinates'] = \ | |
{'LPA': list(mri_landmarks[0, :]), | |
'NAS': list(mri_landmarks[1, :]), | |
'RPA': list(mri_landmarks[2, :])} | |
fname = t1w_basename.replace('.nii.gz', '.json') | |
if op.isfile(fname) and not overwrite: | |
raise IOError('Wanted to write a file but it already exists and ' | |
'`overwrite` is set to False. File: "{}"' | |
.format(fname)) | |
_write_json(fname, t1w_json, overwrite, verbose) | |
if deface: | |
t1w = _deface(t1w, mri_landmarks, deface, trans, raw) | |
# Save anatomical data | |
if op.exists(t1w_basename): | |
if overwrite: | |
os.remove(t1w_basename) | |
else: | |
raise IOError('Wanted to write a file but it already exists and ' | |
'`overwrite` is set to False. File: "{}"' | |
.format(t1w_basename)) | |
nib.save(t1w, t1w_basename) | |
return anat_dir |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment