Source code for wonambi.ioeeg.micromed

from os import SEEK_SET, SEEK_END, SEEK_CUR
from datetime import datetime, date, timedelta
from logging import getLogger
from struct import unpack

from numpy import array, dtype, empty, fromfile, iinfo, memmap, NaN, pad, where
from numpy.lib.recfunctions import append_fields

N_ZONES = 15
MAX_SAMPLE = 128
MAX_CAN_VIEW = 128
ENCODING = 'iso-8859-1'

lg = getLogger(__name__)


[docs]class Micromed: """Basic class to read Micromed data. Supports .TRC and .VWR files Parameters ---------- filename : path to file the name of the filename or directory """ def __init__(self, filename): self.filename = filename self._header = {} with open(self.filename, 'rb') as f: self._header = _read_header(f) f.seek(0, SEEK_END) EOData = f.tell() self._bodata = self._header['BOData'] self._n_chan = self._header['n_chan'] self._n_bytes = self._header['n_bytes'] self._s_freq = self._header['s_freq'] self._n_smp = int((EOData - self._bodata) / (self._n_chan * self._n_bytes)) self._factors = array([ch['factor'] for ch in self._header['chans']]) self._offset = array([ch['logical_ground'] for ch in self._header['chans']]) self._triggers = self._header['trigger'] self._videos = self._header['dvideo']
[docs] def return_hdr(self): """Return the header for further use. Returns ------- subj_id : str subject identification code start_time : datetime start time of the dataset s_freq : float sampling frequency chan_name : list of str list of all the channels n_samples : int number of samples in the dataset orig : dict additional information taken directly from the header """ subj_id = self._header['name'] + ' ' + self._header['surname'] chan_name = [ch['chan_name'] for ch in self._header['chans']] offset = self._header['segments']['time_in_samples'][0] / self._header['s_freq'] start_time = self._header['start_time'] + timedelta(seconds=offset) return subj_id, start_time, self._header['s_freq'], chan_name, self._n_smp, self._header
[docs] def return_dat(self, chan, begsam, endsam): """Return the data as 2D numpy.ndarray. Parameters ---------- chan : int or list index (indices) of the channels to read begsam : int index of the first sample endsam : int index of the last sample Returns ------- numpy.ndarray A 2d matrix, with dimension chan X samples """ if type(chan) == int: # if single value is provided it needs to be transformed to list to generate a 2d matrix chan = [chan, ] if (begsam >= self._n_smp) or (endsam < 0): dat = empty((len(chan), endsam - begsam)) dat.fill(NaN) return dat if begsam < 0: begpad = -1 * begsam begsam = 0 else: begpad = 0 if endsam > self._n_smp: endpad = endsam - self._n_smp endsam = self._n_smp else: endpad = 0 dshape = (self._n_chan, endsam - begsam) sig_dtype = 'u' + str(self._n_bytes) offset = self._bodata + begsam * self._n_bytes * self._n_chan dat = memmap(str(self.filename), dtype=sig_dtype, order='F', mode='r', shape=dshape, offset=offset).astype('float') dat = pad(dat[chan, :], ((0, 0), (begpad, endpad)), mode='constant', constant_values=NaN) return (dat - self._offset[chan, None]) * self._factors[chan, None]
[docs] def return_markers(self): """Return all the markers (also called triggers or events). Returns ------- list of dict where each dict contains 'name' as str, 'start' and 'end' as float in seconds from the start of the recordings, and 'chan' as list of str with the channels involved (if not of relevance, it's None). Raises ------ FileNotFoundError when it cannot read the events for some reason (don't use other exceptions). """ markers = [] triggers = self._triggers DTYPE_MAX = iinfo(triggers.dtype['sample']).max triggers = triggers[triggers['sample'] != DTYPE_MAX] for trig in triggers: markers.append( {'name': str(trig['code']), 'start': trig['sample'] / self._s_freq, 'end': trig['sample'] / self._s_freq, }) for start, name in self._header['notes']: if start == 0: break if name == b'* * * Part 1 * * *': continue markers.append( {'name': name.decode(), 'start': start / self._s_freq, 'end': start / self._s_freq, }) return sorted(markers, key=lambda k: k['start'])
[docs] def return_videos(self, begtime, endtime): vid = self._videos begtime *= 1000 endtime *= 1000 # remove empty rows DTYPE_MAX = iinfo(vid.dtype['duration']).max vid = vid[vid['duration'] != DTYPE_MAX] if vid.shape[0] == 0: raise OSError('No videos for this dataset') vid = append_fields(vid, 'absolute_end', vid['delay'] + vid['duration'], usemask=False) # full name without number i_vid = where((endtime - vid['delay'] >= 0) & (vid['absolute_end'] - begtime >= 0))[0] mpgfiles = ['VID_' + str(vid['file_ext'][i]) + '.AVI' for i in i_vid] mpgfiles = [str(self.filename.parent / mpg) for mpg in mpgfiles] video_beg = (begtime - vid['delay'][i_vid[0]]) / 1000 video_end = (endtime - vid['delay'][i_vid[-1]]) / 1000 lg.debug('First Video (#{}) starts at {}'.format(mpgfiles[0], video_beg)) lg.debug('Last Video (#{}) ends at {}'.format(mpgfiles[-1], video_end)) return mpgfiles, video_beg, video_end
def _read_header(f): orig = {} orig['title'] = f.read(32).decode(ENCODING).strip() orig['laboratory'] = f.read(32).strip(b'\x00').decode(ENCODING).strip() # patient orig['surname'] = f.read(22).decode(ENCODING).strip() orig['name'] = f.read(20).decode(ENCODING).strip() month, day, year = unpack('bbb', f.read(3)) try: orig['date_of_birth'] = date(year + 1900, month, day) except ValueError: orig['date_of_birth'] = None f.seek(19, SEEK_CUR) # recording day, month, year, hour, minute, sec = unpack('bbbbbb', f.read(6)) orig['start_time'] = datetime(year + 1900, month, day, hour, minute, sec) acquisition_unit_code = unpack('h', f.read(2))[0] orig['acquisition_unit'] = ACQUISITION_UNIT.get(acquisition_unit_code, str(acquisition_unit_code)) filetype_code = unpack('H', f.read(2))[0] orig['filetype'] = FILETYPE.get(filetype_code, 'unknown headbox') orig['BOData'] = unpack('I', f.read(4))[0] orig['n_chan'] = unpack('H', f.read(2))[0] orig['multiplexer'] = unpack('H', f.read(2))[0] orig['s_freq'] = unpack('H', f.read(2))[0] orig['n_bytes'] = unpack('H', f.read(2))[0] orig['compression'] = unpack('H', f.read(2))[0] # 0 non compression, 1 compression. orig['n_montages'] = unpack('H', f.read(2))[0] # Montages : number of specific montages orig['dvideo_begin'] = unpack('I', f.read(4))[0] # Starting sample of digital video orig['mpeg_delay'] = unpack('H', f.read(2))[0] # Number of frames per hour of de-synchronization in MPEG acq f.seek(15, SEEK_CUR) header_type_code = unpack('b', f.read(1))[0] orig['header_type'] = HEADER_TYPE[header_type_code] zones = {} for _ in range(N_ZONES): zname, pos, length = unpack('8sII', f.read(16)) zname = zname.decode(ENCODING).strip() zones[zname] = pos, length pos, length = zones['ORDER'] f.seek(pos, SEEK_SET) order = fromfile(f, dtype='u2', count=orig['n_chan']) chans = _read_labcod(f, zones['LABCOD'], order) pos, length = zones['NOTE'] f.seek(pos, SEEK_SET) DTYPE = dtype([('sample', 'u4'), ('text', 'S40')]) notes = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) pos, length = zones['FLAGS'] f.seek(pos, SEEK_SET) DTYPE = dtype([('begin', 'u4'), ('end', 'u4')]) flags = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) pos, length = zones['TRONCA'] f.seek(pos, SEEK_SET) DTYPE = dtype([('time_in_samples', 'u4'), ('sample', 'u4')]) segments = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) # impedance DTYPE = dtype([('positive', 'u1'), ('negative', 'u1')]) pos, length = zones['IMPED_B'] f.seek(pos, SEEK_SET) impedance_begin = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) pos, length = zones['IMPED_E'] f.seek(pos, SEEK_SET) impedance_end = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) montage = _read_montage(f, zones['MONTAGE']) # if average has been computed pos, length = zones['COMPRESS'] f.seek(pos, SEEK_SET) avg = {} avg['trace'], avg['file'], avg['prestim'], avg['poststim'], avg['type'] = unpack('IIIII', f.read(5 * 4)) avg['free'] = f.read(108).strip(b'\x01\x00') history = _read_history(f, zones['HISTORY']) pos, length = zones['DVIDEO'] f.seek(pos, 0) DTYPE = dtype([('delay', 'i4'), ('duration', 'u4'), ('file_ext', 'u4'), ('empty', 'u4')]) dvideo = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) # events DTYPE = dtype([('code', 'u4'), ('begin', 'u4'), ('end', 'u4')]) pos, length = zones['EVENT A'] f.seek(pos, 0) event_a = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) pos, length = zones['EVENT B'] f.seek(pos, 0) event_b = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) pos, length = zones['TRIGGER'] DTYPE = dtype([('sample', 'u4'), ('code', 'u2')]) f.seek(pos, 0) trigger = fromfile(f, dtype=DTYPE, count=int(length / DTYPE.itemsize)) orig.update({ 'order': order, 'chans': chans, 'notes': notes, 'flags': flags, 'segments': segments, 'impedance_begin': impedance_begin, 'impedance_end': impedance_end, 'montage': montage, 'history': history, 'dvideo': dvideo, 'event_a': event_a, 'event_b': event_b, 'trigger': trigger, }) return orig ACQUISITION_UNIT = { 0: 'BQ124 - 24 channels headbox, Internal Interface', 2: 'MS40 - Holter recorder', 6: 'BQ132S - 32 channels headbox, Internal Interface', 7: 'BQ124 - 24 channels headbox, BQ CARD Interface', 8: 'SAM32 - 32 channels headbox, BQ CARD Interface', 9: 'SAM25 - 25 channels headbox, BQ CARD Interface', 10: 'BQ132S R - 32 channels reverse headbox, Internal Interface', 11: 'SAM32 R - 32 channels reverse headbox, BQ CARD Interface', 12: 'SAM25 R - 25 channels reverse headbox, BQ CARD Interface', 13: 'SAM32 - 32 channels headbox, Internal Interface', 14: 'SAM25 - 25 channels headbox, Internal Interface', 15: 'SAM32 R - 32 channels reverse headbox, Internal Interface', 16: 'SAM25 R - 25 channels reverse headbox, Internal Interface', 17: 'SD - 32 channels headbox with jackbox, SD CARD Interface -- PCI Internal Interface', 18: 'SD128 - 128 channels headbox, SD CARD Interface -- PCI Internal Interface', 19: 'SD96 - 96 channels headbox, SD CARD Interface -- PCI Internal Interface', 20: 'SD64 - 64 channels headbox, SD CARD Interface -- PCI Internal Interface', 21: 'SD128c - 128 channels headbox with jackbox, SD CARD Interface -- PCI Internal Interface', 22: 'SD64c - 64 channels headbox with jackbox, SD CARD Interface -- PCI Internal Interface', 23: 'BQ132S - 32 channels headbox, PCI Internal Interface', 24: 'BQ132S R - 32 channels reverse headbox, PCI Internal Interface', } FILETYPE = { 40: 'C128 C.R., 128 EEG (headbox SD128 only)', 42: 'C84P C.R., 84 EEG, 44 poly (headbox SD128 only)', 44: 'C84 C.R., 84 EEG, 4 reference signals (named MKR,MKRB,MKRC,MKRD) (headbox SD128 only)', 46: 'C96 C.R., 96 EEG (headbox SD128 -- SD96 -- BQ123S(r))', 48: 'C63P C.R., 63 EEG, 33 poly', 50: 'C63 C.R., 63 EEG, 3 reference signals (named MKR,MKRB,MKRC)', 52: 'C64 C.R., 64 EEG', 54: 'C42P C.R., 42 EEG, 22 poly', 56: 'C42 C.R., 42 EEG, 2 reference signals (named MKR,MKRB)', 58: 'C32 C.R., 32 EEG', 60: 'C21P C.R., 21 EEG, 11 poly', 62: 'C21 C.R., 21 EEG, 1 reference signal (named MKR)', 64: 'C19P C.R., 19 EEG, variable poly', 66: 'C19 C.R., 19 EEG, 1 reference signal (named MKR)', 68: 'C12 C.R., 12 EEG', 70: 'C8P C.R., 8 EEG, variable poly', 72: 'C8 C.R., 8 EEG', 74: 'CFRE C.R., variable EEG, variable poly', 76: 'C25P C.R., 25 EEG (21 standard, 4 poly transformed to EEG channels), 7 poly -- headbox BQ132S(r) only', 78: 'C27P C.R., 27 EEG (21 standard, 6 poly transformed to EEG channels), 5 poly -- headbox BQ132S(r) only', 80: 'C24P C.R., 24 EEG (21 standard, 3 poly transformed to EEG channels), 8 poly -- headbox SAM32(r) only', 82: 'C25P C.R., 25 EEG (21 standard, 4 poly transformed to EEG channels), 7 poly -- headbox SD with headbox JB 21P', 84: 'C27P C.R., 27 EEG (21 standard, 6 poly transformed to EEG channels), 5 poly -- headbox SD with headbox JB 21P', 86: 'C31P C.R., 27 EEG (21 standard, 10 poly transformed to EEG channels), 1 poly -- headbox SD with headbox JB 21P6', 100: 'C26P C.R., 26 EEG, 6 poly (headbox SD, SD64c, SD128c with headbox JB Mini)', 101: 'C16P C.R., 16 EEG, 16 poly (headbox SD with headbox JB M12)', 102: 'C12P C.R., 12 EEG, 20 poly (headbox SD with headbox JB M12)', 103: '32P 32 poly (headbox SD, SD64c, SD128c with headbox JB Bip)', 120: 'C48P C.R., 48 EEG, 16 poly (headbox SD64)', 121: 'C56P C.R., 56 EEG, 8 poly (headbox SD64)', 122: 'C24P C.R., 24 EEG, 8 poly (headbox SD64)', 140: 'C52P C.R., 52 EEG, 12 poly (headbox SD64c, SD128c with 2 headboxes JB Mini)', 141: '64P 64 poly (headbox SD64c, SD128c with 2 headboxes JB Bip)', 160: 'C88P C.R., 88 EEG, 8 poly (headbox SD96)', 161: 'C80P C.R., 80 EEG, 16 poly (headbox SD96)', 162: 'C72P C.R., 72 EEG, 24 poly (headbox SD96)', 180: 'C120P C.R., 120 EEG, 8 poly (headbox SD128)', 181: 'C112P C.R., 112 EEG, 16 poly (headbox SD128)', 182: 'C104P C.R., 104 EEG, 24 poly (headbox SD128)', 183: 'C96P C.R., 96 EEG, 32 poly (headbox SD128)', 200: 'C122P C.R., 122 EEG, 6 poly (headbox SD128c with 4 headboxes JB Mini)', 201: 'C116P C.R., 116 EEG, 12 poly (headbox SD128c with 4 headboxes JB Mini)', 202: 'C110P C.R., 110 EEG, 18 poly (headbox SD128c with 4 headboxes JB Mini)', 203: 'C104P C.R., 104 EEG, 24 poly (headbox SD128c with 4 headboxes JB Mini)', 204: '128P 128 poly (headbox SD128c with 4 headboxes JB Bip)', 205: '96P 96 poly (headbox SD128c with 3 headboxes JB Bip)', } HEADER_TYPE = { 0: 'Micromed "System 1" Header type', 1: 'Micromed "System 1" Header type', 2: 'Micromed "System 2" Header type', 3: 'Micromed "System98" Header type', 4: 'Micromed "System98" Header type', } UNITS = { -1: 'nV', 0: 'μV', 1: 'mV', 2: 'V', 100: '%', 101: 'bpm', 102: 'dimentionless', } def _read_labcod(f, zone, order): pos, length = zone CHAN_LENGTH = 128 chans = [] for i_ch in order: chan = {} f.seek(pos + i_ch * CHAN_LENGTH, 0) chan['status'] = f.read(1) # Status of electrode for acquisition : 0 : not acquired, 1 : acquired chan['channelType'] = f.read(1) # TODO: type of reference chan['chan_name'] = f.read(6).strip(b'\x01\x00').decode(ENCODING) chan['ground'] = f.read(6).strip(b'\x01\x00').decode(ENCODING) l_min, l_max, chan['logical_ground'], ph_min, ph_max = unpack('iiiii', f.read(20)) chan['factor'] = float(ph_max - ph_min) / float(l_max - l_min + 1) k = unpack('h', f.read(2))[0] chan['units'] = UNITS.get(k, UNITS[0]) chan['HiPass_Limit'], chan['HiPass_Type'] = unpack('HH', f.read(4)) chan['LowPass_Limit'], chan['LowPass_Type'] = unpack('HH', f.read(4)) chan['rate_coefficient'], chan['position'] = unpack('HH', f.read(4)) chan['Latitude'], chan['Longitude'] = unpack('ff', f.read(8)) chan['presentInMap'] = unpack('B', f.read(1))[0] chan['isInAvg'] = unpack('B', f.read(1))[0] chan['Description'] = f.read(32).strip(b'\x01\x00').decode(ENCODING) chan['xyz'] = unpack('fff', f.read(12)) chan['Coordinate_Type'] = unpack('H', f.read(2))[0] chan['free'] = f.read(24).strip(b'\x01\x00') chans.append(chan) return chans def _read_montage(f, zone): pos, length = zone f.seek(pos, SEEK_SET) montages = [] while f.tell() < (pos + length): montage = { 'lines': unpack('H', f.read(2))[0], 'sectors': unpack('H', f.read(2))[0], 'base_time': unpack('H', f.read(2))[0], 'notch': unpack('H', f.read(2))[0], 'colour': unpack(MAX_CAN_VIEW * 'B', f.read(MAX_CAN_VIEW)), 'selection': unpack(MAX_CAN_VIEW * 'B', f.read(MAX_CAN_VIEW)), 'description': f.read(64).strip(b'\x01\x00').decode(ENCODING), 'inputsNonInv': unpack(MAX_CAN_VIEW * 'H', f.read(MAX_CAN_VIEW * 2)), # NonInv : non inverting input 'inputsInv': unpack(MAX_CAN_VIEW * 'H', f.read(MAX_CAN_VIEW * 2)), # Inv : inverting input 'HiPass_Filter': unpack(MAX_CAN_VIEW * 'I', f.read(MAX_CAN_VIEW * 4)), 'LowPass_Filter': unpack(MAX_CAN_VIEW * 'I', f.read(MAX_CAN_VIEW * 4)), 'reference': unpack(MAX_CAN_VIEW * 'I', f.read(MAX_CAN_VIEW * 4)), 'free': f.read(1720).strip(b'\x01\x00'), } montages.append(montage) return montages def _read_history(f, zone): """This matches the Matlab reader from Matlab Exchange but doesn't seem correct. """ pos, length = zone f.seek(pos, SEEK_SET) histories = [] while f.tell() < (pos + length): history = { 'nSample': unpack(MAX_SAMPLE * 'I', f.read(MAX_SAMPLE * 4)), 'lines': unpack('H', f.read(2)), 'sectors': unpack('H', f.read(2)), 'base_time': unpack('H', f.read(2)), 'notch': unpack('H', f.read(2)), 'colour': unpack(MAX_CAN_VIEW * 'B', f.read(MAX_CAN_VIEW)), 'selection': unpack(MAX_CAN_VIEW * 'B', f.read(MAX_CAN_VIEW)), 'description': f.read(64).strip(b'\x01\x00'), 'inputsNonInv': unpack(MAX_CAN_VIEW * 'H', f.read(MAX_CAN_VIEW * 2)), # NonInv : non inverting input 'inputsInv': unpack(MAX_CAN_VIEW * 'H', f.read(MAX_CAN_VIEW * 2)), # Inv : inverting input 'HiPass_Filter': unpack(MAX_CAN_VIEW * 'I', f.read(MAX_CAN_VIEW * 4)), 'LowPass_Filter': unpack(MAX_CAN_VIEW * 'I', f.read(MAX_CAN_VIEW * 4)), 'reference': unpack(MAX_CAN_VIEW * 'I', f.read(MAX_CAN_VIEW * 4)), 'free': f.read(1720).strip(b'\x01\x00'), } histories.append(history) return histories