Source code for wonambi.ioeeg.bids

from json import dump
from numpy import array

from .brainvision import write_brainvision
from .edf import Edf
from ..utils import MissingDependency

try:
    from bidso import iEEG
    from bidso.utils import replace_extension, replace_underscore
except ImportError as err:
    iEEG = replace_extension = MissingDependency(err)


[docs]class BIDS: """Basic class to read the data. Parameters ---------- filename : path to file the name of the filename or directory """ def __init__(self, filename): from ..dataset import Dataset self.filename = filename self.task = iEEG(filename) self.baseformat = Dataset(filename)
[docs] def return_hdr(self, sf_from_bids=False): """Return the header for further use. Returns ------- subj_id : str subject identification code start_time : datetime start time of the dataset s_freq : float sampling frequency chan_name : list of str list of all the channels n_samples : int number of samples in the dataset orig : dict additional information taken directly from the header """ subj_id = self.task.subject chan_name = self.task.channels.get(map_lambda=lambda x: x['name']) self.chan_name = array(chan_name) # read these values directly from dataset orig = self.baseformat.header start_time = orig['start_time'] n_samples = orig['n_samples'] # s_freq is present in the bids twice (json and channels.tsv) and in the header file, however I trust the header of the file the most if sf_from_bids: sampling_freq = set(self.task.channels.get(map_lambda=lambda x: x['sampling_frequency'])) if len(sampling_freq) > 1: raise ValueError('Multiple sampling frequencies not supported') s_freq = float(next(iter(sampling_freq))) else: s_freq = orig['s_freq'] return subj_id, start_time, s_freq, chan_name, n_samples, orig
[docs] def return_dat(self, chan, begsam, endsam): """Return the data as 2D numpy.ndarray. Parameters ---------- chan : int or list index (indices) of the channels to read begsam : int index of the first sample endsam : int index of the last sample Returns ------- numpy.ndarray A 2d matrix, with dimension chan X samples """ return self.baseformat.dataset.return_dat(chan, begsam, endsam)
[docs] def return_markers(self): """Return all the markers (also called triggers or events). Returns ------- list of dict where each dict contains 'name' as str, 'start' and 'end' as float in seconds from the start of the recordings, and 'chan' as list of str with the channels involved (if not of relevance, it's None). """ markers = [] for mrk in self.task.events.tsv: markers.append({ 'start': float(mrk['onset']), 'end': float(mrk['onset']) + float(mrk['duration']), 'name': mrk['trial_type'] }) return markers
[docs]def write_bids(data, filename, markers=[]): write_brainvision(data, filename, markers) _write_ieeg_json( replace_extension(filename, '.json')) _write_ieeg_channels( replace_underscore(filename, 'channels.tsv'), data) _write_ieeg_events( replace_underscore(filename, 'events.tsv'), markers)
def _write_ieeg_json(output_file): """Use only required fields """ dataset_info = { "TaskName": "unknown", "Manufacturer": "n/a", "PowerLineFrequency": 50, "iEEGReference": "n/a", } with output_file.open('w') as f: dump(dataset_info, f, indent=' ') def _write_ieeg_channels(output_file, data): """ TODO ---- Make sure that the channels in all the trials are the same. """ CHAN_TYPE = 'ECOG' CHAN_UNIT = 'µV' with output_file.open('w') as f: f.write('name\ttype\tunits\tsampling_frequency\tlow_cutoff\thigh_cutoff\tnotch\treference\n') for one_chan in data.chan[0]: f.write('\t'.join([ one_chan, CHAN_TYPE, CHAN_UNIT, f'{data.s_freq:f}', 'n/a', 'n/a', 'n/a', 'n/a', ]) + '\n') def _write_ieeg_events(output_file, markers): with output_file.open('w') as f: f.write('onset\tduration\ttrial_type\n') for mrk in markers: onset = mrk['start'] duration = mrk['end'] - mrk['start'] f.write(f'{onset:f}\t{duration:f}\t{mrk["name"]}\n')
[docs]def write_bids_channels(output_file, dataset): """Export BIDS channels TSV from Dataset. Parameters ---------- output_file : path to file file to export to (use '.tsv' as extension) dataset : instance of wonambi.Dataset Dataset with record metadata """ if dataset.IOClass is Edf: hdr = dataset.header['orig'] channels = hdr['label'] units = [x if x.encode('utf-8') != b'\xef\xbf\xbd' else '?' \ for x in hdr['physical_dim']] low_cut = [x[x.index('HP:') + 3:x.index('Hz')] \ if 'HP:' in x else '0' for x in hdr['prefiltering']] high_cut = [x[x.index('LP:') + 3:x[x.index('LP:'):].index('Hz') \ + x.index('LP:')] \ if 'LP:' in x else 'Inf' for x in hdr['prefiltering']] notch = [x[x.index('N:') + 2:-2] \ if 'N:' in x else 'n/a' for x in hdr['prefiltering']] s_freq = [x / hdr['record_length'] \ for x in hdr['n_samples_per_record']] chan_type = [] for one_chan in channels: ch = one_chan.lower() if 'eog' in ch or ch == 'e1' or ch == 'e2': chan_type.append('EOG') elif any(x in ch for x in ['ecg', 'ekg']): chan_type.append('ECG') elif any(x in ch for x in ['emg', 'chin', 'leg']): chan_type.append('EMG') elif (ch[-1].isdigit() and ch[:2] != 'sp') or ch[-1] == 'z': # not a perfect test #print(f'yessir, {ch} fits the bill alright!') chan_type.append('EEG') else: chan_type.append('MISC') with output_file.open('w') as f: f.write('name\ttype\tunits\tsampling_frequency\tlow_cutoff' '\thigh_cutoff\tnotch\treference\n') for i, one_chan in enumerate(channels): f.write('\t'.join([ one_chan, chan_type[i], units[i], f'{s_freq[i]:f}', low_cut[i], high_cut[i], notch[i], 'n/a', ]) + '\n') else: print(str(dataset.IOClass) + ' not currently supported.')