Source code for wonambi.ioeeg.openephys

"""Module to read open-ephys .continuous files

TODO
----
It assumes that the length of a block (i.e. record) is 1024 data points but this
might change in the future.
"""
from datetime import datetime
from logging import getLogger
import locale
from struct import unpack, calcsize
from math import ceil
from re import search, match
from xml.etree import ElementTree

from numpy import array, empty, NaN, fromfile, unique, where, arange, hstack, vstack, concatenate

lg = getLogger(__name__)


HDR_LENGTH = 1024  # size in bytes of the text header at the start of each file; this is fixed
BLK_LENGTH = 1024  # samples per block (record); currently fixed but might change in the future
BEG_BLK = '<qHH'  # record prologue, little-endian: int64 + 2 uint16 (time information and indices)
DAT_FMT = '>' + BLK_LENGTH * 'h'  # actual data: one big-endian int16 per sample
END_BLK = 10 * 'B'  # 10-byte record marker (control values)

BEG_BLK_SIZE = calcsize(BEG_BLK)
DAT_FMT_SIZE = calcsize(DAT_FMT)
# total on-disk size of one record: prologue + data + end marker
BLK_SIZE = BEG_BLK_SIZE + DAT_FMT_SIZE + calcsize(END_BLK)

# readable names for the eventType codes found in all_channels.events
# NOTE(review): codes other than 3 and 5 would raise KeyError in
# _read_all_channels_events — confirm no other codes occur in practice
EVENT_TYPES = {
    3: 'TTL Event',
    5: 'Network Event',
    }
# event names that return_markers drops because they are not useful as markers
IGNORE_EVENTS = [
    'Network Event',
    ]


[docs]class OpenEphys: """Provide class OpenEphys, which can be used to read the folder containing Open-Ephys .continuous files Parameters ---------- filename : Path Full path for the OpenEphys folder session : int session number (starting at 1, based on open-ephys convention) Attributes ---------- channels : list of dict list of filenames referring to channels which are actually on disk blocks : 1D array length of each block, in samples (currently 1024) gain : 1D array gain to convert digital to microvolts (one value per channel) """ def __init__(self, filename, session=1): if session == 1: self.session = '' else: self.session = f'_{session:d}' self.filename = filename.resolve() self.openephys_file = filename / f'Continuous_Data{self.session}.openephys' self.settings_xml = filename / f'settings{self.session}.xml' self.messages_file = filename / f'messages{self.session}.events' self.events_file = filename / f'all_channels{self.session}.events'
[docs] def return_hdr(self): """Return the header for further use. Returns ------- subj_id : str subject identification code start_time : datetime start time of the dataset s_freq : float sampling frequency chan_name : list of str list of all the channels n_samples : int number of samples in the dataset orig : dict currently empty for open-ephys """ subj_id = self.filename.stem # use directory name as subject name start_time = _read_date(self.settings_xml) self.recordings = _read_openephys(self.openephys_file) self.s_freq = self.recordings[0]['s_freq'] channels = self.recordings[0]['channels'] self.segments, self.messages, self.offset = _read_messages_events(self.messages_file) # only use channels that are actually in the folder chan_name = [] self.channels = [] gain = [] for chan in channels: channel_filename = (self.filename / chan['filename']) if channel_filename.exists(): chan_name.append(chan['name']) self.channels.append(channel_filename) ch_gain = _check_header(channel_filename, self.s_freq, self.offset) gain.append(ch_gain) else: lg.warning(f'could not find {chan["filename"]} in {self.filename}') self.gain = array(gain) # read data structure (recordings can have multiple segments) data_offset = [int(x['channels'][0]['position']) for x in self.recordings] for i in range(len(self.segments) - 1): self.segments[i]['length'] = int((data_offset[i + 1] - data_offset[i]) / BLK_SIZE * BLK_LENGTH) self.segments[i]['data_offset'] = data_offset[i] self.segments[-1]['data_offset'] = data_offset[-1] file_length = channel_filename.stat().st_size self.segments[-1]['length'] = int((file_length - data_offset[-1]) / BLK_SIZE * BLK_LENGTH) for seg in self.segments: seg['end'] = seg['start'] + seg['length'] n_samples = self.segments[-1]['end'] self.blocks_dat, self.blocks_offset = _prepare_blocks(self.segments) orig = {} return subj_id, start_time, self.s_freq, chan_name, n_samples, orig
[docs] def return_dat(self, chan, begsam, endsam): """Read the data for some/all of the channels Parameters ---------- chan : list of int indices of the channels to read begsam : int start sample to read endsam : int end sample to read (exclusive) Returns ------- 2D array chan X samples recordings """ data_length = endsam - begsam dat = empty((len(chan), data_length)) dat.fill(NaN) all_blocks = _select_blocks(self.blocks_dat, begsam, endsam) for i_chan, sel_chan in enumerate(chan): with self.channels[sel_chan].open('rb') as f: for i_block in all_blocks: i_dat = self.blocks_dat[i_block, :] - begsam i_disk = self.blocks_offset[i_block].item() # read only data (no timestamp or record marker) f.seek(i_disk) x = array(unpack(DAT_FMT, f.read(DAT_FMT_SIZE))) beg_dat = max(i_dat[0], 0) end_dat = min(i_dat[1], data_length) beg_x = max(0, - i_dat[0]) segment_length = min(i_dat[1], data_length) - i_dat[0] end_x = min(len(x), segment_length) dat[i_chan, beg_dat:end_dat] = x[beg_x:end_x] return dat * self.gain[chan, None]
[docs] def return_markers(self): """Read the markers from the .events file """ all_markers = ( self.messages + _segments_to_markers(self.segments, self.offset) + _read_all_channels_events(self.events_file, self.s_freq, self.offset) ) return sorted(all_markers, key=lambda x: x['start'])
def _read_openephys(openephys_file):
    """Read the channel labels and their respective files from the
    'Continuous_Data.openephys' file

    Parameters
    ----------
    openephys_file : Path
        path to Continuous_Data.openephys inside the open-ephys folder

    Returns
    -------
    list of dict
        one dict per recording, with keys 's_freq' (float), 'number' (int)
        and 'channels' (list of dict of XML attributes, including the
        channel name, filename and gain)

    TODO
    ----
    Check that all the recordings have the same channels and frequency
    """
    root = ElementTree.parse(openephys_file).getroot()
    recordings = []
    for recording in root:
        recordings.append({
            's_freq': float(recording.attrib['samplerate']),
            'number': int(recording.attrib['number']),
            'channels': [],
            })

        # channels are nested one level down, inside each processor element
        channels = []
        for processor in recording:
            for channel in processor:
                channels.append(channel.attrib)
        recordings[-1]['channels'] = channels

    return recordings


def _read_date(settings_file):
    """Get the date from the settings.xml file

    Parameters
    ----------
    settings_file : Path
        path to settings.xml inside open-ephys folder

    Returns
    -------
    datetime
        start time of the recordings

    Notes
    -----
    The start time is present in the header of each file. This might be
    useful if 'settings.xml' is not present.
    """
    # the DATE field uses English month abbreviations; force an English
    # locale so strptime can parse it
    # NOTE(review): setlocale raises locale.Error if 'en_US.utf-8' is not
    # installed on the system, and mutates process-wide state — confirm
    locale.setlocale(locale.LC_TIME, 'en_US.utf-8')
    root = ElementTree.parse(settings_file).getroot()
    for e0 in root:
        if e0.tag == 'INFO':
            for e1 in e0:
                if e1.tag == 'DATE':
                    break

    # e1 is the loop variable left over from the search above; assumes an
    # INFO/DATE element is always present
    return datetime.strptime(e1.text, '%d %b %Y %H:%M:%S')


def _read_header(filename):
    """Read the text header for each file

    Parameters
    ----------
    filename : Path
        path to single filename with the header

    Returns
    -------
    dict
        header fields as strings
    int
        the timestamp of the first sample in the data. It's like an offset
        for the data. It's necessary to align with the clock time and the
        markers.
    """
    with filename.open('rb') as f:
        h = f.read(HDR_LENGTH).decode()

        header = {}
        for line in h.split('\n'):
            if '=' in line:
                key, value = line.split(' = ')
                key = key.strip()[7:]  # drop the 'header.' prefix
                value = value.strip()[:-1]  # drop the trailing ';'
                header[key] = value

        # the int64 immediately after the header is the first record's timestamp
        first_timestamp = unpack('q', f.read(8))[0]
    return header, first_timestamp


def _check_header(channel_file, s_freq, offset):
    """For each file, make sure that the header is consistent with the
    information in the text file.

    Parameters
    ----------
    channel_file : Path
        path to single filename with the header
    s_freq : int
        sampling frequency
    offset : int
        offset of the first timestamp

    Returns
    -------
    float
        gain from digital to microvolts (the same information is stored in
        Continuous_Data.openephys but the per-file header is trusted more)
    """
    hdr, first_timestamp = _read_header(channel_file)
    # NOTE(review): assert is stripped under python -O; these consistency
    # checks would then silently disappear
    assert int(hdr['header_bytes']) == HDR_LENGTH
    assert int(hdr['blockLength']) == BLK_LENGTH
    assert int(hdr['sampleRate']) == s_freq
    assert first_timestamp == offset

    return float(hdr['bitVolts'])


def _segments_to_markers(segments, first_timestamp):
    """Convert each segment into a pair of START/END RECORDING markers.

    NOTE(review): the first_timestamp parameter is currently unused; the
    segment 'start'/'end' values are already relative to the offset.
    """
    mrk = []
    for i, seg in enumerate(segments):
        # zero-duration markers, in seconds
        mrk.append({
            'name': f'START RECORDING #{i}',
            'chan': None,
            'start': seg['start'] / seg['s_freq'],
            'end': seg['start'] / seg['s_freq'],
            })
        mrk.append({
            'name': f'END RECORDING #{i}',
            'chan': None,
            'start': seg['end'] / seg['s_freq'],
            'end': seg['end'] / seg['s_freq'],
            })

    return mrk


def _read_messages_events(messages_file):
    """Parse the text messages.events file.

    Returns
    -------
    list of dict
        segments ('start' in samples relative to offset, 's_freq')
    list of dict
        user messages as markers ('name', 'start'/'end' in seconds, 'chan')
    int
        offset: timestamp of the first 'start time' line, used to align
        everything else

    NOTE(review): offset (and s_freq) are only bound by the first
    'start time' line — a file without one raises NameError here or at the
    return; confirm the file always starts with such a line.
    """
    messages = []
    segments = []
    header = True  # True until the first 'start time' line has been seen
    with messages_file.open() as f:
        for l in f:
            m_time = search(r'\d+ Software time: (\d+)@\d+Hz', l)
            m_start = search(r'start time: (\d+)@(\d+)Hz', l)
            m_event = match(r'(\d+) (.+)', l)
            if m_time:
                # ignore Software time
                pass
            elif m_start:
                if header:
                    # first 'start time' defines the global offset
                    offset = int(m_start.group(1))
                    s_freq = int(m_start.group(2))
                    header = False
                segments.append({
                    'start': int(m_start.group(1)) - offset,
                    's_freq': int(m_start.group(2)),
                    })
            elif m_event:
                # '<timestamp> <text>' lines become zero-duration markers
                time = int(m_event.group(1))
                messages.append({
                    'name': m_event.group(2),
                    'start': (time - offset) / s_freq,
                    'end': (time - offset) / s_freq,
                    'chan': None,
                    })

    return segments, messages, offset


def _read_all_channels_events(events_file, s_freq, offset):
    """Read the binary all_channels.events file and return TTL-style markers.

    Parameters
    ----------
    events_file : Path
        path to all_channels.events
    s_freq : int
        sampling frequency, to convert timestamps to seconds
    offset : int
        timestamp of the first sample, subtracted from event timestamps

    Returns
    -------
    list of dict
        markers ('name', 'start'/'end' in seconds, 'chan'), with ignored
        event types (see IGNORE_EVENTS) already removed
    """
    # fixed binary record layout of one event (little-endian)
    file_read = [
        ('timestamps', '<i8'),
        ('sampleNum', '<i2'),
        ('eventType', '<u1'),
        ('nodeId', '<u1'),
        ('eventId', '<u1'),
        ('channel', '<u1'),
        ('recordingNumber', '<u2'),
        ]
    with events_file.open('rb') as f:
        f.seek(HDR_LENGTH)  # skip the text header
        evt = fromfile(f, file_read)

    mrk = []
    for evt_type in unique(evt['eventType']):
        timestamps = evt[evt['eventType'] == evt_type]['timestamps']
        # events are assumed to come in on/off pairs — TODO confirm for all
        # event types
        onsets = timestamps[::2]
        offsets = timestamps[1::2]
        for i_on, i_off in zip(onsets, offsets):
            mrk.append({
                'name': EVENT_TYPES[evt_type],
                'start': (i_on - offset) / s_freq,
                'end': (i_off - offset) / s_freq,
                'chan': None,
                })

    # skip some events (like Network Events) which are not very useful
    mrk = [evt for evt in mrk if not evt['name'] in IGNORE_EVENTS]
    return mrk


def _prepare_blocks(segments):
    """Precompute, for every block of every segment, its sample range and
    the on-disk byte offset of its data payload.

    Returns
    -------
    2D array (n_blocks x 2)
        begin and end sample (in recording coordinates) of each block
    1D array
        byte offset of each block's data (past the record prologue)
    """
    blocks_dat = []
    blocks_offset = []
    for seg in segments:
        # last block may be partial, hence ceil
        n_blocks = ceil(seg['length'] / BLK_LENGTH)
        blocks_dat.append(vstack((
            arange(n_blocks) * BLK_LENGTH + seg['start'],
            arange(n_blocks) * BLK_LENGTH + BLK_LENGTH + seg['start'],
            )))
        # skip BEG_BLK_SIZE so the offset points directly at the samples
        blocks_offset.append(
            arange(n_blocks) * BLK_SIZE + seg['data_offset'] + BEG_BLK_SIZE)

    blocks_dat = hstack(blocks_dat).T
    blocks_offset = concatenate(blocks_offset)

    return blocks_dat, blocks_offset


def _select_blocks(blocks_dat, begsam, endsam):
    """Return the indices of the blocks that overlap [begsam, endsam)."""
    all_blocks = ((blocks_dat[:, 1] - begsam) > 0) & ((endsam - blocks_dat[:, 0]) > 0)
    return where(all_blocks)[0]