"""Module reads and writes header and data for EDF data.
"""
from logging import getLogger
from datetime import datetime, timedelta, time, date
from pathlib import Path
from re import findall, finditer
from struct import pack
from fractions import Fraction
from numpy import (abs,
asarray,
empty,
fromfile,
iinfo,
ones,
max,
NaN,
newaxis,
repeat,
)
from scipy.signal import resample_poly
from .utils import decode, _select_blocks, DEFAULT_DATETIME
lg = getLogger(__name__)

EDF_FORMAT = 'int16'  # EDF stores every sample as a 2-byte integer, by definition
edf_iinfo = iinfo(EDF_FORMAT)
N_BYTES = edf_iinfo.dtype.itemsize  # bytes per sample on disk (= 2)
DIGITAL_MAX = edf_iinfo.max
DIGITAL_MIN = -1 * edf_iinfo.max  # symmetric range, so that digital 0 = physical 0

# label of the annotation channel in EDF+ files
ANNOT_NAME = 'EDF Annotations'

# Time-stamped Annotations List (TAL): onset, optional \x15-prefixed duration,
# optional \x14-prefixed annotation text, terminated by \x14\x00.
# Raw bytes literal: the regex engine interprets \x15 / \x14 / \x00 itself,
# and a non-raw literal would make \d an invalid escape sequence.
PATTERN = rb'(?P<onset>[+\-]\d+(?:\.\d*)?)(?:\x15(?P<duration>\d+(?:\.\d*)?))?(\x14(?P<annotation>[^\x00]*))?(?:\x14\x00)'
class Edf:
    """Provide class EDF, which can be used to read the header and the data.

    Parameters
    ----------
    edffile : str
        Full path for the EDF file

    Attributes
    ----------
    hdr : dict
        header taken from EDF file
    """
    def __init__(self, edffile):
        self.filename = Path(edffile)
        self._read_hdr()

    def _read_hdr(self):
        """Read header from EDF file.

        It only reads the header for internal purposes and adds a hdr.
        """
        with self.filename.open('rb') as f:
            hdr = {}
            assert f.tell() == 0
            # version field: '0' left-justified in 8 bytes identifies EDF
            assert f.read(8) == b'0       '

            # recording info
            hdr['subject_id'] = decode(f.read(80)).strip()
            hdr['recording_id'] = decode(f.read(80)).strip()

            # parse timestamp: dd.mm.yy and hh.mm.ss, 8 bytes each
            date_str = decode(f.read(8)).strip()
            if date_str == '':
                edf_date = DEFAULT_DATETIME.date()
            else:
                (day, month, year) = [int(x) for x in findall(r'(\d+)', date_str)]
                # Y2K: cutoff is 1985 (85-99 -> 19xx, 00-84 -> 20xx)
                if year >= 85:
                    year += 1900
                else:
                    year += 2000
                edf_date = date(year, month, day)

            time_str = decode(f.read(8)).strip()
            if time_str == '':
                edf_time = DEFAULT_DATETIME.time()
            else:
                # third field is seconds (previously misnamed 'day')
                (hour, minute, sec) = [int(x) for x in findall(r'(\d+)', time_str)]
                edf_time = time(hour, minute, sec)

            hdr['start_time'] = datetime.combine(edf_date, edf_time)

            # misc
            hdr['header_n_bytes'] = int(f.read(8))
            f.seek(44, 1)  # reserved for EDF+
            hdr['n_records'] = int(f.read(8))
            hdr['record_length'] = float(f.read(8))  # in seconds
            nchannels = hdr['n_channels'] = int(f.read(4))

            # read channel info: each field is stored for all channels in a row
            channels = range(hdr['n_channels'])
            hdr['label'] = [decode(f.read(16)).strip() for n in channels]
            hdr['transducer'] = [decode(f.read(80)).strip() for n in channels]
            hdr['physical_dim'] = [decode(f.read(8)).strip() for n in channels]
            hdr['physical_min'] = [float(f.read(8)) for n in channels]
            hdr['physical_max'] = [float(f.read(8)) for n in channels]
            hdr['digital_min'] = [float(f.read(8)) for n in channels]
            hdr['digital_max'] = [float(f.read(8)) for n in channels]
            hdr['prefiltering'] = [decode(f.read(80)).strip() for n in channels]
            hdr['n_samples_per_record'] = [int(f.read(8)) for n in channels]
            f.seek(32 * nchannels, 1)  # reserved
            assert f.tell() == hdr['header_n_bytes']

            self.hdr = hdr

    def return_hdr(self):
        """Return the header for further use.

        Returns
        -------
        subj_id : str
            subject identification code
        start_time : datetime
            start time of the dataset
        s_freq : float
            sampling frequency
        chan_name : list of str
            list of all the channels
        n_samples : int
            number of samples in the dataset
        orig : dict
            additional information taken directly from the header

        Notes
        -----
        EDF+ accepts multiple frequency rates for different channels. Here, we
        use only the highest sampling frequency (normally used for EEG and MEG
        signals), and we UPSAMPLE all the other channels.

        This method also precomputes the attributes (blocks, dig_min, gain,
        ...) that return_dat() and return_markers() rely on, so it must be
        called before those methods.
        """
        try:
            self.i_annot = self.hdr['label'].index(ANNOT_NAME)
        except ValueError:
            self.i_annot = None  # no annotation channel present

        self.smp_in_blk = sum(self.hdr['n_samples_per_record'])
        self.max_smp = max(self.hdr['n_samples_per_record'])

        n_blocks = self.hdr['n_records']
        self.blocks = ones(n_blocks, dtype='int') * self.max_smp

        self.dig_min = asarray(self.hdr['digital_min'])
        self.phys_min = asarray(self.hdr['physical_min'])
        phys_range = asarray(self.hdr['physical_max']) - self.phys_min
        dig_range = asarray(self.hdr['digital_max']) - self.dig_min
        if (phys_range < 0).any():
            lg.warning('physical_min is higher than physical_max. Check whether the polarity of your recordings is correct')
        if (dig_range < 0).any():
            lg.warning('digital_min is higher than digital_max. Check whether the polarity of your recordings is correct')

        # some files store all-zero digital limits; fall back to the full
        # int16 symmetric range so the calibration below stays meaningful
        if sum(self.dig_min) == 0:
            lg.warning('digital_min is zero. Setting to -32767.')
            self.dig_min = ones(self.dig_min.shape) * -32767.0
            dig_range = 2 * 32767.0

        self.gain = phys_range / dig_range

        subj_id = self.hdr['subject_id']
        start_time = self.hdr['start_time']
        s_freq = self.max_smp / self.hdr['record_length']
        chan_name = [label for label in self.hdr['label'] if label != ANNOT_NAME]
        n_samples = self.max_smp * self.hdr['n_records']

        return subj_id, start_time, s_freq, chan_name, n_samples, self.hdr

    def return_dat(self, chan, begsam, endsam):
        """Read data from an EDF file.

        Reads channel by channel, and adjusts the values by calibration.

        Parameters
        ----------
        chan : list of int
            index (indices) of the channels to read
        begsam : int
            index of the first sample
        endsam : int
            index of the last sample

        Returns
        -------
        numpy.ndarray
            A 2d matrix, where the first dimension is the channels and the
            second dimension are the samples.

        Notes
        -----
        return_hdr() must have been called first: it computes self.blocks,
        self.dig_min, self.phys_min and self.gain used here.
        """
        assert begsam < endsam

        dat = empty((len(chan), endsam - begsam))
        dat.fill(NaN)

        with self.filename.open('rb') as f:
            for i_dat, blk, i_blk in _select_blocks(self.blocks, begsam, endsam):
                dat_in_rec = self._read_record(f, blk, chan)
                dat[:, i_dat[0]:i_dat[1]] = dat_in_rec[:, i_blk[0]:i_blk[1]]

        # calibration: digital -> physical units
        dat = ((dat.astype('float64') - self.dig_min[chan, newaxis]) *
               self.gain[chan, newaxis] + self.phys_min[chan, newaxis])

        return dat

    def _read_record(self, f, blk, chans):
        """Read one raw data record for the requested channels.

        Parameters
        ----------
        f : file object
            EDF file, open in binary mode
        blk : int
            index of the record to read
        chans : list of int
            indices of the channels to read

        Returns
        -------
        numpy.ndarray
            (len(chans), max_smp) matrix with the data as written on file;
            channels sampled below max_smp are upsampled (repeated when the
            ratio is an integer, otherwise polyphase-resampled).
        """
        dat_in_rec = empty((len(chans), self.max_smp))

        for i_ch_in_dat, i_ch in enumerate(chans):
            offset, n_smp_per_chan = self._offset(blk, i_ch)
            f.seek(offset)
            x = fromfile(f, count=n_smp_per_chan, dtype=EDF_FORMAT)

            ratio = self.max_smp / n_smp_per_chan
            if ratio.is_integer():
                # integer upsampling: simple sample-and-hold
                dat_in_rec[i_ch_in_dat, :] = repeat(x, int(ratio))
            else:
                # non-integer ratio: approximate with a rational resampler
                fract = round(Fraction(ratio), 2)
                up, down = fract.numerator, fract.denominator
                dat_in_rec[i_ch_in_dat, :] = resample_poly(x, up, down)

        return dat_in_rec

    def _offset(self, blk, i_ch):
        """Compute the byte offset of one channel inside one record.

        Parameters
        ----------
        blk : int
            index of the record
        i_ch : int
            index of the channel

        Returns
        -------
        offset : int
            absolute byte offset in the file where this channel's samples start
        n_smp_per_chan : int
            number of samples this channel stores per record
        """
        ch_in_rec = sum(self.hdr['n_samples_per_record'][:i_ch])
        n_smp_per_chan = self.hdr['n_samples_per_record'][i_ch]
        offset_in_blk = self.smp_in_blk * blk + ch_in_rec
        offset = self.hdr['header_n_bytes'] + offset_in_blk * N_BYTES
        return offset, n_smp_per_chan

    def return_markers(self):
        """Return the annotations stored in the 'EDF Annotations' channel.

        Returns
        -------
        list of dict
            where each dict has 'name' (str), 'start' (float, in s),
            'end' (float, in s) and 'chan' (always None). Empty list if the
            file has no annotation channel.

        Notes
        -----
        return_hdr() must have been called first (it sets self.i_annot).
        """
        if self.i_annot is None:
            return []

        annotations = []
        with self.filename.open('rb') as f:
            for blk in range(self.hdr['n_records']):
                offset, n_smp_per_chan = self._offset(blk, self.i_annot)
                f.seek(offset)
                annotations.extend(_read_tal(f.read(n_smp_per_chan)))

        markers = []
        for annot in annotations:
            for name in annot['annotation']:
                m = {'name': name,
                     'start': annot['onset'],
                     'end': annot['onset'] + annot['dur'],
                     'chan': None,
                     }
                markers.append(m)

        return markers
def write_edf(data, filename, subj_id='X X X X', physical_max=1000,
              physical_min=None):
    """Export data to an EDF file.

    Parameters
    ----------
    data : instance of ChanTime
        data with only one trial
    filename : path to file
        file to export to (include '.edf')
    subj_id : str
        subject id
    physical_max : int
        values above this parameter will be considered saturated (and also
        those that are too negative). This parameter defines the precision.
        If None, the maximum absolute value of the data is used.
    physical_min : int, optional
        value written as physical minimum; if None, -physical_max is used

    Notes
    -----
    Data is always recorded as 2 Byte int (which is 'int16'), so precision is
    limited. You can control the precision with physical_max. To get the
    precision:

    >>> precision = physical_max / DIGITAL_MAX

    where DIGITAL_MAX is 32767.

    Records are one second long, so trailing samples beyond the last full
    second are not written out.
    """
    if data.start_time is None:
        raise ValueError('Data should contain a valid start_time (as datetime)')
    start_time = data.start_time + timedelta(seconds=data.axis['time'][0][0])

    if physical_max is None:
        physical_max = max(abs(data.data[0]))
    precision = physical_max / DIGITAL_MAX
    lg.info('Data exported to EDF will have precision ' + str(precision))

    if physical_min is None:
        physical_min = -1 * physical_max

    dat = data.data[0] / physical_max * DIGITAL_MAX
    # clip BEFORE converting to int16: casting out-of-range floats to int16
    # wraps around, so clipping after the cast would be a no-op
    dat[dat > DIGITAL_MAX] = DIGITAL_MAX
    dat[dat < DIGITAL_MIN] = DIGITAL_MIN
    dat = dat.astype(EDF_FORMAT)

    with open(filename, 'wb') as f:
        f.write('{:<8}'.format(0).encode('ascii'))  # version
        f.write('{:<80}'.format(subj_id).encode('ascii'))  # subject_id
        f.write('{:<80}'.format('Startdate X X X X').encode('ascii'))
        f.write(start_time.strftime('%d.%m.%y').encode('ascii'))
        f.write(start_time.strftime('%H.%M.%S').encode('ascii'))

        n_smp = data.data[0].shape[1]
        s_freq = int(data.s_freq)
        n_records = n_smp // s_freq  # floor: drops any partial last record
        record_length = 1  # in seconds
        n_channels = data.number_of('chan')[0]

        header_n_bytes = 256 + 256 * n_channels
        f.write('{:<8d}'.format(header_n_bytes).encode('ascii'))
        f.write((' ' * 44).encode('ascii'))  # reserved for EDF+
        f.write('{:<8}'.format(n_records).encode('ascii'))
        f.write('{:<8d}'.format(record_length).encode('ascii'))
        f.write('{:<4}'.format(n_channels).encode('ascii'))

        for chan in data.axis['chan'][0]:
            f.write('{:<16}'.format(chan).encode('ascii'))  # label
        for _ in range(n_channels):
            f.write('{:<80}'.format('').encode('ascii'))  # transducer
        for _ in range(n_channels):
            f.write('{:<8}'.format('uV').encode('ascii'))  # physical_dim
        for _ in range(n_channels):
            f.write('{:<8}'.format(physical_min).encode('ascii'))
        for _ in range(n_channels):
            f.write('{:<8}'.format(physical_max).encode('ascii'))
        for _ in range(n_channels):
            f.write('{:<8}'.format(DIGITAL_MIN).encode('ascii'))
        for _ in range(n_channels):
            f.write('{:<8}'.format(DIGITAL_MAX).encode('ascii'))
        for _ in range(n_channels):
            f.write('{:<80}'.format('').encode('ascii'))  # prefiltering
        for _ in range(n_channels):
            f.write('{:<8d}'.format(s_freq).encode('ascii'))  # n_smp in record
        for _ in range(n_channels):
            f.write((' ' * 32).encode('ascii'))  # reserved

        length_record = s_freq * n_channels  # samples in one record
        for i in range(n_records):
            i0 = i * s_freq
            i1 = i0 + s_freq
            # channel-major within each record, as EDF requires
            x = dat[:, i0:i1].flatten(order='C')  # assumes it's ChanTimeData
            f.write(pack('<' + 'h' * length_record, *x))
def _read_tal(rawbytes):
    """Parse TAL (Time-stamped Annotations Lists) with a regular expression.

    Parameters
    ----------
    rawbytes : bytes
        raw information from file

    Returns
    -------
    list of dict
        where each dict contains onset, duration, and list with the
        annotations; entries with no annotation text are skipped
    """
    parsed = []

    for match in finditer(PATTERN, rawbytes):
        groups = match.groupdict()

        duration = 0.
        if groups['duration']:
            duration = float(decode(groups['duration']))

        # one TAL can carry several \x14-separated annotation strings
        texts = [decode(piece)
                 for piece in groups['annotation'].split(b'\x14') if piece]
        if not texts:
            continue

        parsed.append({'onset': float(decode(groups['onset'])),
                       'dur': duration,
                       'annotation': texts,
                       })

    return parsed
def remove_datetime(filename):
    """Remove datetime from filename by overwriting the date / time info.

    Parameters
    ----------
    filename : Path
        path to edf file

    Notes
    -----
    It modifies the file in place: the 8-byte start-date and 8-byte
    start-time header fields are blanked out with spaces.

    TODO
    ----
    This function might be part of a large anonymization procedure for edf
    """
    edf_path = Path(filename)
    with edf_path.open('r+b') as f:
        # date/time fields follow the 8-byte version and the two 80-byte
        # identification fields: 8 + 80 + 80 = 168
        f.seek(168)
        f.write(b' ' * 16)