# Source code for wonambi.ioeeg.blackrock

from datetime import datetime, timezone
from logging import getLogger, NOTSET, WARNING, disable
from os import SEEK_SET, SEEK_CUR, SEEK_END
from os.path import splitext
from struct import unpack
from pathlib import Path

from numpy import (asarray, empty, expand_dims, fromfile, iinfo, NaN, ones,
                   reshape, where)

lg = getLogger(__name__)

BLACKROCK_FORMAT = 'int16'  # by definition
blackrock_iinfo = iinfo(BLACKROCK_FORMAT)
N_BYTES = int(blackrock_iinfo.bits / 8)


class BlackRock:
    """Basic class to read the data.

    Parameters
    ----------
    filename : path to file
        the name of the filename or directory
    """
    def __init__(self, filename):
        self.filename = Path(filename)
        self.markers = []
        # the attributes below are filled in by return_hdr() and are required
        # by return_dat() to locate and scale the samples on disk
        self.BOData = None      # byte offset(s) of the beginning of the data
        self.sess_begin = None  # first sample index of each recording session
        self.sess_end = None    # last sample index of each recording session
        self.factor = None      # per-channel digital-to-analog scaling factor

    def return_hdr(self):
        """Return the header for further use.

        Returns
        -------
        subj_id : str
            subject identification code
        start_time : datetime
            start time of the dataset (in UTC time, because that's the way it
            was saved and there is no way to know the correct local time
            reliably)
        s_freq : float
            sampling frequency
        chan_name : list of str
            list of all the channels
        n_samples : int
            number of samples in the dataset
        orig : dict
            additional information taken directly from the header

        Notes
        -----
        The implementation needs to be updated for NEURALSG
        """
        # the first 8 bytes are a magic string identifying the Blackrock
        # file flavor (NEV event file vs the two NSx data-file specs)
        with open(self.filename, 'rb') as f:
            file_header = f.read(8)

        if file_header == b'NEURALEV':
            # NEV file: events/triggers only, no continuous data
            orig = _read_neuralev(self.filename)
            s_freq = orig['SampleRes']
            n_samples = orig['DataDuration']
            chan_name = []  # TODO: digital channels here instead of notes

        elif file_header == b'NEURALCD':
            # NSx file, spec 2.2+: continuous data with extended headers
            orig = _read_neuralcd(self.filename)
            s_freq = orig['SamplingFreq']
            # NOTE(review): 'Timestamps' are expressed at 'TimeRes' (30 kHz)
            # while 'DataPoints' are samples at 'SamplingFreq';
            # _calc_sess_intervals() rescales the timestamps but this sum does
            # not — confirm n_samples is correct for files with pauses
            n_samples = sum(orig['DataPoints']) + sum(orig['Timestamps'])
            chan_name = [x['Label'] for x in orig['ElectrodesInfo']]

            # INFO to read the data
            self.BOData = orig['BOData']
            self.sess_begin, self.sess_end = _calc_sess_intervals(orig)
            self.factor = _convert_factor(orig['ElectrodesInfo'])

            # merge in the header of the companion .nev file, if any; logging
            # below WARNING is suppressed while reading it, then re-enabled
            nev_file = splitext(self.filename)[0] + '.nev'
            try:
                disable(WARNING)
                nev_orig = _read_neuralev(nev_file)
                disable(NOTSET)
            except FileNotFoundError:
                pass
            else:
                nev_orig.update(orig)  # precedence to orig
                orig = nev_orig

        elif file_header == b'NEURALSG':
            # NSx file, old spec 2.1: minimal header, no electrode info
            orig = _read_neuralsg(self.filename)
            # raise NotImplementedError('This implementation needs to be updated')
            s_freq = orig['SamplingFreq']
            n_samples = orig['DataPoints']
            self.n_samples = n_samples
            # presumably 0.25 is the resolution (analog units per digital
            # step) of this old format — TODO confirm; a 2.1 header carries
            # no per-electrode scaling information
            self.factor = 0.25 * ones(len(orig['ChannelID']))

            # make up names
            chan_name = ['chan{0:04d}'.format(x) for x in orig['ChannelID']]

        subj_id = str()
        start_time = orig['DateTime']

        return subj_id, start_time, s_freq, chan_name, n_samples, orig

    def return_dat(self, chan, begsam, endsam):
        """Return the data as 2D numpy.ndarray.

        Parameters
        ----------
        chan : int or list
            index (indices) of the channels to read
        begsam : int
            index of the first sample
        endsam : int
            index of the last sample

        Returns
        -------
        numpy.ndarray
            A 2d matrix, with dimension chan X samples

        Raises
        ------
        TypeError
            when the file is a .nev file, which holds no continuous data
        """
        ext = splitext(self.filename)[1]
        if ext == '.nev':
            raise TypeError('NEV contains only header info, not data')

        # BOData / sess_begin / sess_end / factor must have been populated by
        # a previous call to return_hdr()
        data = _read_nsx(self.filename, self.BOData, self.sess_begin,
                         self.sess_end, self.factor, begsam, endsam)

        return data[chan, :]

    def return_markers(self, trigger_bits=16, trigger_zero=True):
        """Return the markers/triggers stored in the companion NEV file.

        Parameters
        ----------
        trigger_bits : int, optional
            8 or 16, read the triggers as one or two bytes
        trigger_zero : bool, optional
            read the trigger zero or not
        """
        nev_file = splitext(self.filename)[0] + '.nev'
        # always read as 16 bits here; the 8-bit conversion happens below
        markers = _read_neuralev(nev_file, read_markers=True,
                                 trigger_bits=16)

        if trigger_bits == 8:
            # 256 ** 2 - 256 == 0xFF00: subtracting it strips the high byte,
            # which assumes the high byte is always 0xFF — TODO confirm
            to8 = lambda x: str(int(x) - (256 ** 2 - 256))
            for m in markers:
                m['name'] = to8(m['name'])

        if trigger_zero:
            # drop the '0' markers, using each one as the end time of the
            # immediately preceding non-zero marker
            no_zero = (i for i, m in enumerate(markers) if m['name'] != '0')
            markers_no_zero = []
            for i in no_zero:
                if (i + 1) < len(markers) and markers[i + 1]['name'] == '0':
                    markers[i]['end'] = markers[i + 1]['start']
                markers_no_zero.append(markers[i])
            markers = markers_no_zero

        return markers
def _read_nsx(filename, BOData, sess_begin, sess_end, factor, begsam, endsam):
    """Read a window of samples from an NSx file.

    Parameters
    ----------
    filename : path to file
        NSx file to read
    BOData : list of int
        byte offset of the beginning of each data section
    sess_begin : ndarray of int
        first sample index of each recording session
    sess_end : ndarray of int
        last sample index of each recording session
    factor : ndarray
        per-channel digital-to-analog conversion factor
    begsam : int
        index of the first sample to read
    endsam : int
        index of the last sample to read

    Returns
    -------
    numpy.ndarray
        2d matrix (chan X samples), scaled to analog units

    Notes
    -----
    Tested on NEURALCD

    It returns NaN if you select an interval outside of the data
    """
    n_chan = factor.shape[0]
    # pre-fill with NaN so that samples outside any session remain NaN
    dat = empty((n_chan, endsam - begsam))
    dat.fill(NaN)

    # only touch the sessions that overlap the requested interval
    sess_to_read = where((begsam < sess_end) & (endsam > sess_begin))[0]

    with open(filename, 'rb') as f:
        for sess in sess_to_read:
            # requested interval in session-local sample coordinates
            begsam_sess = begsam - sess_begin[sess]
            endsam_sess = endsam - sess_begin[sess]
            begshift = 0  # offset into the output array
            if begsam_sess < 0:
                # request starts before this session: clip and shift output
                begsam_sess = 0
                begshift = sess_begin[sess] - begsam
            if endsam_sess > (sess_end[sess] - sess_begin[sess]):
                # request ends after this session: clip to session length
                endsam_sess = (sess_end[sess] - sess_begin[sess])
            endshift = begshift + endsam_sess - begsam_sess

            # seek to the session data, then to the first requested sample;
            # samples are interleaved across channels (hence order='F' below)
            f.seek(BOData[sess], SEEK_SET)
            f.seek(n_chan * N_BYTES * begsam_sess, SEEK_CUR)
            n_sam = endsam_sess - begsam_sess
            dat_in_file = fromfile(f, BLACKROCK_FORMAT, n_chan * n_sam)
            dat[:, begshift:endshift] = reshape(dat_in_file,
                                                (n_chan, n_sam),
                                                order='F')

    # scale each channel by its conversion factor
    return expand_dims(factor, 1) * dat


def _read_neuralsg(filename):
    """Read the header of an NSx file with the old spec 2.1 (NEURALSG).

    Parameters
    ----------
    filename : path to file
        NSx file to read

    Returns
    -------
    dict
        header information (spec 2.1 has no per-electrode details)
    """
    hdr = {}
    with open(filename, 'rb') as f:
        hdr['FileTypeID'] = f.read(8).decode('utf-8')
        # spec 2.1 files do not store the spec or time resolution explicitly
        hdr['FileSpec'] = '2.1'
        hdr['SamplingLabel'] = _str(f.read(16).decode('utf-8'))
        hdr['TimeRes'] = 30000
        # the file stores the period as a divisor of the 30 kHz clock
        hdr['SamplingFreq'] = int(hdr['TimeRes'] /
                                  unpack('<I', f.read(4))[0])
        n_chan = unpack('<I', f.read(4))[0]
        hdr['ChannelCount'] = n_chan
        hdr['ChannelID'] = unpack('<' + 'I' * n_chan, f.read(4 * n_chan))

        BOData = f.tell()
        f.seek(0, SEEK_END)
        EOData = f.tell()

    # we read the time information from the corresponding NEV file
    nev_filename = splitext(filename)[0] + '.nev'
    with open(nev_filename, 'rb') as f:
        f.seek(28)
        time = unpack('<' + 'H' * 8, f.read(16))
    # SYSTEMTIME layout: (year, month, dayofweek, day, hour, min, sec, ms);
    # time[2] (day of week) is deliberately skipped
    hdr['DateTime'] = datetime(time[0], time[1], time[3],
                               time[4], time[5], time[6], time[7] * 1000)

    # total samples = data bytes / (channels * bytes per sample)
    hdr['DataPoints'] = int((EOData - BOData) / (n_chan * N_BYTES))
    hdr['BOData'] = BOData
    return hdr


def _read_neuralcd(filename):
    """Read the header of an NSx file with spec 2.2+ (NEURALCD).

    Parameters
    ----------
    filename : path to file
        NSx file to read

    Returns
    -------
    dict
        header information, including per-electrode info and, per data
        section: byte offsets, timestamps and number of data points

    Raises
    ------
    EOFError
        when the file contains no data after the extended headers

    Notes
    -----
    The time stamps are stored in UTC in the NSx files. However, time stamps
    in the NEV files are stored as local time up to Central 6.03 included and
    stored as UTC after Central 6.05. It's impossible to know the version of
    Central from the header.
    """
    hdr = {}
    with open(filename, 'rb') as f:
        hdr['FileTypeID'] = f.read(8).decode('utf-8')

        # fixed-size basic header; offsets below follow the NSx 2.2 layout
        BasicHdr = f.read(306)
        filespec = unpack('bb', BasicHdr[0:2])
        hdr['FileSpec'] = str(filespec[0]) + '.' + str(filespec[1])
        hdr['HeaderBytes'] = unpack('<I', BasicHdr[2:6])[0]
        hdr['SamplingLabel'] = _str(BasicHdr[6:22].decode('utf-8'))
        hdr['Comment'] = _str(BasicHdr[22:278].decode('utf-8', 'ignore'))
        hdr['TimeRes'] = unpack('<I', BasicHdr[282:286])[0]
        # the file stores the sampling period as a divisor of TimeRes
        hdr['SamplingFreq'] = int(hdr['TimeRes'] /
                                  unpack('<i', BasicHdr[278:282])[0])
        # SYSTEMTIME layout (year, month, dayofweek, day, ...): index 2 is
        # the day of the week and is deliberately skipped
        time = unpack('<' + 'H' * 8, BasicHdr[286:302])
        lg.info('The date/time from the header is stored in UTC, which is '
                'stored in DateTimeUTC')
        hdr['DateTime'] = datetime(time[0], time[1], time[3],
                                   time[4], time[5], time[6],
                                   time[7] * 1000, tzinfo=timezone.utc)
        hdr['ChannelCount'] = unpack('<I', BasicHdr[302:306])[0]

        # one 66-byte extended header per channel
        ExtHdrLength = 66
        readSize = hdr['ChannelCount'] * ExtHdrLength
        ExtHdr = f.read(readSize)

        ElectrodesInfo = []
        for idx in range(hdr['ChannelCount']):
            i1 = idx * ExtHdrLength
            elec = {}
            i0, i1 = i1, i1 + 2
            elec['Type'] = ExtHdr[i0:i1].decode('utf-8')
            # 'CC' marks a continuous-channel extended header
            assert elec['Type'] == 'CC'
            i0, i1 = i1, i1 + 2
            elec['ElectrodeID'] = unpack('<H', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 16
            elec['Label'] = _str(ExtHdr[i0:i1].decode('utf-8'))
            i0, i1 = i1, i1 + 1
            # bank is stored 1-based, converted to a letter ('A', 'B', ...)
            elec['ConnectorBank'] = chr(unpack('<B', ExtHdr[i0:i1])[0] +
                                        ord('A') - 1)
            i0, i1 = i1, i1 + 1
            elec['ConnectorPin'] = unpack('<B', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 2
            elec['MinDigiValue'] = unpack('<h', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 2
            elec['MaxDigiValue'] = unpack('<h', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 2
            elec['MinAnalogValue'] = unpack('<h', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 2
            elec['MaxAnalogValue'] = unpack('<h', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 16
            elec['AnalogUnits'] = _str(ExtHdr[i0:i1].decode('utf-8'))
            i0, i1 = i1, i1 + 4
            elec['HighFreqCorner'] = unpack('<I', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 4
            elec['HighFreqOrder'] = unpack('<I', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 2
            elec['HighFilterType'] = unpack('<H', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 4
            elec['LowFreqCorner'] = unpack('<I', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 4
            elec['LowFreqOrder'] = unpack('<I', ExtHdr[i0:i1])[0]
            i0, i1 = i1, i1 + 2
            elec['LowFilterType'] = unpack('<H', ExtHdr[i0:i1])[0]
            ElectrodesInfo.append(elec)

        hdr['ElectrodesInfo'] = ElectrodesInfo
        hdr['ChannelID'] = [x['ElectrodeID'] for x in ElectrodesInfo]

        # remember the end of the extended headers, measure file size,
        # then come back to walk the data sections
        EOexH = f.tell()
        f.seek(0, SEEK_END)
        EOF = f.tell()
        f.seek(EOexH, SEEK_SET)

        n_chan = hdr['ChannelCount']
        if f.tell() >= EOF:
            raise EOFError('File {0} does not seem to contain data '
                           '(size {1} B)'.format(filename, EOF))

        # each data section starts with a 0x01 header byte, a timestamp and
        # a data-point count; collect the byte offsets of each section
        BOData = []
        EOData = []
        Timestamps = []
        DataPoints = []
        while f.tell() < EOF and f.read(1) == b'\x01':
            Timestamps.append(unpack('<I', f.read(4))[0])
            DataPoint = unpack('<I', f.read(4))[0]
            DataPoints.append(DataPoint)
            BOData.append(f.tell())
            f.seek(N_BYTES * DataPoint * n_chan, SEEK_CUR)
            EOData.append(f.tell())

        # the last datapoint does not get updated, so it remains 0
        if DataPoints[-1] == 0:
            # we need to update EOData, because it depends on DataPoint
            EOData[-1] = EOF
            # we back compute the last DataPoint
            DataPoints[-1] = int((EOData[-1] - BOData[-1]) / N_BYTES / n_chan)

    hdr['BOData'] = BOData
    hdr['EOData'] = EOData
    hdr['Timestamps'] = Timestamps  # sampled at 'TimeRes' Hz, ie 30000
    hdr['DataPoints'] = DataPoints

    return hdr


def _read_neuralev(filename, read_markers=False, trigger_bits=16):
    """Read some information from NEV

    Parameters
    ----------
    filename : str
        path to NEV file
    read_markers : bool
        whether to read markers or not (it can get really large)
    trigger_bits : int, optional
        8 or 16, read the triggers as one or two bytes

    Returns
    -------
    MetaTags : list of dict
        which corresponds to MetaTags of openNEV
    Markers : list of dict
        markers in NEV file

    Notes
    -----
    The conversion to DateTime in openNEV.m is not correct. They add a value
    of 2 to the day. Instead, they should add it to the index of the weekday

    It returns triggers as strings (format of EDFBrowser), but it does not
    read the othe types of events (waveforms, videos, etc).

    The time stamps are stored in UTC in the NSx files. However, time stamps
    in the NEV files are stored as local time up to Central 6.03 included and
    stored as UTC after Central 6.05. It's impossible to know the version of
    Central from the header.
    """
    hdr = {}
    with open(filename, 'rb') as f:
        # fixed-size basic header; i0/i1 walk the byte offsets field by field
        BasicHdr = f.read(336)

        i1 = 8
        hdr['FileTypeID'] = BasicHdr[:i1].decode('utf-8')
        assert hdr['FileTypeID'] == 'NEURALEV'
        i0, i1 = i1, i1 + 2
        filespec = unpack('bb', BasicHdr[i0:i1])
        hdr['FileSpec'] = str(filespec[0]) + '.' + str(filespec[1])
        i0, i1 = i1, i1 + 2
        hdr['Flags'] = unpack('<H', BasicHdr[i0:i1])[0]
        i0, i1 = i1, i1 + 4
        hdr['HeaderOffset'] = unpack('<I', BasicHdr[i0:i1])[0]
        i0, i1 = i1, i1 + 4
        hdr['PacketBytes'] = unpack('<I', BasicHdr[i0:i1])[0]
        i0, i1 = i1, i1 + 4
        hdr['TimeRes'] = unpack('<I', BasicHdr[i0:i1])[0]
        i0, i1 = i1, i1 + 4
        hdr['SampleRes'] = unpack('<I', BasicHdr[i0:i1])[0]
        i0, i1 = i1, i1 + 16
        # SYSTEMTIME layout: index 2 (day of week) is deliberately skipped
        time = unpack('<' + 'H' * 8, BasicHdr[i0:i1])
        lg.debug('DateTime is in local time with Central version <= 6.03'
                 ' and in UTC with Central version > 6.05')
        hdr['DateTime'] = datetime(time[0], time[1], time[3],
                                   time[4], time[5], time[6],
                                   time[7] * 1000)
        i0, i1 = i1, i1 + 32
        # hdr['Application'] = _str(BasicHdr[i0:i1].decode('utf-8'))
        i0, i1 = i1, i1 + 256
        hdr['Comment'] = _str(BasicHdr[i0:i1].decode('utf-8',
                                                     errors='replace'))
        i0, i1 = i1, i1 + 4
        countExtHeader = unpack('<I', BasicHdr[i0:i1])[0]

        # you can read subject name from sif

        # Check data duration: the timestamp of the last packet in the file
        f.seek(-hdr['PacketBytes'], SEEK_END)
        hdr['DataDuration'] = unpack('<I', f.read(4))[0]
        hdr['DataDurationSec'] = hdr['DataDuration'] / hdr['SampleRes']

        # Read the Extended Header (one 32-byte packet per entry)
        f.seek(336)
        ElectrodesInfo = []
        IOLabels = []

        for i in range(countExtHeader):
            ExtendedHeader = f.read(32)
            i1 = 8
            PacketID = ExtendedHeader[:i1].decode('utf-8')
            if PacketID == 'NEUEVWAV':
                # waveform/electrode description packet
                elec = {}
                i0, i1 = i1, i1 + 2
                elec['ElectrodeID'] = unpack('<H', ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 1
                elec['ConnectorBank'] = chr(ExtendedHeader[i0] + 64)
                i0, i1 = i1, i1 + 1
                elec['ConnectorPin'] = ExtendedHeader[i0]
                i0, i1 = i1, i1 + 2
                df = unpack('<h', ExtendedHeader[i0:i1])[0]
                # This is a workaround for the DigitalFactor overflow
                if df == 21516:
                    elec['DigitalFactor'] = 152592.547
                else:
                    elec['DigitalFactor'] = df
                i0, i1 = i1, i1 + 2
                elec['EnergyThreshold'] = unpack('<H',
                                                 ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 2
                elec['HighThreshold'] = unpack('<h', ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 2
                elec['LowThreshold'] = unpack('<h', ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 1
                elec['Units'] = ExtendedHeader[i0]
                i0, i1 = i1, i1 + 1
                elec['WaveformBytes'] = ExtendedHeader[i0]
                ElectrodesInfo.append(elec)

            elif PacketID == 'NEUEVLBL':
                # label packet: ElectrodeID is 1-based, hence the - 1
                i0, i1 = i1, i1 + 2
                ElectrodeID = unpack('<H', ExtendedHeader[i0:i1])[0] - 1
                s = _str(ExtendedHeader[i1:].decode('utf-8'))
                ElectrodesInfo[ElectrodeID]['ElectrodeLabel'] = s

            elif PacketID == 'NEUEVFLT':
                # filter-settings packet for one electrode
                elec = {}
                i0, i1 = i1, i1 + 2
                ElectrodeID = unpack('<H', ExtendedHeader[i0:i1])[0] - 1
                i0, i1 = i1, i1 + 4
                elec['HighFreqCorner'] = unpack('<I',
                                                ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 4
                elec['HighFreqOrder'] = unpack('<I', ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 2
                elec['HighFilterType'] = unpack('<H',
                                                ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 4
                elec['LowFreqCorner'] = unpack('<I', ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 4
                elec['LowFreqOrder'] = unpack('<I', ExtendedHeader[i0:i1])[0]
                i0, i1 = i1, i1 + 2
                elec['LowFilterType'] = unpack('<H', ExtendedHeader[i0:i1])[0]
                ElectrodesInfo[ElectrodeID].update(elec)

            elif PacketID == 'DIGLABEL':
                # TODO: the order is not taken into account and probably wrong!
                iolabel = {}
                iolabel['mode'] = ExtendedHeader[24] + 1
                s = _str(ExtendedHeader[8:25].decode('utf-8'))
                iolabel['label'] = s
                IOLabels.append(iolabel)

            else:
                raise NotImplementedError(PacketID + ' not implemented yet')

        hdr['ChannelID'] = [x['ElectrodeID'] for x in ElectrodesInfo]

        # everything after the extended headers is fixed-size data packets
        fExtendedHeader = f.tell()
        fData = f.seek(0, SEEK_END)
        countDataPacket = int((fData - fExtendedHeader) / hdr['PacketBytes'])

        markers = []
        if read_markers and countDataPacket:
            # read all data packets in one go and parse them in memory
            f.seek(fExtendedHeader)
            x = f.read(countDataPacket * hdr['PacketBytes'])

            DigiValues = []
            for j in range(countDataPacket):
                i = j * hdr['PacketBytes']
                if trigger_bits == 16:
                    tempDigiVals = unpack('<H', x[8 + i:10 + i])[0]
                else:
                    # read only the low byte, zero-padding the high byte
                    tempDigiVals = unpack('<H', x[8 + i:9 + i] + b'\x00')[0]

                val = {'timestamp': unpack('<I', x[0 + i:4 + i])[0],
                       'packetID': unpack('<H', x[4 + i:6 + i])[0],
                       'tempClassOrReason': unpack('<B', x[6 + i:7 + i])[0],
                       'tempDigiVals': tempDigiVals}
                DigiValues.append(val)

            # packetID 0 marks serial/digital events; others are unsupported
            digserPacketID = 0
            not_serialdigital = [x for x in DigiValues
                                 if not x['packetID'] == digserPacketID]

            if not_serialdigital:
                lg.debug('Code not implemented to read PacketID ' +
                         str(not_serialdigital[0]['packetID']))

            # convert to markers
            for val in DigiValues:
                m = {'name': str(val['tempDigiVals']),
                     'start': val['timestamp'] / hdr['SampleRes'],
                     'end': val['timestamp'] / hdr['SampleRes'],
                     'chan': [''],
                     }
                markers.append(m)

    if read_markers:
        return markers
    else:
        return hdr


def _str(t_in):
    """Return the text up to (excluding) the first NULL character.

    Used to trim the zero-padding of fixed-width string fields.
    """
    t_out = []
    for t in t_in:
        if t == '\x00':
            break
        t_out.append(t)
    return ''.join(t_out)


def _convert_factor(ElectrodesInfo):
    """Return the digital-to-analog conversion factor for each electrode.

    Parameters
    ----------
    ElectrodesInfo : list of dict
        one dict per electrode, with the Min/Max Digi/Analog values

    Returns
    -------
    ndarray
        one conversion factor (analog units per digital step) per electrode
    """
    factor = []
    for elec in ElectrodesInfo:
        # have to be equal, so it's simple to calculate conversion factor
        assert elec['MaxDigiValue'] == -elec['MinDigiValue']
        assert elec['MaxAnalogValue'] == -elec['MinAnalogValue']
        factor.append((elec['MaxAnalogValue'] - elec['MinAnalogValue']) /
                      (elec['MaxDigiValue'] - elec['MinDigiValue']))
    return asarray(factor)


def _calc_sess_intervals(hdr):
    """Compute the begin/end sample index of each recording session.

    Parameters
    ----------
    hdr : dict
        header as returned by _read_neuralcd, with 'Timestamps',
        'DataPoints', 'TimeRes' and 'SamplingFreq'

    Returns
    -------
    sess_begin : ndarray of int
        first sample index of each session
    sess_end : ndarray of int
        last sample index of each session
    """
    sess_begin = []
    sess_end = []
    n_sess = len(hdr['Timestamps'])
    # timestamps are always at 30 kHz, so we need to convert
    # (NOTE: 'sampling_inverval' is a typo for 'sampling_interval')
    sampling_inverval = hdr['TimeRes'] / hdr['SamplingFreq']
    for i in range(n_sess):
        # cumulative samples of previous sessions plus the (rescaled)
        # cumulative gap introduced by the timestamps
        sess_smp_begin = (sum(hdr['DataPoints'][:i]) +
                          sum(hdr['Timestamps'][:i + 1]) / sampling_inverval)
        sess_begin.append(sess_smp_begin)
        sess_smp_end = (sum(hdr['DataPoints'][:i + 1]) +
                        sum(hdr['Timestamps'][:i + 1]) / sampling_inverval)
        sess_end.append(sess_smp_end)

    sess_begin = asarray(sess_begin, dtype=int)
    sess_end = asarray(sess_end, dtype=int)
    return sess_begin, sess_end