"""Module reads and writes header and data for KTLX data. The files are:
- .eeg (patient information)
- .ent (notes, sometimes with a backup file called .ent.old)
- .erd (raw data)
- .etc (table of content)
- .snc (synchronization file)
- .stc (segmented table of content)
- .vtc (video table of content)
- .avi (videos)
There is only one of these files in the directory, except for .erd, .etc., .avi
These files are numbered in the format _%03d, except for the first one, which
is not _000 but there is no extension, for backwards compatibility.
This module contains functions to read each of the files, the files are called
_read_EXT where EXT is one of the extensions.
"""
from binascii import hexlify
from datetime import timedelta, datetime
from logging import getLogger
from math import ceil
from os.path import join
from pathlib import Path
from re import sub
from struct import unpack
from numpy import (array,
asarray,
concatenate,
dtype,
empty,
expand_dims,
fromfile,
int32,
NaN,
ones,
unpackbits,
where,
)
lg = getLogger(__name__)
BITS_IN_BYTE = 8
# http://support.microsoft.com/kb/167296
# How To Convert a UNIX time_t to a Win32 FILETIME or SYSTEMTIME
EPOCH_AS_FILETIME = 116444736000000000 # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000
START_TIME_TOL = 10
def get_date_idx(time_of_interest, start_time, end_time):
    """Find which interval contains a given time point.

    Parameters
    ----------
    time_of_interest : datetime
        time point to locate
    start_time : list of datetime
        start of each interval
    end_time : list of datetime
        end of each interval (inclusive)

    Returns
    -------
    int or None
        index of the first interval with start <= time <= end, or None if
        no interval contains the time point.
    """
    for one_idx, (beg, end) in enumerate(zip(start_time, end_time)):
        if beg <= time_of_interest <= end:
            return one_idx
    return None
def convert_sample_to_video_time(sample, orig_s_freq, sampleStamp,
                                 sampleTime):
    """Convert sample number to video time, using snc information.

    Parameters
    ----------
    sample : int
        sample that you want to convert in time
    orig_s_freq : int
        sampling frequency (used as backup outside the snc range)
    sampleStamp : list of int
        Sample number from start of study
    sampleTime : list of datetime.datetime
        File time representation of sampleStamp

    Returns
    -------
    instance of datetime
        absolute time of the sample.

    Notes
    -----
    Note that there is a discrepancy of 4 or 5 hours between the time in
    snc and the time in the header. I'm pretty sure that the time in the
    header is accurate, so we use that. I think that the time in snc does
    not take into account the time zone (that'd explain the 4 or 5
    depending on summertime). This time is only used to get the right video
    so we call this "video time".
    """
    if sample < sampleStamp[0]:
        # before the first stamp: extrapolate with the nominal frequency
        ref = 0
        s_freq = orig_s_freq
    elif sample > sampleStamp[-1]:
        # after the last stamp: extrapolate with the nominal frequency
        ref = len(sampleStamp) - 1
        s_freq = orig_s_freq
    else:
        stamps = asarray(sampleStamp)
        ref = where(stamps <= sample)[0][-1]
        nxt = where(stamps >= sample)[0][0]
        if ref == nxt:  # exact hit on a stamp
            return sampleTime[ref]
        # local frequency from the two surrounding stamps
        elapsed = (sampleTime[nxt] - sampleTime[ref]).total_seconds()
        s_freq = (sampleStamp[nxt] - sampleStamp[ref]) / elapsed
    offset = timedelta(seconds=(sample - sampleStamp[ref]) / s_freq)
    return sampleTime[ref] + offset
def _calculate_conversion(hdr):
"""Calculate the conversion factor.
Returns
-------
conv_factor : numpy.ndarray
channel-long vector with the channel-specific conversion factor
Notes
-----
Final units are microvolts
It should include all the headbox versions apart from 5 because it depends
on subversion.
"""
discardbits = hdr['discardbits']
n_chan = hdr['num_channels']
if hdr['headbox_type'][0] in (1, 3):
# all channels
factor = ones((n_chan)) * (8711. / (2 ** 21 - 0.5)) * 2 ** discardbits
elif hdr['headbox_type'][0] == 4:
# 0 - 23
ch1 = ones((24)) * (8711. / (2 ** 21 - 0.5)) * 2 ** discardbits
# 24 - 27
ch2 = ones((4)) * ((5000000. / (2 ** 10 - 0.5)) / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2))
elif hdr['headbox_type'][0] == 6:
# 0 - 31
ch1 = ones((32)) * (8711. / (2 ** 21 - 0.5)) * 2 ** discardbits
# 32 - 35
ch2 = ones((4)) * ((5000000. / (2 ** 10 - 0.5)) / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2))
elif hdr['headbox_type'][0] == 8:
# 0 - 24
ch1 = ones((25)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 25 - 26
ch2 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2))
elif hdr['headbox_type'][0] == 9:
# 0 - 32
ch1 = ones((33)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 33 - 34
ch2 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2))
elif hdr['headbox_type'][0] == 14:
# 0 - 37
ch1 = ones((38)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 38 - 47
ch2 = ones((10)) * ((10800000 / 65536) / (2 ** 6)) * 2 ** discardbits
# 48-49
ch3 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2, ch3))
elif hdr['headbox_type'][0] == 15:
# 0 - 23
ch1 = ones((24)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 24 - 27 (as above)
ch2 = ones((4)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 28 - 31 (note 10000000 instead of 10800000)
ch3 = ones((4)) * ((10000000 / 65536) / (2 ** 6)) * 2 ** discardbits
# 32-33
ch4 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2, ch3, ch4))
elif hdr['headbox_type'][0] == 17:
# 0 - 39
ch1 = ones((40)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 40 - 43
ch2 = ones((4)) * ((10800000 / 65536) / (2 ** 6)) * 2 ** discardbits
# 44 - 45
ch3 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2, ch3))
elif hdr['headbox_type'][0] == 19:
# all channels
factor = ones((n_chan)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
elif hdr['headbox_type'][0] == 21:
# 0 - 127
ch1 = ones((128)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 128 - 129
ch2 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
# 130 - 255
ch3 = ones((126)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
factor = concatenate((ch1, ch2, ch3))
elif hdr['headbox_type'][0] == 22:
# 0 - 31
ch1 = ones((32)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 32 - 39
ch2 = ones((8)) * ((10800000. / 65536.) / (2 ** 6)) * 2 ** discardbits
# 40 - 41
ch3 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
# 42
ch4 = ones((1)) * ((10800000. / 65536.) / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2, ch3, ch4))
elif hdr['headbox_type'][0] == 23:
# 0 - 31
ch1 = ones((32)) * (8711. / ((2 ** 21) - 0.5)) * 2 ** discardbits
# 32 - 35
ch2 = ones((4)) * ((10800000. / 65536.) / (2 ** 6)) * 2 ** discardbits
# 36 - 37
ch3 = ones((2)) * (1 / (2 ** 6)) * 2 ** discardbits
# 38
ch4 = ones((1)) * ((10800000. / 65536.) / (2 ** 6)) * 2 ** discardbits
factor = concatenate((ch1, ch2, ch3, ch4))
else:
raise NotImplementedError('Implement conversion factor for headbox ' +
str(hdr['headbox_type'][0]))
return factor[:n_chan]
def _filetime_to_dt(ft):
    """Convert a Microsoft FILETIME number to a Python datetime.

    Parameters
    ----------
    ft : int
        FILETIME value (100-ns intervals since January 1, 1601 UTC)

    Returns
    -------
    datetime
        time zone-naive datetime, equivalent to tzinfo=utc.
    """
    # seconds and remaining 100-ns ticks relative to the Unix epoch
    s, ns100 = divmod(ft - EPOCH_AS_FILETIME, HUNDREDS_OF_NANOSECONDS)
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; build the
    # naive-UTC datetime with timedelta arithmetic instead (same result,
    # microsecond resolution)
    return datetime(1970, 1, 1) + timedelta(seconds=s, microseconds=ns100 // 10)
def _find_channels(note):
"""Find the channel names within a string.
The channel names are stored in the .ent file. We can read the file with
_read_ent and we can parse most of the notes (comments) with _read_notes
however the note containing the montage cannot be read because it's too
complex. So, instead of parsing it, we just pass the string of the note
around. This function takes the string and finds where the channel
definition is.
Parameters
----------
note : str
string read from .ent file, it's the note which contains montage.
Returns
-------
chan_name : list of str
the names of the channels.
"""
id_ch = note.index('ChanNames')
chan_beg = note.index('(', id_ch)
chan_end = note.index(')', chan_beg)
note_with_chan = note[chan_beg + 1:chan_end]
return [x.strip('" ') for x in note_with_chan.split(',')]
def _find_start_time(hdr, s_freq):
    """Find the start time, usually in STC, but if that's not correct, use ERD

    Parameters
    ----------
    hdr : dict
        header with stc (and stamps) and erd
    s_freq : int
        sampling frequency

    Returns
    -------
    datetime
        either from stc or from erd

    Notes
    -----
    Sometimes, but rather rarely, there is a mismatch between the time in the
    stc and the time in the erd. For some reason, the time in the stc is way
    off (by hours), which is clearly not correct.
    We can try to reconstruct the actual time, by looking at the ERD time
    (of any file apart from the first one) and compute the original time back
    based on the offset of the number of samples in stc. For some reason, this
    is not the same for all the ERD, but the jitter is in the order of 1-2s
    which is acceptable for our purposes (probably, but be careful about the
    notes).
    """
    start_time = hdr['stc']['creation_time']

    offset = None
    for one_stamp in hdr['stamps']:
        if one_stamp['segment_name'].decode() == hdr['erd']['filename']:
            offset = one_stamp['start_stamp']
            break
    if offset is None:
        # fix: 'offset' used to be unbound (NameError) if no stamp matched
        # the ERD used for the header; fall back to the STC time
        return start_time

    erd_time = (hdr['erd']['creation_time'] -
                timedelta(seconds=offset / s_freq)).replace(microsecond=0)
    stc_erd_diff = (start_time - erd_time).total_seconds()

    if stc_erd_diff > START_TIME_TOL:
        # Logger.warn is deprecated; use warning
        lg.warning('Time difference between ERD and STC is {} s so using ERD'
                   ' time at {}'.format(stc_erd_diff, erd_time))
        start_time = erd_time

    return start_time
def _make_str(t_in):
t_out = []
for t in t_in:
if t == b'\x00':
break
t_out.append(t.decode('utf-8'))
return ''.join(t_out)
def _read_eeg(eeg_file):
    """Read the .eeg file with patient information (not implemented).

    The data is in text format, but based on Excel. You can read it from the
    editor; there are still a couple of signs that are not in Unicode.

    TODO: parse the text of EEG, if it's interesting

    Parameters
    ----------
    eeg_file : Path
        path to the .eeg file (currently ignored)

    Notes
    -----
    The patient information file consists of a single null terminated string
    following the generic file header. The string encodes all the patient
    information in a list format defined by a hierarchy of name value pairs.
    """
    pass
def _read_ent(ent_file):
"""Read notes stored in .ent file.
This is a basic implementation, that relies on turning the information in
the string in the dict format, and then evaluate it. It's not very flexible
and it might not read some notes, but it's fast. I could not implement a
nice, recursive approach.
Returns
-------
allnote : a list of dict
where each dict contains keys such as:
- type
- length : length of the note in B,
- prev_length : length of the previous note in B,
- unused,
- value : the actual content of the note.
Notes
-----
The notes are stored in a format called 'Excel list' but could not find
more information. It's based on "(" and "(.", and I found it very hard to
parse. With some basic regex and substitution, it can be evaluated into
a dict, with sub dictionaries. However, the note containing the name of the
electrodes (I think they called it "montage") cannot be parsed, because
it's too complicated. If it cannot be converted into a dict, the whole
string is passed as value.
"""
with ent_file.open('rb') as f:
f.seek(352) # end of header
note_hdr_length = 16
allnote = []
while True:
note = {}
note['type'], = unpack('<i', f.read(4))
note['length'], = unpack('<i', f.read(4))
note['prev_length'], = unpack('<i', f.read(4))
note['unused'], = unpack('<i', f.read(4))
if not note['type']:
break
s = f.read(note['length'] - note_hdr_length)
s = s[:-2] # it ends with one empty byte
s = s.decode('utf-8', errors='replace')
s1 = s.replace('\n', ' ')
s1 = s1.replace('\\xd ', '')
s1 = s1.replace('(.', '{')
s1 = sub(r'\(([A-Za-z0-9," ]*)\)', r'[\1]', s1)
s1 = s1.replace(')', '}')
# s1 = s1.replace('",', '" :')
s1 = sub(r'(\{[\w"]*),', r'\1 :', s1)
s1 = s1.replace('{"', '"')
s1 = s1.replace('},', ',')
s1 = s1.replace('}}', '}')
s1 = sub(r'\(([0-9 ,-\.]*)\}', r'[\1]', s1)
try:
note['value'] = eval(s1)
except:
note['value'] = s
allnote.append(note)
return allnote
def _read_packet(f, pos, n_smp, n_allchan, abs_delta):
"""
Read a packet of compressed data
Parameters
----------
f : instance of opened file
erd file
pos : int
index of the start of the packet in the file (in bytes from beginning
of the file)
n_smp : int
number of samples to read
n_allchan : int
number of channels (we should specify if shorted or not)
abs_delta: byte
if the delta has this value, it means that you should read the absolute
value at the end of packet. If schema is 7, the length is 1; if schema
is 8 or 9, the length is 2.
Returns
-------
ndarray
data read in the packet up to n_smp.
Notes
-----
TODO: shorted chan. If I remember correctly, deltamask includes all the
channels, but the absolute values are only used for not-shorted channels
TODO: implement schema 7, which is slightly different, but I don't remember
where exactly.
"""
if len(abs_delta) == 1: # schema 7
abs_delta = unpack('b', abs_delta)[0]
else: # schema 8, 9
abs_delta = unpack('h', abs_delta)[0]
l_deltamask = int(ceil(n_allchan / BITS_IN_BYTE))
dat = empty((n_allchan, n_smp), dtype=int32)
f.seek(pos)
for i_smp in range(n_smp):
eventbite = f.read(1)
try:
assert eventbite in (b'\x00', b'\x01')
except:
raise Exception('at pos ' + str(i_smp) +
', eventbite (should be x00 or x01): ' +
str(eventbite))
byte_deltamask = unpack('<' + 'B' * l_deltamask, f.read(l_deltamask))
deltamask = unpackbits(array(byte_deltamask[::-1], dtype ='uint8'))
deltamask = deltamask[:-n_allchan-1:-1]
n_bytes = int(deltamask.sum()) + deltamask.shape[0]
deltamask = deltamask.astype('bool')
# numpy has a weird way of handling string/bytes.
# We create a byte representation, because then tostring() works fine
delta_dtype = empty(n_allchan, dtype='a1')
delta_dtype[deltamask] = 'h'
delta_dtype[~deltamask] = 'b'
relval = array(unpack('<' + delta_dtype.tostring().decode(),
f.read(n_bytes)))
read_abs = (delta_dtype == b'h') & (relval == abs_delta)
dat[~read_abs, i_smp] = dat[~read_abs, i_smp - 1] + relval[~read_abs]
dat[read_abs, i_smp] = fromfile(f, 'i', count=read_abs.sum())
return dat
def _read_erd(erd_file, begsam, endsam):
    """Read the raw data and return a matrix, converted to microvolts.

    Parameters
    ----------
    erd_file : Path
        one of the .erd files to read
    begsam : int
        index of the first sample to read (included)
    endsam : int
        index of the last sample (excluded, per python convention)

    Returns
    -------
    numpy.ndarray
        2d matrix (channels x samples) with the data; samples not covered
        by any packet stay NaN

    Raises
    ------
    NotImplementedError
        for shorted channels, schema 7, or any schema other than 8 / 9

    Notes
    -----
    Each sample point consists of these parts:
    - Event Byte
    - Frequency byte (only if file_schema >= 8 and one chan has != freq)
    - Delta mask (only if file_schema >= 8)
    - Delta Information
    - Absolute Channel Values

    Event Byte: bit 0 indicates the presence of an external trigger during
    the sample period. "A photic stimulator is the only supported device
    which generates an external trigger." It's very rare.

    Delta Mask: bit-mask of size int(number_of_channels / 8 + 0.5). A 1 in
    the mask means the corresponding channel has a 2*n bit delta, a 0 means
    an n bit delta. The rest of the mask is filled with "1". If
    file_schema <= 7, a "fake" all-zero delta mask is generated.

    Shorted channels are counted in the delta mask but never recorded; the
    output has rows for both shorted and non-shorted channels, with NaN for
    the shorted ones.
    """
    hdr = _read_hdr_file(erd_file)
    n_allchan = hdr['num_channels']
    shorted = hdr['shorted']  # TODO confirm this exists for schema 7 at all
    n_shorted = sum(shorted)
    if n_shorted > 0:
        raise NotImplementedError('shorted channels not tested yet')

    if hdr['file_schema'] == 7:
        abs_delta = b'\x80'  # one byte: 10000000
        raise NotImplementedError('schema 7 not tested yet')
    elif hdr['file_schema'] in (8, 9):
        abs_delta = b'\xff\xff'
    else:
        # fix: abs_delta used to be unbound for schemas 1 and 3 (which pass
        # _read_hdr_file), leading to a confusing NameError below
        raise NotImplementedError('reading data not implemented for '
                                  'file_schema ' + str(hdr['file_schema']))

    n_smp = endsam - begsam
    data = empty((n_allchan, n_smp))
    data.fill(NaN)

    # samplestamp spans include the sample at both ends
    etc = _read_etc(erd_file.with_suffix('.etc'))
    all_beg = etc['samplestamp']
    all_end = etc['samplestamp'] + etc['sample_span'] - 1
    try:
        begrec = where(all_end >= begsam)[0][0]
        endrec = where(all_beg < endsam)[0][-1]
    except IndexError:  # requested window entirely outside this file
        return data

    with erd_file.open('rb') as f:
        for rec in range(begrec, endrec + 1):
            # [begpos_rec, endpos_rec] relative to the start of the packet
            begpos_rec = max(begsam - all_beg[rec], 0)
            endpos_rec = min(endsam - all_beg[rec],
                             all_end[rec] - all_beg[rec] + 1)
            # [d1, d2) relative to the requested window
            d1 = begpos_rec + all_beg[rec] - begsam
            d2 = endpos_rec + all_beg[rec] - begsam
            dat = _read_packet(f, etc['offset'][rec], endpos_rec, n_allchan,
                               abs_delta)
            data[:, d1:d2] = dat[:, begpos_rec:endpos_rec]

    # fill up the output data, put NaN for shorted channels (currently
    # unreachable: n_shorted > 0 raises above)
    if n_shorted > 0:
        full_channels = where(asarray([x == 0 for x in shorted]))[0]
        output = empty((n_allchan, n_smp))
        output.fill(NaN)
        output[full_channels, :] = data
    else:
        output = data

    factor = _calculate_conversion(hdr)
    return expand_dims(factor, 1) * output
def _read_etc(etc_file):
"""Return information about table of content for each erd.
"""
etc_type = dtype([('offset', '<i'),
('samplestamp', '<i'),
('sample_num', '<i'),
('sample_span', '<h'),
('unknown', '<h')])
with etc_file.open('rb') as f:
f.seek(352) # end of header
etc = fromfile(f, dtype=etc_type)
return etc
def _read_snc(snc_file):
    """Read Synchronization File and return sample stamp and time.

    Parameters
    ----------
    snc_file : Path
        path to the .snc file

    Returns
    -------
    sampleStamp : ndarray of int
        Sample number from start of study
    sampleTime : ndarray of datetime.datetime
        File time representation of sampleStamp

    Notes
    -----
    The synchronization file is used to calculate a FILETIME given a sample
    stamp (and vice-versa). Theoretically, it is possible to calculate a
    sample stamp's FILETIME given the FILETIME of sample stamp zero (when
    sampling started) and the sample rate. However, because the sample rate
    cannot be represented with full precision, the accuracy of the FILETIME
    calculation is affected.
    To compensate, the synchronization file maintains a sample
    stamp-to-computer time (called MasterTime) mapping; interpolation is
    then used to convert between the two. Currently, the only use for this
    conversion is to correlate EEG (sample stamp) data with other sources of
    data such as video (which works in FILETIME).
    """
    raw_dtype = dtype([('sampleStamp', '<i'),
                       ('sampleTime', '<q')])
    with snc_file.open('rb') as f:
        f.seek(352)  # skip the generic file header
        raw = fromfile(f, dtype=raw_dtype)

    stamps = raw['sampleStamp']
    times = asarray([_filetime_to_dt(one_time)
                     for one_time in raw['sampleTime']])
    return stamps, times
def _read_stc(stc_file):
    """Read Segment Table of Contents file.

    Parameters
    ----------
    stc_file : Path
        path to the .stc file

    Returns
    -------
    hdr : dict
        generic file header plus:
        - next_segment (NOTE(review): previous doc said "Sample frequency in
          Hertz", which looks like a copy-paste error; presumably the offset
          of the next segment to write -- confirm against the Xltek spec)
        - final (NOTE(review): previous doc said "Number of channels stored";
          also suspect -- confirm)
        - padding : 12 reserved int32 values
    stamps : ndarray of dtype
        - segment_name : Name of ERD / ETC file segment
        - start_stamp : First sample stamp that is found in the ERD / ETC pair
        - end_stamp : Last sample stamp that is still found in the ERD / ETC
          pair
        - sample_num : Number of samples actually being recorded (gaps in the
          data are not included in this number)
        - sample_span : Number of samples in that .erd file

    Notes
    -----
    The Segment Table of Contents file is an index into pairs of (raw data
    file / table of contents file). It is used for mapping samples to file
    segments. EEG raw data is split into segments in order to break a single
    file size limit (used to be 2GB) while still allowing quick searches.
    This file ends in the extension '.stc'. Default segment size (size of ERD
    file after which it is closed and a new [ERD / ETC] pair is opened) is
    50MB. The file starts with a generic EEG file header, and is followed by
    a series of fixed length records called the STC entries. ERD segments are
    named according to the following schema:
    - <FIRST_NAME>, <LAST_NAME>_<GUID>.ERD (first)
    - <FIRST_NAME>, <LAST_NAME>_<GUID>.ETC (first)
    - <FIRST_NAME>, <LAST_NAME>_<GUID>_<INDEX>.ERD (second and subsequent)
    - <FIRST_NAME>, <LAST_NAME>_<GUID>_<INDEX>.ETC (second and subsequent)
    <INDEX> is formatted with "%03d" format specifier and starts at 1
    (initial value being 0 and omitted for compatibility with the previous
    versions).
    """
    hdr = _read_hdr_file(stc_file)  # read the generic header the normal way
    stc_dtype = dtype([('segment_name', 'a256'),
                       ('start_stamp', '<i'),
                       ('end_stamp', '<i'),
                       ('sample_num', '<i'),
                       ('sample_span', '<i')])
    with stc_file.open('rb') as f:
        f.seek(352)  # end of header
        hdr['next_segment'] = unpack('<i', f.read(4))[0]
        hdr['final'] = unpack('<i', f.read(4))[0]
        hdr['padding'] = unpack('<' + 'i' * 12, f.read(48))
        stamps = fromfile(f, dtype=stc_dtype)
    return hdr, stamps
def _read_vtc(vtc_file):
    """Read the VTC file.

    Parameters
    ----------
    vtc_file : Path
        path to vtc file

    Returns
    -------
    mpg_file : list of str
        list of avi files
    start_time : list of datetime
        list of start time of the avi files
    end_time : list of datetime
        list of end time of the avi files

    Raises
    ------
    ValueError
        if a video entry does not carry the expected Location GUID.
    """
    with vtc_file.open('rb') as f:
        filebytes = f.read()

    hdr = {}
    hdr['file_guid'] = hexlify(filebytes[:16])
    # not sure about the 4 bytes in between
    i = 20

    mpg_file = []
    start_time = []
    end_time = []
    # each entry: 261 B filename, 16 B Location GUID, 8 B start, 8 B end
    while i < len(filebytes):
        mpg_file.append(_make_str(unpack('c' * 261, filebytes[i:i + 261])))
        i += 261
        Location = filebytes[i:i + 16]
        correct = b'\xff\xfe\xf8^\xfc\xdc\xe5D\x8f\xae\x19\xf5\xd6"\xb6\xd4'
        # fix: assert is stripped under python -O; raise explicitly instead
        if Location != correct:
            raise ValueError('unexpected Location GUID in vtc file: ' +
                             str(Location))
        i += 16
        start_time.append(_filetime_to_dt(unpack('<q',
                                                 filebytes[i:(i + 8)])[0]))
        i += 8
        end_time.append(_filetime_to_dt(unpack('<q',
                                               filebytes[i:(i + 8)])[0]))
        i += 8

    return mpg_file, start_time, end_time
def _read_hdr_file(ktlx_file):
    """Reads header of one KTLX file.

    Parameters
    ----------
    ktlx_file : Path
        name of one of the ktlx files inside the directory (absolute path)

    Returns
    -------
    dict
        dict with information about the file

    Raises
    ------
    NotImplementedError
        if file_schema is not in (1, 3, 7, 8, 9) or base_schema is not 1

    Notes
    -----
    p.3: says long, but python-long requires 8 bytes, so we use f.read(4)
    GUID is correct, BUT little/big endian problems somewhere
    """
    with ktlx_file.open('rb') as f:
        hdr = {}
        assert f.tell() == 0
        hdr['file_guid'] = hexlify(f.read(16))
        hdr['file_schema'], = unpack('<H', f.read(2))
        if not hdr['file_schema'] in (1, 3, 7, 8, 9):
            raise NotImplementedError('Reading header not implemented for ' +
                                      'file_schema ' + str(hdr['file_schema']))
        hdr['base_schema'], = unpack('<H', f.read(2))
        if not hdr['base_schema'] == 1:  # p.3: base_schema 0 is rare, I think
            raise NotImplementedError('Reading header not implemented for ' +
                                      'base_schema ' + str(hdr['base_schema']))
        # naive local-time datetime (fromtimestamp without tz)
        hdr['creation_time'] = datetime.fromtimestamp(unpack('<i',
                                                             f.read(4))[0])
        hdr['patient_id'], = unpack('<i', f.read(4))
        hdr['study_id'], = unpack('<i', f.read(4))
        hdr['pat_last_name'] = _make_str(unpack('c' * 80, f.read(80)))
        hdr['pat_first_name'] = _make_str(unpack('c' * 80, f.read(80)))
        hdr['pat_middle_name'] = _make_str(unpack('c' * 80, f.read(80)))
        # NOTE(review): this overwrites the integer patient_id read above
        # with its string version; callers only ever see the string
        hdr['patient_id'] = _make_str(unpack('c' * 80, f.read(80)))
        assert f.tell() == 352  # end of the generic file header
        if hdr['file_schema'] >= 7:
            hdr['sample_freq'], = unpack('<d', f.read(8))
            n_chan, = unpack('<i', f.read(4))
            hdr['num_channels'] = n_chan
            hdr['deltabits'], = unpack('<i', f.read(4))
            hdr['phys_chan'] = unpack('<' + 'i' * hdr['num_channels'],
                                      f.read(hdr['num_channels'] * 4))
            f.seek(4464)  # skip to the headbox section
            hdr['headbox_type'] = unpack('<' + 'i' * 4, f.read(16))
            hdr['headbox_sn'] = unpack('<' + 'i' * 4, f.read(16))
            hdr['headbox_sw_version'] = _make_str(unpack('c' * 40, f.read(40)))
            hdr['dsp_hw_version'] = _make_str(unpack('c' * 10, f.read(10)))
            hdr['dsp_sw_version'] = _make_str(unpack('c' * 10, f.read(10)))
            hdr['discardbits'], = unpack('<i', f.read(4))
        if hdr['file_schema'] >= 8:
            # fixed-size tables of 1024 int16, truncated to n_chan
            hdr['shorted'] = unpack('<' + 'h' * 1024, f.read(2048))[:n_chan]
            hdr['frequency_factor'] = unpack('<' + 'h' * 1024,
                                             f.read(2048))[:n_chan]
    return hdr
class Ktlx:
    """Reader for one KTLX (Xltek) recording directory."""

    def __init__(self, ktlx_dir):
        lg.info('Reading ' + str(ktlx_dir))
        self.filename = ktlx_dir
        self._filename = None  # Path: directory + stem of the files inside
        self._hdr = self._read_hdr_dir()

    def _read_hdr_dir(self):
        """Read the header for basic information.

        Returns
        -------
        hdr : dict
            - 'erd' : header of one .erd file
            - 'stc' : general part of the .stc file
            - 'stamps' : time stamp for each file

        Notes
        -----
        Also sets the attribute _filename (Path), the common stem of the
        files inside the directory.
        """
        foldername = Path(self.filename)
        stc_file = foldername / (foldername.stem + '.stc')
        if stc_file.exists():
            self._filename = stc_file.with_suffix('')
        else:  # if the folder was renamed
            stc_file = list(foldername.glob('*.stc'))
            if len(stc_file) == 1:
                self._filename = foldername / stc_file[0].stem
            elif len(stc_file) == 0:
                raise FileNotFoundError('Could not find any .stc file.')
            else:
                raise OSError('Found too many .stc files: ' +
                              '\n'.join(str(x) for x in stc_file))

        hdr = {}
        # use .erd because it has extra info, such as sampling freq.
        # try any numbered ERD (in case one or two are missing); don't read
        # the very first erd because its creation_time is slightly off
        for erd_file in foldername.glob(self._filename.stem + '_*.erd'):
            try:
                hdr['erd'] = _read_hdr_file(erd_file)
                # we need the filename to look up stc
                hdr['erd'].update({'filename': erd_file.stem})
                break
            except (FileNotFoundError, PermissionError):
                pass
        # NOTE(review): if no numbered .erd could be read, 'erd' is missing
        # here and return_hdr() will fail with KeyError; return_dat() only
        # needs the .stc, so we deliberately do not raise at this point

        stc = _read_stc(self._filename.with_suffix('.stc'))
        hdr['stc'], hdr['stamps'] = stc
        return hdr

    def return_dat(self, chan, begsam, endsam):
        """Read the data between begsam and endsam.

        Parameters
        ----------
        chan : list of int
            list of channel indices
        begsam : int
            index of the first sample (included)
        endsam : int
            index of the last sample (excluded, per python convention)

        Returns
        -------
        ndarray
            2-d matrix with data (might contain NaN)

        Notes
        -----
        The sample numbering is not based on the samples in the files (i.e.
        the first sample of the first file is NOT the first sample of the
        dataset) but on the stamps in the STC file. Usually, the recording
        starts and after a few milliseconds (maybe one second) the actual
        acquisition starts. STC takes the offset into account. This has the
        counterintuitive result that the first few hundred samples returned
        here can be NaN.
        """
        dat = empty((len(chan), endsam - begsam))
        dat.fill(NaN)

        _, all_stamp = _read_stc(self._filename.with_suffix('.stc'))
        all_erd = all_stamp['segment_name'].astype('U')  # convert to str
        all_beg = all_stamp['start_stamp']
        all_end = all_stamp['end_stamp']
        try:
            begrec = where(all_end >= begsam)[0][0]
            endrec = where(all_beg < endsam)[0][-1]
        except IndexError:  # requested window outside the recording
            return dat

        for rec in range(begrec, endrec + 1):
            # clip the requested window to the limits of this segment
            begpos_rec = max(begsam, all_beg[rec])
            endpos_rec = min(endsam, all_end[rec] + 1)  # TODO check + 1
            d1 = begpos_rec - begsam
            d2 = endpos_rec - begsam
            erd_file = (Path(self.filename) / all_erd[rec]).with_suffix('.erd')
            try:
                dat_rec = _read_erd(erd_file, begpos_rec, endpos_rec)
                dat[:, d1:d2] = dat_rec[chan, :]
            except (FileNotFoundError, PermissionError):
                lg.warning('{} does not exist'.format(erd_file))
        return dat

    def return_hdr(self):
        """Return the header for further use.

        Returns
        -------
        subj_id : str
            subject identification code
        start_time : datetime
            start time of the dataset
        s_freq : float
            sampling frequency
        chan_name : list of str
            list of all the channels
        n_samples : int
            number of samples in the dataset
        orig : dict
            additional information taken directly from the header

        Notes
        -----
        As a side effect, 'vtc' and 'snc' are stored into the erd header
        dict, for later use by return_videos().
        """
        # information contained in .erd
        orig = self._hdr['erd']
        if orig['patient_id']:
            subj_id = orig['patient_id']
        else:
            subj_id = (orig['pat_first_name'] + orig['pat_middle_name'] +
                       orig['pat_last_name'])
        s_freq = orig['sample_freq']
        start_time = _find_start_time(self._hdr, s_freq)
        # information contained in .stc
        n_samples = self._hdr['stamps'][-1]['end_stamp']

        chan_name = None
        try:
            ent_file = self._filename.with_suffix('.ent')
            if not ent_file.exists():
                ent_file = self._filename.with_suffix('.ent.old')
            ent_notes = _read_ent(ent_file)
        except (FileNotFoundError, PermissionError):
            lg.warning('could not find .ent file, channels have arbitrary '
                       'names')
        else:
            # use the last montage, hoping that it's the most accurate
            for ent_note in reversed(ent_notes):
                try:
                    chan_name = _find_channels(ent_note['value'])
                    chan_name = chan_name[:orig['num_channels']]
                    break
                except Exception:
                    # note is a dict or does not contain ChanNames
                    chan_name = None
        if chan_name is None:
            # fix: the original raised NameError when no note could be parsed
            chan_name = ['chan{0:03}'.format(x) for x in
                         range(orig['num_channels'])]

        try:
            orig['vtc'] = _read_vtc(self._filename.with_suffix('.vtc'))
        except (FileNotFoundError, PermissionError):
            orig['vtc'] = None
        try:
            orig['snc'] = _read_snc(self._filename.with_suffix('.snc'))
        except (FileNotFoundError, PermissionError):
            orig['snc'] = None

        return subj_id, start_time, s_freq, chan_name, n_samples, orig

    def return_markers(self):
        """Read the notes of the Ktlx recording and return them as markers.

        Returns
        -------
        list of dict
            one dict per note, with keys 'name', 'start', 'end', 'chan'.
        """
        ent_file = self._filename.with_suffix('.ent')
        if not ent_file.exists():
            ent_file = self._filename.with_suffix('.ent.old')
        try:
            ent_notes = _read_ent(ent_file)
        except (FileNotFoundError, PermissionError):
            return []

        allnote = []
        for n in ent_notes:
            try:
                n['value'].keys()
                allnote.append(n['value'])
            except AttributeError:
                lg.debug('Note of length {} was not '
                         'converted to dict'.format(n['length']))

        s_freq = self._hdr['erd']['sample_freq']
        pcname = '0CFEBE72-DA20-4b3a-A8AC-CDD41BFE2F0D'
        # notes generated automatically by these users/detectors are skipped
        # (the 'eeg' user was considered for exclusion but is kept)
        auto_users = ('Persyst', pcname, 'XLSpike - Intracranial',
                      'XLEvent - Intracranial')

        markers = []
        for n in allnote:
            if n['Text'] == 'Analyzed Data Note':
                continue
            if not n['Text']:
                continue
            if 'User' not in n['Data'].keys():
                continue
            if n['Data']['User'] in auto_users:
                continue
            if len(n['Data']['User']) == 0:
                user_name = '-unknown-'
            else:
                user_name = n['Data']['User'].split()[0]
            note_time = n['Stamp'] / s_freq
            markers.append({'name': n['Text'] + ' (' + user_name + ')',
                            'start': note_time,
                            'end': note_time,
                            'chan': None,
                            })
        return markers

    def return_videos(self, begtime, endtime):
        """Return the video files covering the interval begtime-endtime.

        Parameters
        ----------
        begtime : float
            start, in seconds from the beginning of the recording
        endtime : float
            end, in seconds from the beginning of the recording

        Returns
        -------
        list of str
            full paths to the video files
        float
            offset in seconds inside the first video
        float
            offset in seconds inside the last video

        Notes
        -----
        'snc' and 'vtc' are stored into the erd header dict by return_hdr();
        call return_hdr() before calling this method.
        """
        s_freq = self._hdr['erd']['sample_freq']
        snc = self._hdr['erd']['snc']
        vtc = self._hdr['erd']['vtc']

        beg_sam = begtime * s_freq
        end_sam = endtime * s_freq
        lg.info('Samples {}-{} (based on s_freq only)'.format(beg_sam,
                                                              end_sam))
        beg_snc = convert_sample_to_video_time(beg_sam, s_freq, *snc)
        end_snc = convert_sample_to_video_time(end_sam, s_freq, *snc)
        beg_snc_str = beg_snc.strftime('%H:%M:%S')
        end_snc_str = end_snc.strftime('%H:%M:%S')
        lg.info('Time ' + beg_snc_str + '-' + end_snc_str +
                ' (based on s_freq only)')

        if vtc is None:
            raise OSError('No VTC file (and presumably no avi files)')
        mpgfile, start_time, end_time = vtc

        beg_avi = get_date_idx(beg_snc, start_time, end_time)
        end_avi = get_date_idx(end_snc, start_time, end_time)
        if beg_avi is None or end_avi is None:
            raise IndexError('No video file for time range ' + beg_snc_str +
                             ' - ' + end_snc_str)
        lg.debug('First Video (#{}) {}'.format(beg_avi, mpgfile[beg_avi]))
        lg.debug('Last Video (#{}) {}'.format(end_avi, mpgfile[end_avi]))

        mpgfiles = mpgfile[beg_avi:end_avi + 1]
        full_mpgfiles = [join(self.filename, one_mpg) for one_mpg in mpgfiles]

        beg_diff = (beg_snc - start_time[beg_avi]).total_seconds()
        end_diff = (end_snc - start_time[end_avi]).total_seconds()
        lg.debug('First Video (#{}) starts at {}'.format(beg_avi, beg_diff))
        lg.debug('Last Video (#{}) ends at {}'.format(end_avi, end_diff))
        return full_mpgfiles, beg_diff, end_diff