Source code for wonambi.ioeeg.eeglab

from datetime import datetime
from numpy import c_, NaN, empty, memmap, float64
from pathlib import Path
from scipy.io import loadmat

from .utils import (read_hdf5_str,
                    read_hdf5_chan_name,
                    DEFAULT_DATETIME,
                    )
from ..utils import MissingDependency

try:
    from h5py import File
except ImportError as err:
    File = MissingDependency(err)


[docs]class EEGLAB: def __init__(self, filename): self.filename = Path(filename).resolve()
[docs] def return_hdr(self): """ subj_id : str subject identification code start_time : datetime start time of the dataset s_freq : float sampling frequency chan_name : list of str list of all the channels n_samples : int number of samples in the dataset orig : dict additional information taken directly from the header """ self.fdtfile = None try: self.EEG = loadmat(str(self.filename), struct_as_record=False, squeeze_me=True)['EEG'] self.hdf5 = False except NotImplementedError: self.hdf5 = True if not self.hdf5: self.s_freq = self.EEG.srate chan_name = [chan.labels for chan in self.EEG.chanlocs] n_samples = self.EEG.pnts if isinstance(self.EEG.subject, str): subj_id = self.EEG.subject else: subj_id = '' try: start_time = datetime(*self.EEG.etc.T0) except AttributeError: start_time = DEFAULT_DATETIME if isinstance(self.EEG.datfile, str): self.fdtfile = self.EEG.datfile else: self.data = self.EEG.data else: with File(self.filename) as f: EEG = f['EEG'] try: self.s_freq = EEG['srate'].value.item() except: self.s_freq = EEG['srate'][()].item() chan_name = read_hdf5_chan_name(f, EEG['chanlocs']['labels']) n_samples = int(EEG['pnts'].value.item()) subj_id = read_hdf5_str(EEG['subject']) try: if 'T0' in list(EEG['etc']): start_time = datetime(*EEG['etc']['T0']) elif 'rec_startdate' in list(EEG['etc']): EEG_starttime = EEG['etc']['rec_startdate'] start_time_char = ''.join([chr(x) for x in EEG_starttime]) start_time = datetime.fromisoformat(start_time_char) else: start_time = DEFAULT_DATETIME except ValueError: start_time = DEFAULT_DATETIME datfile = read_hdf5_str(EEG['datfile']) if datfile == '': self.data = EEG['data'].value.T # for some reason, you need to transpose this else: self.fdtfile = datfile if self.fdtfile is not None: memshape = (len(chan_name), int(n_samples)) memmap_file = self.filename.parent / self.fdtfile if not memmap_file.exists(): renamed_memmap_file = self.filename.with_suffix('.fdt') if not renamed_memmap_file.exists(): raise FileNotFoundError(f'No file {memmap_file} or {renamed_memmap_file}') else: memmap_file = renamed_memmap_file self.data = memmap(str(memmap_file), 'float32', mode='c', shape=memshape, order='F') return subj_id, start_time, self.s_freq, chan_name, n_samples, {}
[docs] def return_dat(self, chan, begsam, endsam): n_samples = self.data.shape[1] dat = self.data[:, max((begsam, 0)):min((endsam, n_samples))].astype(float64) if begsam < 0: pad = empty((dat.shape[0], 0 - begsam)) pad.fill(NaN) dat = c_[pad, dat] if endsam >= n_samples: pad = empty((dat.shape[0], endsam - n_samples)) pad.fill(NaN) dat = c_[dat, pad] return dat[chan, :]
[docs] def return_markers(self): markers = [] if self.hdf5: from h5py import File with File(self.filename) as f: for evt, latency in zip(f['EEG']['event']['type'], f['EEG']['event']['latency']): mrk_t = (f[latency[0]][0, 0] - 1) / self.s_freq markers.append({ 'name': str(f[evt[0]][0, 0]), 'start': mrk_t, 'end': mrk_t, }) else: for event in self.EEG.event: markers.append({ 'name': str(event.type), 'start': (event.latency - 1) / self.s_freq, 'end': (event.latency - 1) / self.s_freq, }) return markers