"""Module to keep track of the user-made annotations and sleep scoring.
"""
from logging import getLogger
from bisect import bisect_left
from itertools import groupby
from csv import reader, writer
from json import dump
from datetime import datetime, timedelta
from numpy import (allclose, arange, around, asarray, clip, diff, isnan,
logical_and, modf, nan)
from math import ceil, inf
from os.path import basename, splitext
from pathlib import Path
from re import search, sub
from scipy.io import loadmat
from xml.etree.ElementTree import Element, SubElement, tostring, parse
from xml.dom.minidom import parseString
try:
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QProgressDialog
except ImportError:
Qt = None
QProgressDialog = None
from .. import __version__
from ..utils.exceptions import UnrecognizedFormat
lg = getLogger(__name__)
VERSION = '5'
DOMINO_STAGE_KEY = {'N1': 'NREM1',
'N2': 'NREM2',
'N3': 'NREM3',
'Re': 'REM',
'Wa': 'Wake',
'Ar': 'Artefact',
'A\n': 'Artefact'}
REMLOGIC_STAGE_KEY = {'SLEEP-S0': 'Wake',
'SLEEP-S1': 'NREM1',
'SLEEP-S2': 'NREM2',
'SLEEP-S3': 'NREM3',
'SLEEP-S4': 'NREM3',
'SLEEP-REM': 'REM',
'SLEEP-RE': 'REM',
'SLEEP-UNSCORED': 'Undefined'}
ALICE_STAGE_KEY = {'WK': 'Wake',
'N1': 'NREM1',
'N2': 'NREM2',
'N3': 'NREM3',
'EM': 'REM',
'S1': 'NREM1',
'S2': 'NREM2',
'S3': 'NREM3',
'W': 'Wake',
'1': 'NREM1',
'2': 'NREM2',
'3': 'NREM3',
'11': 'Wake',
'12': 'NREM1',
'13': 'NREM2',
'14': 'NREM3',
'15': 'REM'}
SANDMAN_STAGE_KEY = {' 1': 'NREM1',
' 2': 'NREM2',
' 3': 'NREM3',
' 4': 'NREM3',
'em': 'REM',
'ke': 'Wake',
'1\t': 'NREM1',
'2\t': 'NREM2',
'3\t': 'NREM3',
'4\t': 'NREM3',
'm\t': 'REM',
'e\t': 'Wake'}
COMPUMEDICS_STAGE_KEY = {'?': 'Undefined',
'?\n': 'Undefined',
'W': 'Wake',
'W\n': 'Wake',
'N1': 'NREM1',
'N2': 'NREM2',
'N3': 'NREM3',
'R\n': 'REM',
'0': 'Wake',
'0\n': 'Wake',
'1': 'NREM1',
'1\n': 'NREM1',
'2': 'NREM2',
'2\n': 'NREM2',
'3': 'NREM3',
'3\n': 'NREM3',
'4': 'NREM3',
'4\n': 'NREM3',
'5': 'NREM3',
'5\n': 'NREM3',
'R': 'REM',
'R\n': 'REM' }
FASST_STAGE_KEY = ['Wake',
'NREM1',
'NREM2',
'NREM3',
None,
'REM',
'Movement',
'Unknown'
]
PRANA_STAGE_KEY = {'0': 'Wake',
'1': 'NREM1',
'2': 'NREM2',
'3': 'NREM3',
'5': 'REM'}
DELTAMED_STAGE_KEY = {'1': 'Wake',
'2': 'REM',
'3': 'NREM1',
'4': 'NREM2',
'5': 'NREM3'}
PHYSIP_STAGE_KEY = {'0': 'NREM3',
'1': 'NREM2',
'2': 'NREM1',
'3': 'REM',
'4': 'Wake',
'5': 'Artefact'}
PHILLIPS_STAGE_KEY = {'WK': 'Wake',
'N1': 'NREM1',
'N2': 'NREM2',
'N3': 'NREM3',
'RE': 'REM'}
BIDS_STAGE_KEY = {'Wake': 'sleep_wake',
'NREM1': 'sleep_N1',
'NREM2': 'sleep_N2',
'NREM3': 'sleep_N3',
'REM': 'sleep_REM',
'Artefact': 'artifact',
'Movement': 'artifact_motion',
'Unknown': '',
'Undefined': ''}
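# The dictionaries above translate vendor-specific stage codes into Wonambi
# stage names, and BIDS_STAGE_KEY translates Wonambi names into BIDS labels.
# A minimal illustration, using keys taken from the tables above:
#     DOMINO_STAGE_KEY['N2']    # -> 'NREM2'
#     BIDS_STAGE_KEY['NREM2']   # -> 'sleep_N2'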
def parse_iso_datetime(date):
try:
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S")
except ValueError:
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
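# Usage sketch for parse_iso_datetime (the timestamp is arbitrary); it accepts
# ISO 8601 strings with or without fractional seconds:
#     parse_iso_datetime('2017-03-01T22:30:00')         # -> datetime(2017, 3, 1, 22, 30)
#     parse_iso_datetime('2017-03-01T22:30:00.500000')  # keeps the microseconds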
def create_empty_annotations(xml_file, dataset):
"""Create an empty annotation file.
Notes
-----
Dates are made time-zone unaware.
"""
xml_file = Path(xml_file)
root = Element('annotations')
root.set('version', VERSION)
info = SubElement(root, 'dataset')
x = SubElement(info, 'filename')
x.text = str(dataset.filename)
x = SubElement(info, 'path') # not to be relied on
x.text = str(dataset.filename)
x = SubElement(info, 'start_time')
start_time = dataset.header['start_time'].replace(tzinfo=None)
x.text = start_time.isoformat()
first_sec = 0
last_sec = int(dataset.header['n_samples'] /
dataset.header['s_freq']) # in s
x = SubElement(info, 'first_second')
x.text = str(first_sec)
x = SubElement(info, 'last_second')
x.text = str(last_sec)
xml = parseString(tostring(root))
with xml_file.open('w') as f:
f.write(xml.toxml())
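# Usage sketch, assuming 'recording.edf' and 'scores.xml' are hypothetical
# paths and Dataset is the wonambi reader whose header provides 'start_time',
# 's_freq' and 'n_samples' as used above:
#     from wonambi import Dataset
#     dset = Dataset('recording.edf')
#     create_empty_annotations('scores.xml', dset)
#     annot = Annotations('scores.xml')   # ready for add_rater / scoring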
def create_annotation(xml_file, from_fasst, lights_off=False, legacy_meth=False):
"""Create annotations by importing from FASST sleep scoring file.
Parameters
----------
xml_file : path to xml file
annotation file that will be created
from_fasst : path to FASST file
.mat file containing the scores
lights_off : bool
set to True if scoring begins at lights off
legacy_meth : bool
old functionality; will stage epochs starting at epoch_start = 0.
Returns
-------
instance of Annotations
TODO
----
Merge create_annotation and create_empty_annotations
"""
xml_file = Path(xml_file)
try:
mat = loadmat(str(from_fasst), variable_names='D', struct_as_record=False,
squeeze_me=True)
except ValueError:
raise UnrecognizedFormat(str(from_fasst) + ' does not look like a FASST .mat file')
D = mat['D']
info = D.other.info
score = D.other.CRC.score
if score.ndim == 1:
score = score[:, None] # atleast_2d adds the dimension at the beginning
microsecond, second = modf(info.hour[2])
start_time = datetime(*info.date, int(info.hour[0]), int(info.hour[1]),
int(second), int(microsecond * 1e6))
first_sec = 0
if lights_off:
first_sec = int(round(score[3, 0][0]))
last_sec = score[0, 0].shape[0] * score[2, 0]
root = Element('annotations')
root.set('version', VERSION)
try:
filename = D.fname
path = D.path
except AttributeError:
filename = D.other.info.fname
path = D.other.info.path
info = SubElement(root, 'dataset')
x = SubElement(info, 'filename')
x.text = filename
x = SubElement(info, 'path') # not to be relied on
x.text = path
x = SubElement(info, 'start_time')
x.text = start_time.isoformat()
x = SubElement(info, 'first_second')
x.text = str(first_sec)
x = SubElement(info, 'last_second')
x.text = str(int(last_sec))
xml = parseString(tostring(root))
with xml_file.open('w') as f:
f.write(xml.toxml())
annot = Annotations(xml_file)
n_raters = score.shape[1]
for i_rater in range(n_raters):
rater_name = score[1, i_rater]
epoch_length = int(score[2, i_rater])
annot.add_rater(rater_name, epoch_length=epoch_length)
if not legacy_meth:
offset = first_sec % epoch_length
annot.get_rater(rater_name)
stages = annot.rater.find('stages')
# list is necessary so that it does not remove in place
for s in list(stages):
stages.remove(s)
for i in arange(offset, first_sec - epoch_length, epoch_length):
epoch = SubElement(stages, 'epoch')
start_time = SubElement(epoch, 'epoch_start')
epoch_beg = i
start_time.text = str(epoch_beg)
end_time = SubElement(epoch, 'epoch_end')
end_time.text = str(epoch_beg + epoch_length)
epoch_stage = SubElement(epoch, 'stage')
epoch_stage.text = 'Unknown'
quality = SubElement(epoch, 'quality')
quality.text = 'Good'
idx_epoch = 0
for idx_epoch, fasst_stage in enumerate(score[0, i_rater]):
epoch = SubElement(stages, 'epoch')
start_time = SubElement(epoch, 'epoch_start')
epoch_beg = first_sec + (idx_epoch * epoch_length)
start_time.text = str(epoch_beg)
end_time = SubElement(epoch, 'epoch_end')
end_time.text = str(epoch_beg + epoch_length)
epoch_stage = SubElement(epoch, 'stage')
try:
one_stage = FASST_STAGE_KEY[int(fasst_stage)]
except (ValueError, IndexError):
one_stage = 'Unknown'
lg.info(f'Stage not recognized at second {epoch_beg}')
epoch_stage.text = one_stage
quality = SubElement(epoch, 'quality')
quality.text = 'Good'
else:
for epoch_start, epoch in enumerate(score[0, i_rater]):
if isnan(epoch):
continue
annot.set_stage_for_epoch(epoch_start * epoch_length,
FASST_STAGE_KEY[int(epoch)], save=False)
annot.save()
return annot
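# Usage sketch for importing a FASST scoring file (both file names are
# hypothetical):
#     annot = create_annotation('scores.xml', from_fasst='subject1_fasst.mat')
#     annot.raters    # rater names read from the FASST score array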
class Annotations():
"""Class to return nicely formatted information from xml.
Parameters
----------
xml_file : path to xml file
Annotation xml file
"""
def __init__(self, xml_file, rater_name=None):
self.xml_file = xml_file
self.root = self.load()
if rater_name is None:
self.rater = self.root.find('rater')
else:
self.get_rater(rater_name)
def load(self):
"""Load xml from file."""
lg.info('Loading ' + str(self.xml_file))
update_annotation_version(self.xml_file)
xml = parse(self.xml_file)
return xml.getroot()
def save(self):
"""Save xml to file."""
if self.rater is not None:
self.rater.set('modified', datetime.now().isoformat())
xml = parseString(tostring(self.root))
with open(self.xml_file, 'w') as f:
f.write(xml.toxml())
@property
def dataset(self):
return self.root.find('dataset/path').text
@property
def start_time(self):
return parse_iso_datetime(self.root.find('dataset/start_time').text)
@property
def first_second(self):
return int(self.root.find('dataset/first_second').text)
@property
def last_second(self):
return int(self.root.find('dataset/last_second').text)
@property
def current_rater(self):
try:
return self.rater.get('name')
except AttributeError:
raise IndexError('No rater in the annotations')
@property
def raters(self):
return [rater.get('name') for rater in self.root.iter('rater')]
@property
def epoch_length(self):
epoch = next(self.epochs)
return around(epoch['end'] - epoch['start'])
def get_rater(self, rater_name):
# get xml root for one rater
found = False
for rater in self.root.iterfind('rater'):
if rater.get('name') == rater_name:
self.rater = rater
found = True
if not found:
raise KeyError(rater_name + ' not in the list of raters (' +
', '.join(self.raters) + ')')
def add_rater(self, rater_name, epoch_length=30):
if rater_name in self.raters:
lg.warning('rater ' + rater_name + ' already exists, selecting it')
self.get_rater(rater_name)
return
# add one rater + subtree
rater = SubElement(self.root, 'rater')
rater.set('name', rater_name)
rater.set('created', datetime.now().isoformat())
self.get_rater(rater_name)
# create subtree
SubElement(self.rater, 'bookmarks')
SubElement(self.rater, 'events')
SubElement(self.rater, 'stages')
SubElement(self.rater, 'cycles')
self.create_epochs(epoch_length=epoch_length)
self.save()
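# Usage sketch: add a rater scoring in 30 s epochs, then score the first epoch
# ('scores.xml' and the rater name are hypothetical; set_stage_for_epoch is
# defined further down):
#     annot = Annotations('scores.xml')
#     annot.add_rater('RaterA', epoch_length=30)
#     annot.set_stage_for_epoch(0, 'NREM2')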
def rename_rater(self, name, new_name):
"""Rename a rater."""
for rater in self.root.iterfind('rater'):
if rater.get('name') == name:
rater.set('name', new_name)
self.save()
def remove_rater(self, rater_name):
# remove one rater
for rater in self.root.iterfind('rater'):
if rater.get('name') == rater_name:
# here we deal with the current rater
if rater is self.rater:
all_raters = self.root.findall('rater')
if len(all_raters) == 1:
self.rater = None
else:
idx = all_raters.index(self.rater)
idx -= 1 # select the previous rater
if idx == -1:
idx = 1 # rater to delete is 0
self.rater = all_raters[idx]
self.root.remove(rater)
self.save()
def import_staging(self, filename, source, rater_name, rec_start,
staging_start=None, epoch_length=None,
poor=['Artefact'], as_qual=False):
"""Import staging from an external staging text file.
Parameters
----------
filename : str
Staging file name.
source : str
Name of program where staging was made. One of 'domino', 'alice',
'compumedics', 'sandman', 'remlogic', 'phillips', 'deltamed', 'prana',
'grael', 'egi'
rater_name : str
Rater name for imported staging.
rec_start : datetime
Date and time (year, month, day, hour, minute, second) of recording
start. Year is ignored (New Year's Eve celebratory recordings
unsupported.)
staging_start : datetime (default: None)
Date and time of staging start. For use when not provided in
staging file.
epoch_length : int
duration in s of a scoring epoch
poor : list of str
epochs with stage names in this list will be marked as Poor quality
as_qual : bool
if True, the staging will only be used to mark epoch signal quality,
as per poor
"""
if as_qual and rater_name not in self.raters:
lg.warning('Rater ' + rater_name + ' not found; staging quality not imported.')
return
clue = None # used in some instances to pick out epochs from other evts
idx_clue = None
redherring = None
if source in ['remlogic', 'sandman']:
encoding = 'ISO-8859-1'
else:
encoding = 'utf-8'
with open(filename, 'r', encoding=encoding) as f:
lines = f.readlines()
if source == 'domino':
for i, line in enumerate(lines):
if line[0].isdigit():
idx_first_line = i
break
if lines[idx_first_line].index(';') > 15:
idx_time = (11, 19)
idx_stage = slice(25, 26)
stage_key = PHYSIP_STAGE_KEY
else:
idx_time = (0, 8)
idx_stage = slice(14, 16)
stage_key = DOMINO_STAGE_KEY
stage_start = datetime.strptime(
lines[idx_first_line][idx_time[0]:idx_time[1]], '%H:%M:%S')
stage_day = int(lines[1][12:14])
stage_month = int(lines[1][15:17])
stage_start_for_delta = stage_start.replace(year=1999,
month=stage_month,
day=stage_day)
rec_start_for_delta = rec_start.replace(year=1999)
first_second = int((stage_start_for_delta -
rec_start_for_delta).total_seconds())
if epoch_length is None:
epoch_length = int(lines[5][6:8])
elif source == 'remlogic':
clue = 'SLEEP-' # signifies an epoch (as opposed to an event)
time_hdrs = ('Time [hh:mm:ss]', 'Heure [hh:mm:ss]')
redherring = 'SPINDLE'
idx_clue = slice(-18, -6)
idx_head = lines.index(
next(l for l in lines if any(
hdr in l for hdr in time_hdrs
)))
first_line = next(l for l in lines[idx_head:] \
if clue in l[idx_clue] \
and 'SLEEP-RM' not in l[-18:])
idx_first_line = lines.index(first_line)
stage_start_date = _try_parse_datetime(
lines[3][16:lines[3].index('\n')],
('%Y/%m/%d', '%d/%m/%Y'))
stage_start_time = None
try:
stage_start_time = datetime.strptime(
first_line[:19], '%Y-%m-%dT%H:%M:%S')
except ValueError:
cells = first_line.split('\t')
for cell in cells:
try:
if 'PM' in cell:
stage_start_time = datetime.strptime(cell,
'%I:%M:%S %p')
else:
stage_start_time = datetime.strptime(cell[-8:],
'%I:%M:%S')
if cell[1] == 'U':
stage_start_time = (stage_start_time
+ timedelta(hours=12))
except ValueError:
continue
if stage_start_time is None:
raise ValueError('No valid start time found.')
stage_start = datetime.combine(stage_start_date.date(),
stage_start_time.time())
first_second = int((stage_start - rec_start).total_seconds())
stage_key = {k[-2:]: v for k, v in REMLOGIC_STAGE_KEY.items()}
idx_stage = slice(-6, -4)
if epoch_length is None:
epoch_length = int(first_line[-3:-1])
elif source == 'alice':
dt = rec_start
line1 = lines[1]
row_offset = 0
if line1[1] == '\t': # some files have an index column
row_offset += 2
if ' pm' in line1.lower() or ' am' in line1.lower():
stage_start = datetime.strptime(
line1[row_offset:row_offset + 11], '%I:%M:%S %p')
# best guess in absence of date
if line1[11:13] == 'pm' and rec_start.hour < 12:
dt = rec_start - timedelta(days=1)
elif line1[11:13] == 'am' and rec_start.hour > 12:
dt = rec_start + timedelta(days=1)
elif ':' in line1:
stage_start = datetime.strptime(
line1[row_offset:row_offset + 8].strip(), '%H:%M:%S')
else:
stage_start = rec_start
stage_start = stage_start.replace(year=dt.year,
month=dt.month,
day=dt.day)
first_second = int((stage_start - rec_start).total_seconds())
idx_first_line = 1
lines[-1] += '_' # to fill newline position
stage_key = ALICE_STAGE_KEY
idx_stage = slice(-3, -1)
if epoch_length is None:
epoch_length = 30
elif source == 'sandman':
if epoch_length is None:
epoch_length = 30
if 'Epoch' in lines[0]:
start_epoch = int(lines[1].split('\t')[0])
stage_start = rec_start + timedelta(seconds=(start_epoch*epoch_length))
idx_first_line = 1
first_second = int((stage_start - rec_start).total_seconds())
stage_key = SANDMAN_STAGE_KEY
idx_stage = slice(-3, -1)
else:
idx_first_line = 14
# check length of YEAR is 2 or 4 digits
if len(lines[4][12:].split(" ")[0].split('/')[2]) == 2:
date_str = lines[4][12:].split(" ")[0]
time_str = lines[idx_first_line].split('\t')[2].rstrip()
# start from epoch, since 1st epoch is not always from 1
stage_start = datetime.strptime(date_str + " " + time_str,
'%d/%m/%y %I:%M:%S %p')
elif len(lines[4][12:].split(" ")[0].split('/')[2]) == 4:
date_str = lines[4][12:].split(" ")[0]
time_str = lines[idx_first_line].split('\t')[2].rstrip()
stage_start = datetime.strptime(date_str + " " + time_str,
'%d/%m/%Y %I:%M:%S %p')
first_second = int((stage_start - rec_start).total_seconds())
stage_key = SANDMAN_STAGE_KEY
idx_stage = slice(-14, -12)
elif source in ['compumedics', 'grael', 'egi']:
if staging_start is None:
first_second = 0
else:
first_second = int((
staging_start - rec_start).total_seconds())
idx_first_line = 0
stage_key = COMPUMEDICS_STAGE_KEY
idx_stage = slice(0, 2)
if epoch_length is None:
epoch_length = 30
elif source == 'deltamed':
if staging_start is None:
first_second = 0
else:
first_second = int((
staging_start - rec_start).total_seconds())
idx_first_line = 0
stage_key = DELTAMED_STAGE_KEY
idx_stage = slice(-2, -1)
if epoch_length is None:
epoch_length = int(lines[0][:lines[0].index('\t')])
elif source == 'prana':
stage_start = datetime.strptime(lines[5][:11], '%d %H:%M:%S')
# best guess in absence of date
dt = rec_start
if stage_start.hour > 12 and rec_start.hour < 12:
dt = rec_start - timedelta(days=1)
elif stage_start.hour < 12 and rec_start.hour > 12:
dt = rec_start + timedelta(days=1)
stage_start = stage_start.replace(year=dt.year,
month=dt.month,
day=dt.day)
first_second = int((stage_start - rec_start).total_seconds())
idx_first_line = 5
stage_key = PRANA_STAGE_KEY
spacer = next(i for i, j in enumerate(lines[5][30:]) \
if j.strip())
idx_stage = slice(30 + spacer, 30 + spacer + 1)
if epoch_length is None:
idx_epoch_length = None
for i, j in enumerate(lines[3]):
if j.isdigit():
idx_epoch_length = i, i + lines[3][i:].index(' ')
epoch_length = int(lines[3][slice(*idx_epoch_length)])
break
if idx_epoch_length is None:
epoch_length = 30
elif source == 'phillips':
if not isinstance(staging_start, datetime):
dt = rec_start
staging_start_str = lines[1].split(',')[1].lower()
staging_start = datetime.strptime(staging_start_str,
'%I:%M:%S %p')
# best guess in absence of date
if staging_start_str[-2:] == 'pm' and rec_start.hour < 12:
dt = rec_start - timedelta(days=1)
elif staging_start_str[-2:] == 'am' and rec_start.hour > 12:
dt = rec_start + timedelta(days=1)
staging_start = staging_start.replace(year=dt.year,
month=dt.month,
day=dt.day)
first_second = int((staging_start - rec_start).total_seconds())
idx_first_line = 0
stage_key = PHILLIPS_STAGE_KEY
idx_stage = slice(0, 2)
if epoch_length is None:
epoch_length = 30
lines = [x.split(',')[2] for x in lines]
else:
raise ValueError('Unknown source program for staging file')
offset = first_second % epoch_length
lg.info('Time offset: ' + str(offset) + ' sec')
if rater_name not in self.raters:
self.add_rater(rater_name)
self.get_rater(rater_name)
stages = self.rater.find('stages')
if as_qual:
for i, one_line in enumerate(lines[idx_first_line:]):
if one_line[idx_stage] in poor:
epoch_beg = first_second + (i * epoch_length)
try:
self.set_stage_for_epoch(epoch_beg, 'Poor',
attr='quality',
save=False)
except KeyError:
return 1
else:
# list is necessary so that it does not remove in place
for s in list(stages):
stages.remove(s)
for i in arange(offset, first_second - epoch_length, epoch_length):
epoch = SubElement(stages, 'epoch')
start_time = SubElement(epoch, 'epoch_start')
epoch_beg = i
start_time.text = str(epoch_beg)
end_time = SubElement(epoch, 'epoch_end')
end_time.text = str(epoch_beg + epoch_length)
epoch_stage = SubElement(epoch, 'stage')
epoch_stage.text = 'Unknown'
quality = SubElement(epoch, 'quality')
quality.text = 'Good'
idx_epoch = 0
for i, one_line in enumerate(lines[idx_first_line:]):
if clue is not None:
if clue not in one_line[idx_clue]:
continue
if redherring is not None:
if redherring in one_line:
continue
epoch = SubElement(stages, 'epoch')
start_time = SubElement(epoch, 'epoch_start')
epoch_beg = first_second + (idx_epoch * epoch_length)
start_time.text = str(epoch_beg)
end_time = SubElement(epoch, 'epoch_end')
end_time.text = str(epoch_beg + epoch_length)
epoch_stage = SubElement(epoch, 'stage')
try:
key = one_line[idx_stage]
one_stage = stage_key[key]
except KeyError:
one_stage = 'Unknown'
lg.info('Stage not recognized: ' + key)
epoch_stage.text = one_stage
quality = SubElement(epoch, 'quality')
if one_stage in poor:
quality.text = 'Poor'
else:
quality.text = 'Good'
idx_epoch += 1
self.save()
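# Usage sketch for import_staging (file name, rater name and recording start
# are hypothetical; rec_start must be the start time of the recording):
#     from datetime import datetime
#     annot.import_staging('staging.txt', 'remlogic', rater_name='RaterA',
#                          rec_start=datetime(2017, 3, 1, 22, 30, 0))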
def add_bookmark(self, name, time, chan=''):
"""Add a new bookmark
Parameters
----------
name : str
name of the bookmark
time : (float, float)
float with start and end time in s
Raises
------
IndexError
When there is no selected rater
"""
try:
bookmarks = self.rater.find('bookmarks')
except AttributeError:
raise IndexError('You need to have at least one rater')
new_bookmark = SubElement(bookmarks, 'bookmark')
bookmark_name = SubElement(new_bookmark, 'bookmark_name')
bookmark_name.text = name
bookmark_time = SubElement(new_bookmark, 'bookmark_start')
bookmark_time.text = str(time[0])
bookmark_time = SubElement(new_bookmark, 'bookmark_end')
bookmark_time.text = str(time[1])
if isinstance(chan, (tuple, list)):
chan = ', '.join(chan)
event_chan = SubElement(new_bookmark, 'bookmark_chan')
event_chan.text = chan
self.save()
def remove_bookmark(self, name=None, time=None, chan=None):
"""Remove bookmarks; if called without arguments, it removes ALL the bookmarks."""
bookmarks = self.rater.find('bookmarks')
for m in bookmarks:
bookmark_name = m.find('bookmark_name').text
bookmark_start = float(m.find('bookmark_start').text)
bookmark_end = float(m.find('bookmark_end').text)
bookmark_chan = m.find('bookmark_chan').text
if bookmark_chan is None: # xml doesn't store empty string
bookmark_chan = ''
if name is None:
name_cond = True
else:
name_cond = bookmark_name == name
if time is None:
time_cond = True
else:
time_cond = (time[0] <= bookmark_end and
time[1] >= bookmark_start)
if chan is None:
chan_cond = True
else:
chan_cond = bookmark_chan == chan
if name_cond and time_cond and chan_cond:
bookmarks.remove(m)
self.save()
def get_bookmarks(self, time=None, chan=None):
"""
Raises
------
IndexError
When there is no selected rater
"""
# get bookmarks inside window
try:
bookmarks = self.rater.find('bookmarks')
except AttributeError:
raise IndexError('You need to have at least one rater')
mrks = []
for m in bookmarks:
bookmark_start = float(m.find('bookmark_start').text)
bookmark_end = float(m.find('bookmark_end').text)
bookmark_chan = m.find('bookmark_chan').text
if bookmark_chan is None: # xml doesn't store empty string
bookmark_chan = ''
if time is None:
time_cond = True
else:
time_cond = (time[0] <= bookmark_end and
time[1] >= bookmark_start)
if chan is None:
chan_cond = True
else:
chan_cond = bookmark_chan == chan
if time_cond and chan_cond:
one_mrk = {'name': m.find('bookmark_name').text,
'start': bookmark_start,
'end': bookmark_end,
'chan': bookmark_chan.split(', '), # always a list
}
mrks.append(one_mrk)
return mrks
@property
def event_types(self):
"""
Raises
------
IndexError
When there is no selected rater
"""
try:
events = self.rater.find('events')
except AttributeError:
raise IndexError('You need to have at least one rater')
return [x.get('type') for x in events]
def add_event_type(self, name):
"""
Raises
------
IndexError
When there is no selected rater
"""
if name in self.event_types:
lg.info('Event type ' + name + ' exists already.')
return
events = self.rater.find('events')
new_event_type = SubElement(events, 'event_type')
new_event_type.set('type', name)
self.save()
def remove_event_type(self, name):
"""Remove event type based on name."""
if name not in self.event_types:
lg.info('Event type ' + name + ' was not found.')
events = self.rater.find('events')
# list is necessary so that it does not remove in place
for e in list(events):
if e.get('type') == name:
events.remove(e)
self.save()
def rename_event_type(self, name, new_name):
"""Rename event type."""
if name not in self.event_types:
lg.info('Event type ' + name + ' was not found.')
events = self.rater.find('events')
for e in list(events):
if e.get('type') == name:
e.set('type', new_name)
self.save()
def add_event(self, name, time, chan=''):
"""Add event to annotations file.
Parameters
----------
name : str
Event type name.
time : tuple/list of float
Start and end times of event, in seconds from recording start.
chan : str or list of str, optional
Channel or channels associated with event.
Raises
------
IndexError
When there is no rater / epochs at all
"""
if name not in self.event_types:
self.add_event_type(name)
events = self.rater.find('events')
pattern = "event_type[@type='" + name + "']"
event_type = events.find(pattern)
new_event = SubElement(event_type, 'event')
event_start = SubElement(new_event, 'event_start')
event_start.text = str(time[0])
event_end = SubElement(new_event, 'event_end')
event_end.text = str(time[1])
if isinstance(chan, (tuple, list)):
chan = ', '.join(chan)
event_chan = SubElement(new_event, 'event_chan')
event_chan.text = chan
event_qual = SubElement(new_event, 'event_qual')
event_qual.text = 'Good'
self.save()
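# Usage sketch: store a 1.5 s spindle on one channel (event name, times and
# channel label are hypothetical):
#     annot.add_event('spindle', (1230.0, 1231.5), chan='Cz (eeg)')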
def add_events(self, event_list, name=None, chan=None, parent=None):
"""Add series of events. Faster than calling add_event in a loop.
Parameters
----------
event_list : list of dict
each dict is an event with 'start' and 'end' times
name : str, optional
save events to this event type.
chan : str or list of str, optional
save events to this or these channel(s). If None, channel will be
read from the event list dict under 'chan'
parent : QWidget, optional
for GUI progress bar
"""
if name is not None:
evt_name = name
if name not in self.event_types:
self.add_event_type(name)
events = self.rater.find('events')
if parent is not None:
progress = QProgressDialog('Saving events', 'Abort',
0, len(event_list) - 1, parent)
progress.setWindowModality(Qt.ApplicationModal)
for i, evt in enumerate(event_list):
if name is None:
evt_name = evt['name']
if evt_name not in self.event_types:
self.add_event_type(evt_name)
pattern = "event_type[@type='" + evt_name + "']"
event_type = events.find(pattern)
new_event = SubElement(event_type, 'event')
event_start = SubElement(new_event, 'event_start')
event_start.text = str(evt['start'])
event_end = SubElement(new_event, 'event_end')
event_end.text = str(evt['end'])
one_chan = chan
if chan is None:
one_chan = evt['chan']
if isinstance(one_chan, (tuple, list)):
one_chan = ', '.join(one_chan)
event_chan = SubElement(new_event, 'event_chan')
event_chan.text = one_chan
event_qual = SubElement(new_event, 'event_qual')
event_qual.text = 'Good'
if parent is not None:
progress.setValue(i)
if progress.wasCanceled():
return
self.save()
if parent is not None:
progress.close()
def remove_event(self, name=None, time=None, chan=None):
"""Remove events matching the given name, time window and/or channel."""
events = self.rater.find('events')
if name is not None:
pattern = "event_type[@type='" + name + "']"
else:
pattern = "event_type"
if chan is not None:
if isinstance(chan, (tuple, list)):
chan = ', '.join(chan)
for e_type in list(events.iterfind(pattern)):
for e in e_type:
event_start = float(e.find('event_start').text)
event_end = float(e.find('event_end').text)
event_chan = e.find('event_chan').text
if time is None:
time_cond = True
else:
time_cond = allclose(time[0], event_start) and allclose(
time[1], event_end)
if chan is None:
chan_cond = True
else:
chan_cond = event_chan == chan
if time_cond and chan_cond:
e_type.remove(e)
self.save()
def get_events(self, name=None, time=None, chan=None, stage=None,
qual=None, cycle=None):
"""Get list of events in the file.
Parameters
----------
name : str, optional
name of the event of interest
time : tuple of two float, optional
start and end time of the period of interest
chan : tuple of str, optional
list of channels of interests
stage : tuple of str, optional
list of stages of interest
qual : str, optional
epoch signal qualifier (Good or Poor)
cycle : list of int, optional
list of cycles of interest, numbered starting at 1
Returns
-------
list of dict
where each dict has 'name' (name of the event), 'start' (start
time), 'end' (end time), 'chan' (channels of interest, can be
empty), 'stage', 'quality' (signal quality), 'cycle' (sleep period)
Raises
------
IndexError
When there is no rater / epochs at all
"""
# get events inside window
events = self.rater.find('events')
if name is not None:
pattern = "event_type[@type='" + name + "']"
else:
pattern = "event_type"
if chan is not None:
if isinstance(chan, (tuple, list)):
if chan[0] is not None:
chan = ', '.join(chan)
else:
chan = None
if stage or qual:
ep_starts = [x['start'] for x in self.epochs]
if stage:
ep_stages = [x['stage'] for x in self.epochs]
if qual:
ep_quality = [x['quality'] for x in self.epochs]
if cycle:
cycles = self.get_cycles()
ev = []
for e_type in events.iterfind(pattern):
event_name = e_type.get('type')
for e in e_type:
event_start = float(e.find('event_start').text)
event_end = float(e.find('event_end').text)
event_chan = e.find('event_chan').text
event_qual = e.find('event_qual').text
if event_chan is None: # xml doesn't store empty string
event_chan = ''
if stage or qual:
pos = bisect_left(ep_starts, event_start)
if pos == len(ep_starts):
pos -= 1
elif event_start != ep_starts[pos]:
pos -= 1
if stage is None:
stage_cond = True
else:
ev_stage = ep_stages[pos]
stage_cond = ev_stage in stage
if qual is None:
qual_cond = True
else:
ev_qual = ep_quality[pos]
qual_cond = ev_qual == qual
if cycle is None:
cycle_cond = True
else:
ev_cycle = None
for cyc in cycles:
cyc_start, cyc_end, cyc_number = cyc
if cyc_start <= event_start < cyc_end:
ev_cycle = cyc_number
break
cycle_cond = ev_cycle in cycle
if time is None:
time_cond = True
else:
time_cond = time[0] <= event_end and time[1] >= event_start
if chan is None:
chan_cond = True
else:
chan_cond = event_chan == chan
if (time_cond and chan_cond and stage_cond and qual_cond and
cycle_cond):
one_ev = {'name': event_name,
'start': event_start,
'end': event_end,
'chan': event_chan.split(', '), # always a list
'stage': '',
'quality': event_qual,
'cycle': '',
}
if stage is not None:
one_ev['stage'] = ev_stage
if cycle is not None:
one_ev['cycle'] = ev_cycle
ev.append(one_ev)
return ev
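# Usage sketch: retrieve spindles detected on Cz during N2 in good-quality
# epochs (event and channel names are hypothetical; stage and chan expect
# tuples):
#     spindles = annot.get_events(name='spindle', stage=('NREM2',),
#                                 chan=('Cz (eeg)',), qual='Good')
#     starts = [ev['start'] for ev in spindles]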
def create_epochs(self, epoch_length=30, first_second=None):
"""Create epochs in annotation file.
Parameters
----------
epoch_length : int
duration in seconds of each epoch
first_second : int, optional
Time, in seconds from record start, at which the epochs begin
"""
lg.info('creating epochs of length ' + str(epoch_length))
if first_second is None:
first_second = self.first_second
last_sec = ceil((self.last_second - first_second) /
epoch_length) * epoch_length
stages = self.rater.find('stages')
for epoch_beg in range(first_second, last_sec, epoch_length):
epoch = SubElement(stages, 'epoch')
start_time = SubElement(epoch, 'epoch_start')
start_time.text = str(epoch_beg)
end_time = SubElement(epoch, 'epoch_end')
end_time.text = str(epoch_beg + epoch_length)
stage = SubElement(epoch, 'stage')
stage.text = 'Unknown'
quality = SubElement(epoch, 'quality')
quality.text = 'Good'
@property
def epochs(self):
"""Get epochs as a generator.
Yields
------
dict
each epoch is a dict with 'start' and 'end' times (in s, relative
to the start of the recording), a string with the sleep stage and
a string with the signal quality.
Raises
------
IndexError
When there is no rater / epochs at all
"""
if self.rater is None:
raise IndexError('You need to have at least one rater')
for one_epoch in self.rater.iterfind('stages/epoch'):
epoch = {'start': int(one_epoch.find('epoch_start').text),
'end': int(one_epoch.find('epoch_end').text),
'stage': one_epoch.find('stage').text,
'quality': one_epoch.find('quality').text
}
yield epoch
def get_epochs(self, time=None, stage=None, qual=None,
chan=None, name=None):
"""Get list of epochs in the file.
Parameters
----------
time : tuple of two float, optional
start and end time of the period of interest
stage : tuple of str, optional
list of stages of interest
qual : str, optional
epoch signal qualifier (Good or Poor)
chan : None
placeholder, to maintain format similar to get_events
name : None
placeholder, to maintain format similar to get_events
Returns
-------
list of dict
where each dict has 'start' (start time), 'end' (end time),
'stage', 'qual' (signal quality)
"""
time_cond = True
stage_cond = True
qual_cond = True
valid = []
for ep in self.epochs:
if stage:
stage_cond = ep['stage'] in stage
if qual:
qual_cond = ep['quality'] == qual
if time:
time_cond = time[0] <= ep['start'] and time[1] >= ep['end']
if stage_cond and qual_cond and time_cond:
valid.append(ep)
return valid
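# Usage sketch: collect the N2 and N3 epochs from the first hour and sum their
# duration (times are in seconds from recording start):
#     deep = annot.get_epochs(time=(0, 3600), stage=('NREM2', 'NREM3'))
#     seconds_of_deep_sleep = sum(ep['end'] - ep['start'] for ep in deep)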
def get_epoch_start(self, window_start):
""" Get the position (seconds) of the nearest epoch.
Parameters
----------
window_start : float
Position of the current window (seconds)
Returns
-------
float
Position (seconds) of the nearest epoch.
"""
epoch_starts = [x['start'] for x in self.epochs]
idx = asarray([abs(window_start - x) for x in epoch_starts]).argmin()
return epoch_starts[idx]
def get_stage_for_epoch(self, epoch_start, window_length=None,
attr='stage'):
"""Return stage for one specific epoch.
Parameters
----------
epoch_start : int
start time of the epoch, in seconds
window_length : float, optional
duration of the current window, in seconds; if given and shorter than
the epoch length, the epoch containing epoch_start is matched even
when epoch_start does not fall on an epoch boundary
attr : str, optional
'stage' or 'quality'
Returns
-------
stage : str
description of the stage.
"""
for epoch in self.epochs:
if epoch['start'] == epoch_start:
return epoch[attr]
if window_length is not None:
epoch_length = epoch['end'] - epoch['start']
if logical_and(window_length < epoch_length,
0 <= \
(epoch_start - epoch['start']) < epoch_length):
return epoch[attr]
def time_in_stage(self, name, attr='stage'):
"""Return time (in seconds) in the selected stage.
Parameters
----------
name : str
one of the sleep stages, or qualifiers
attr : str, optional
either 'stage' or 'quality'
Returns
-------
int
time spent in one stage/qualifier, in seconds.
"""
return sum(x['end'] - x['start'] for x in self.epochs
if x[attr] == name)
def set_stage_for_epoch(self, epoch_start, name, attr='stage', save=True):
"""Change the stage for one specific epoch.
Parameters
----------
epoch_start : int
start time of the epoch, in seconds
name : str
description of the stage or qualifier.
attr : str, optional
either 'stage' or 'quality'
save : bool
whether to save every time one epoch is scored
Raises
------
KeyError
When the epoch_start is not in the list of epochs.
IndexError
When there is no rater / epochs at all
Notes
-----
In the GUI, you want to save as often as possible, even if it slows
down the program, but it's the safer option. But if you're converting
a dataset, you want to save at the end. Do not forget to save!
"""
if self.rater is None:
raise IndexError('You need to have at least one rater')
for one_epoch in self.rater.iterfind('stages/epoch'):
if int(one_epoch.find('epoch_start').text) == epoch_start:
one_epoch.find(attr).text = name
if save:
self.save()
return
raise KeyError('epoch starting at ' + str(epoch_start) + ' not found')
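# Usage sketch for batch scoring, following the Notes above: postpone saving
# until the end (the epoch starts and stages are hypothetical and must match
# existing epoch boundaries):
#     for start, stage in [(0, 'Wake'), (30, 'NREM1'), (60, 'NREM2')]:
#         annot.set_stage_for_epoch(start, stage, save=False)
#     annot.save()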
def set_cycle_mrkr(self, epoch_start, end=False):
"""Mark epoch start as cycle start or end.
Parameters
----------
epoch_start: int
start time of the epoch, in seconds
end : bool
If True, marked as cycle end; otherwise, marks cycle start
"""
if self.rater is None:
raise IndexError('You need to have at least one rater')
bound = 'start'
if end:
bound = 'end'
for one_epoch in self.rater.iterfind('stages/epoch'):
if int(one_epoch.find('epoch_start').text) == epoch_start:
cycles = self.rater.find('cycles')
name = 'cyc_' + bound
new_bound = SubElement(cycles, name)
new_bound.text = str(int(epoch_start))
self.save()
return
raise KeyError('epoch starting at ' + str(epoch_start) + ' not found')
def remove_cycle_mrkr(self, epoch_start):
"""Remove cycle marker at epoch_start.
Parameters
----------
epoch_start: int
start time of epoch, in seconds
"""
if self.rater is None:
raise IndexError('You need to have at least one rater')
cycles = self.rater.find('cycles')
for one_mrkr in cycles.iterfind('cyc_start'):
if int(one_mrkr.text) == epoch_start:
cycles.remove(one_mrkr)
self.save()
return
for one_mrkr in cycles.iterfind('cyc_end'):
if int(one_mrkr.text) == epoch_start:
cycles.remove(one_mrkr)
self.save()
return
raise KeyError('cycle marker at ' + str(epoch_start) + ' not found')
def clear_cycles(self):
"""Remove all cycle markers in current rater."""
if self.rater is None:
raise IndexError('You need to have at least one rater')
cycles = self.rater.find('cycles')
for cyc in list(cycles):
cycles.remove(cyc)
self.save()
def get_cycles(self):
"""Return the cycle start and end times.
Returns
-------
list of tuple of float
start and end times for each cycle, in seconds from recording start
and the cycle index starting at 1
"""
cycles = self.rater.find('cycles')
if cycles is None or len(cycles) == 0:
return None
starts = sorted(
[float(mrkr.text) for mrkr in cycles.findall('cyc_start')])
ends = sorted(
[float(mrkr.text) for mrkr in cycles.findall('cyc_end')])
cyc_list = []
if not starts or not ends:
return None
if all(i < starts[0] for i in ends):
raise ValueError('First cycle has no start.')
for (this_start, next_start) in zip(starts, starts[1:] + [inf]):
# if an end is smaller than the next start, make it the end
# otherwise, the next_start is the end
end_between_starts = [end for end in ends \
if this_start < end <= next_start]
if len(end_between_starts) > 1:
raise ValueError('Found more than one cycle end for same '
'cycle')
if end_between_starts:
one_cycle = (this_start, end_between_starts[0])
else:
one_cycle = (this_start, next_start)
if one_cycle[1] == inf:
raise ValueError('Last cycle has no end.')
cyc_list.append(one_cycle)
output = []
for i, j in enumerate(cyc_list):
cyc = j[0], j[1], i + 1
output.append(cyc)
return output
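# Usage sketch: mark one sleep cycle on existing epoch boundaries and read it
# back (times are hypothetical and assume 30 s epochs scored from 0 s):
#     annot.set_cycle_mrkr(1800)             # cycle start
#     annot.set_cycle_mrkr(5400, end=True)   # cycle end
#     annot.get_cycles()                     # -> [(1800.0, 5400.0, 1)]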
def switch(self, time=None):
"""Obtain the switch parameter, i.e. the number of stage shifts."""
stag_to_int = {'NREM1': 1, 'NREM2': 2, 'NREM3': 3, 'REM': 5, 'Wake': 0}
hypno = [stag_to_int[x['stage']] for x in self.get_epochs(time=time) \
if x['stage'] in stag_to_int.keys()]
return sum(asarray(diff(hypno), dtype=bool))
def slp_frag(self, time=None):
"""Obtain the sleep fragmentation parameter, i.e. the number of stage
shifts to a lighter stage."""
epochs = self.get_epochs(time=time)
stage_int = {'Wake': 0, 'NREM1': 1, 'NREM2': 2, 'NREM3': 3, 'REM': 2}
hypno_str = [x['stage'] for x in epochs \
if x['stage'] in stage_int.keys()]
hypno_int = [stage_int[x] for x in hypno_str]
frag = sum(asarray(clip(diff(hypno_int), a_min=None, a_max=0),
dtype=bool))
# N3 to REM doesn't count
n3_to_rem = 0
for i, j in enumerate(hypno_str[:-1]):
if j == 'NREM3':
if hypno_str[i + 1] == 'REM':
n3_to_rem += 1
return frag - n3_to_rem
def latency_to_consolidated(self, lights_off, duration=5,
stage=['NREM2', 'NREM3']):
"""Find latency to the first period of uninterrupted 'stage'.
Parameters
----------
lights_off : float
lights off time, in seconds from recording start
duration : float
duration of uninterrupted period, in minutes
stage : list of str
target stage(s)
Returns
-------
float
latency to the start of the consolidated period, in minutes
"""
epochs = self.get_epochs()
if len(stage) > 1:
for ep in epochs:
if ep['stage'] in stage:
ep['stage'] = 'target'
stage = ['target']
hypno = [x['stage'] for x in epochs]
groups = groupby(hypno)
runs = [(stag, sum(1 for _ in group)) for stag, group in groups]
idx_start = 0
for one_stage, n in runs:
if (one_stage in stage) and n >= duration * 60 / self.epoch_length:
break
idx_start += n
if idx_start < len(hypno):
latency = (epochs[idx_start]['start'] - lights_off) / 60
else:
latency = nan
return latency
def export(self, file_to_export, xformat='csv'):
"""Export epoch-wise annotations to a file.
Parameters
----------
file_to_export : path to file
file to write to
xformat : str
export format: 'csv' (default), 'remlogic' or 'remlogic_fr'
"""
if 'csv' == xformat:
with open(file_to_export, 'w', newline='') as f:
csv_file = writer(f)
csv_file.writerow(['Wonambi v{}'.format(__version__)])
csv_file.writerow(('clock start time', 'start', 'end',
'stage'))
for epoch in self.epochs:
epoch_time = (self.start_time +
timedelta(seconds=epoch['start']))
csv_file.writerow((epoch_time.strftime('%H:%M:%S'),
epoch['start'],
epoch['end'],
epoch['stage']))
if 'remlogic' in xformat:
columns = 'Time [hh:mm:ss]\tEvent\tDuration[s]\n'
if 'remlogic_fr' == xformat:
columns = 'Heure [hh:mm:ss]\tEvénement\tDurée[s]\n'
patient_id = splitext(basename(self.dataset))[0]
rec_date = self.start_time.strftime('%d/%m/%Y')
stkey = {v:k for k, v in REMLOGIC_STAGE_KEY.items()}
stkey['Artefact'] = 'SLEEP-UNSCORED'
stkey['Unknown'] = 'SLEEP-UNSCORED'
stkey['Movement'] = 'SLEEP-UNSCORED'
with open(file_to_export, 'w') as f:
f.write('RemLogic Event Export\n')
f.write('Patient:\t' + patient_id + '\n')
f.write('Patient ID:\t' + patient_id + '\n')
f.write('Recording Date:\t' + rec_date + '\n')
f.write('\n')
f.write('Events Included:\n')
for i in sorted(set([stkey[x['stage']] for x in self.epochs])):
f.write(i + '\n')
f.write('\n')
f.write(columns)
for epoch in self.epochs:
epoch_time = (self.start_time +
timedelta(seconds=epoch['start']))
f.write((epoch_time.strftime('%Y-%m-%dT%H:%M:%S.000000') +
'\t' +
stkey[epoch['stage']] +
'\t' +
str(self.epoch_length) +
'\n'))
def export_sleep_stats(self, filename, lights_off, lights_on):
"""Create CSV with sleep statistics.
Parameters
----------
filename: str
Filename for csv export
lights_off: float
Initial time when sleeper turns off the light (or their phone) to
go to sleep, in seconds from recording start
lights_on: float
Final time when sleeper rises from bed after sleep, in seconds from
recording start
Returns
-------
tuple of float
sleep onset latency, wake after sleep onset and total sleep time,
returned for testing purposes.
Note
----
Total dark time and sleep efficiency do NOT subtract epochs marked as
Undefined or Unknown.
"""
epochs = self.get_epochs()
ep_starts = [i['start'] for i in epochs]
hypno = [i['stage'] for i in epochs]
n_ep_per_min = 60 / self.epoch_length
first = {}
latency = {}
for stage in ['NREM1', 'NREM2', 'NREM3', 'REM']:
first[stage] = next(((i, j) for i, j in enumerate(epochs) if \
j['stage'] == stage), None)
if first[stage] is not None:
latency[stage] = (first[stage][1]['start'] -
lights_off) / 60
else:
first[stage] = nan
latency[stage] = nan
idx_loff = asarray([abs(x - lights_off) for x in ep_starts]).argmin()
idx_lon = asarray([abs(x - lights_on) for x in ep_starts]).argmin()
duration = {}
for stage in ['NREM1', 'NREM2', 'NREM3', 'REM', 'Wake', 'Movement',
'Artefact']:
duration[stage] = hypno[idx_loff:idx_lon].count(
stage) / n_ep_per_min
slp_onset = sorted(first.values(),
key=lambda x: x[1]['start'] if x is not nan else inf)[0]
wake_up = next((len(epochs) - i, j) for i, j in enumerate(
epochs[::-1]) if j['stage'] in ['NREM1', 'NREM2', 'NREM3',
'REM'])
total_dark_time = (lights_on - lights_off) / 60
#slp_period_time = (wake_up[1]['start'] - slp_onset[1]['start']) / 60
slp_onset_lat = (slp_onset[1]['start'] - lights_off) / 60
waso = hypno[slp_onset[0]:wake_up[0]].count('Wake') / n_ep_per_min
wake_mor = (lights_on - wake_up[1]['start']) / 60
#APwake = waso + slp_onset_lat
waso_total = sum((waso, wake_mor))
total_slp_period = sum((waso, duration['NREM1'], duration['NREM2'],
duration['NREM3'], duration['REM']))
total_slp_time = total_slp_period - waso
slp_eff = total_slp_time / total_dark_time
switch = self.switch()
slp_frag = self.slp_frag()
dt_format = '%d/%m/%Y %H:%M:%S'
loff_str = (self.start_time + timedelta(seconds=lights_off)).strftime(
dt_format)
lon_str = (self.start_time + timedelta(seconds=lights_on)).strftime(
dt_format)
slp_onset_str = (self.start_time + timedelta(
seconds=slp_onset[1]['start'])).strftime(dt_format)
wake_up_str = (self.start_time + timedelta(
seconds=wake_up[1]['start'])).strftime(dt_format)
slcnrem5 = self.latency_to_consolidated(lights_off, duration=5,
stage=['NREM2', 'NREM3'])
slcnrem10 = self.latency_to_consolidated(lights_off, duration=10,
stage=['NREM2', 'NREM3'])
slcn35 = self.latency_to_consolidated(lights_off, duration=5,
stage=['NREM3'])
slcn310 = self.latency_to_consolidated(lights_off, duration=10,
stage=['NREM3'])
cycles = self.get_cycles() or []
cyc_stats = []
for i, cyc in enumerate(cycles):
one_cyc = {}
cyc_hypno = [x['stage'] for x in self.get_epochs(time=cyc)]
one_cyc['duration'] = {}
for stage in ['NREM1', 'NREM2', 'NREM3', 'REM', 'Wake', 'Movement',
'Artefact']:
one_cyc['duration'][stage] = cyc_hypno.count(stage) # in epochs
one_cyc['tst'] = sum([one_cyc['duration'][stage] for stage in [
'NREM1', 'NREM2', 'NREM3', 'REM']])
one_cyc['tsp'] = one_cyc['tst'] + one_cyc['duration']['Wake']
one_cyc['slp_eff'] = one_cyc['tst'] / one_cyc['tsp']
one_cyc['switch'] = self.switch(time=cyc)
one_cyc['slp_frag'] = self.slp_frag(time=cyc)
cyc_stats.append(one_cyc)
with open(filename, 'w', newline='') as f:
lg.info('Writing to ' + str(filename))
cf = writer(f)
cf.writerow(['Wonambi v{}'.format(__version__)])
cf.writerow(['Variable', 'Acronym',
'Unit 1', 'Value 1',
'Unit 2', 'Value 2',
'Formula'])
cf.writerow(['Lights off', 'LOFF',
'dd/mm/yyyy HH:MM:SS', loff_str,
'seconds from recording start', lights_off,
'marker'])
cf.writerow(['Lights on', 'LON',
'dd/mm/yyyy HH:MM:SS', lon_str,
'seconds from recording start', lights_on,
'marker'])
cf.writerow(['Sleep onset', 'SO',
'dd/mm/yyyy HH:MM:SS', slp_onset_str,
'seconds from recording start', slp_onset[1]['start'],
'first sleep epoch (N1 or N2) - LOFF'])
cf.writerow(['Time of last awakening', '',
'dd/mm/yyyy HH:MM:SS', wake_up_str,
'seconds from recording start', wake_up[1]['start'],
'end time of last epoch of N1, N2, N3 or REM'])
cf.writerow(['Total dark time (Time in bed)', 'TDT (TIB)',
'Epochs', total_dark_time * n_ep_per_min,
'Minutes', total_dark_time,
'LON - LOFF'])
cf.writerow(['Wake duration', 'W',
'Epochs', duration['Wake'] * n_ep_per_min,
'Minutes', duration['Wake'],
'total Wake duration between LOFF and LON'])
cf.writerow(['Sleep latency', 'SL',
'Epochs', slp_onset_lat * n_ep_per_min,
'Minutes', slp_onset_lat,
'LON - SO'])
cf.writerow(['Wake after sleep onset', 'WASO',
'Epochs', waso * n_ep_per_min,
'Minutes', waso,
'W - SL'])
cf.writerow(['Wake, morning', 'Wmor',
'Epochs', wake_mor * n_ep_per_min,
'Minutes', wake_mor,
'total W-SL-WASO'])
cf.writerow(['WASO total', 'WASOT',
'Epochs', waso_total * n_ep_per_min,
'Minutes', waso_total,
'WASO + Wmor'])
cf.writerow(['N1 duration', '',
'Epochs', duration['NREM1'] * n_ep_per_min,
'Minutes', duration['NREM1'],
'total N1 duration between LOFF and LON'])
cf.writerow(['N2 duration', '',
'Epochs', duration['NREM2'] * n_ep_per_min,
'Minutes', duration['NREM2'],
'total N2 duration between LOFF and LON'])
cf.writerow(['N3 duration', '',
'Epochs', duration['NREM3'] * n_ep_per_min,
'Minutes', duration['NREM3'],
'total N3 duration between LOFF and LON'])
cf.writerow(['REM duration', '',
'Epochs', duration['REM'] * n_ep_per_min,
'Minutes', duration['REM'],
'total REM duration between LOFF and LON'])
cf.writerow(['Artefact duration', '',
'Epochs',
duration['Artefact'] * n_ep_per_min,
'Minutes', duration['Artefact'],
'total Artefact duration between LOFF and LON'])
cf.writerow(['Movement duration', '',
'Epochs',
duration['Movement'] * n_ep_per_min,
'Minutes', duration['Movement'],
'total Movement duration between LOFF and LON'])
cf.writerow(['Total sleep period', 'TSP',
'Epochs', total_slp_period * n_ep_per_min,
'Minutes', total_slp_period,
'WASO + N1 + N2 + N3 + REM'])
cf.writerow(['Total sleep time', 'TST',
'Epochs', total_slp_time * n_ep_per_min,
'Minutes', total_slp_time,
'N1 + N2 + N3 + REM'])
cf.writerow(['Sleep efficiency', 'SE',
'%', slp_eff * 100,
'', '',
'TST / TDT * 100'])
cf.writerow(['W % TiB', '',
'%', duration['Wake'] * 100 / total_dark_time,
'', '',
'W / TDT * 100'])
cf.writerow(['N1 % TiB', '',
'%', duration['NREM1'] * 100 / total_dark_time,
'', '',
'N1 / TDT * 100'])
cf.writerow(['N2 % TiB', '',
'%', duration['NREM2'] * 100 / total_dark_time,
'', '',
'N2 / TDT * 100'])
cf.writerow(['N3 % TiB', '',
'%', duration['NREM3'] * 100 / total_dark_time,
'', '',
'N3 / TDT * 100'])
cf.writerow(['REM % TiB', '',
'%', duration['REM'] * 100 / total_dark_time,
'', '',
'REM / TDT * 100'])
cf.writerow(['W % TSP', '',
'%', waso * 100 / total_slp_period,
'', '',
'WASO / TSP * 100'])
cf.writerow(['N1 % TSP', '',
'%', duration['NREM1'] * 100 / total_slp_period,
'', '',
'N1 / TSP * 100'])
cf.writerow(['N2 % TSP', '',
'%', duration['NREM2'] * 100 / total_slp_period,
'', '',
'N2 / TSP * 100'])
cf.writerow(['N3 % TSP', '',
'%', duration['NREM3'] * 100 / total_slp_period,
'', '',
'N3 / TSP * 100'])
cf.writerow(['REM % TSP', '',
'%', duration['REM'] * 100 / total_slp_period,
'', '',
'REM / TSP * 100'])
cf.writerow(['N1 % TST', '',
'%', duration['NREM1'] * 100 / total_slp_time,
'', '',
'N1 / TST * 100'])
cf.writerow(['N2 % TST', '',
'%', duration['NREM2'] * 100 / total_slp_time,
'', '',
'N2 / TST * 100'])
cf.writerow(['N3 % TST', '',
'%', duration['NREM3'] * 100 / total_slp_time,
'', '',
'N3 / TST * 100'])
cf.writerow(['REM % TST', '',
'%', duration['REM'] * 100 / total_slp_time,
'', '',
'REM / TST * 100'])
cf.writerow(['Switch', '',
'N', switch,
'', '',
'number of stage shifts'])
cf.writerow(['Switch index', '',
'N per epoch',
switch / total_slp_period / n_ep_per_min,
'N per minute', switch / total_slp_period,
'switch / TSP'])
cf.writerow(['Switch index H', '',
'N per hour',
switch / (total_slp_period / 60),
'','',
'switch / (TSP / 60)'])
cf.writerow(['Sleep fragmentation', '',
'N', slp_frag,
'', '',
('number of shifts to a lighter stage '
'(W > N1 > N2 > N3; W > N1 > REM)')])
cf.writerow(['Sleep fragmentation index', 'SFI',
'N per epoch',
slp_frag / total_slp_time / n_ep_per_min,
'N per minute', slp_frag / total_slp_time,
'sleep fragmentation / TST'])
cf.writerow(['Sleep fragmentation index H', 'SFI H',
'N per hour',
slp_frag / (total_slp_time / 60),
'', '',
'sleep fragmentation / TST / 60'])
cf.writerow(['Sleep latency to N1', 'SLN1',
'Epochs', latency['NREM1'] * n_ep_per_min,
'Minutes', latency['NREM1'],
'first N1 epoch - LOFF'])
cf.writerow(['Sleep latency to N2', 'SLN2',
'Epochs', latency['NREM2'] * n_ep_per_min,
'Minutes', latency['NREM2'],
'first N2 epoch - LOFF'])
cf.writerow(['Sleep latency to N3', 'SLN3',
'Epochs', latency['NREM3'] * n_ep_per_min,
'Minutes', latency['NREM3'],
'first N3 epoch - LOFF'])
cf.writerow(['Sleep latency to REM', 'SLREM',
'Epochs', latency['REM'] * n_ep_per_min,
'Minutes', latency['REM'],
'first REM epoch - LOFF'])
cf.writerow(['Sleep latency to consolidated NREM, 5 min',
'SLCNREM5',
'Epochs', slcnrem5 * n_ep_per_min,
'Minutes', slcnrem5,
('start of first uninterrupted 5-minute period of '
'N2 and/or N3 - LOFF')])
cf.writerow(['Sleep latency to consolidated NREM, 10 min',
'SLCNREM10',
'Epochs', slcnrem10 * n_ep_per_min,
'Minutes', slcnrem10,
('start of first uninterrupted 10-minute period of '
'N2 and/or N3 - LOFF')])
cf.writerow(['Sleep latency to consolidated N3, 5 min', 'SLCN35',
'Epochs', slcn35 * n_ep_per_min,
'Minutes', slcn35,
('start of first uninterrupted 5-minute period of '
'N3 - LOFF')])
cf.writerow(['Sleep latency to consolidated N3, 10 min', 'SLCN310',
'Epochs', slcn310 * n_ep_per_min,
'Minutes', slcn310,
('start of first uninterrupted 10-minute period of '
'N3 - LOFF')])
for i in range(len(cycles)):
one_cyc = cyc_stats[i]
cf.writerow([''])
cf.writerow([f'Cycle {i + 1}'])
cf.writerow(['Cycle % duration', '',
'%', (one_cyc['tsp'] * 100 /
total_slp_period / n_ep_per_min),
'', '',
'cycle TSP / night TSP'])
for stage in ['Wake', 'NREM1', 'NREM2', 'NREM3', 'REM',
'Artefact', 'Movement']:
cf.writerow([f'{stage} (c{i + 1})', '',
'Epochs', one_cyc['duration'][stage],
'Minutes',
one_cyc['duration'][stage] / n_ep_per_min,
f'total {stage} duration in cycle {i + 1}'])
cf.writerow([f'Total sleep period (c{i + 1})',
f'TSP (c{i + 1})',
'Epochs', one_cyc['tsp'],
'Minutes', one_cyc['tsp'] / n_ep_per_min,
f'Wake + N1 + N2 + N3 + REM in cycle {i + 1}'])
cf.writerow([f'Total sleep time (c{i + 1})', f'TST (c{i + 1})',
'Epochs', one_cyc['tst'],
'Minutes', one_cyc['tst'] / n_ep_per_min,
f'N1 + N2 + N3 + REM in cycle {i + 1}'])
cf.writerow([f'Sleep efficiency (c{i + 1})', f'SE (c{i + 1})',
'%', one_cyc['slp_eff'] * 100,
'', '',
f'TST / TSP in cycle {i + 1}'])
for denom in ['TSP', 'TST']:
for stage in ['Wake', 'NREM1', 'NREM2', 'NREM3', 'REM']:
cf.writerow([f'{stage} % {denom} (c{i + 1})', '',
'%', (one_cyc['duration'][stage] /
one_cyc[denom.lower()]) * 100,
'', '',
f'{stage} / {denom} in cycle {i + 1}'])
cf.writerow([f'Switch (c{i + 1})', '',
'N', one_cyc['switch'], '', '',
f'number of stage shifts in cycle {i + 1}'])
cf.writerow([f'Switch % (c{i + 1})', '',
'% epochs', (one_cyc['switch'] /
one_cyc['tsp']),
'% minutes', (one_cyc['switch'] *
n_ep_per_min / one_cyc['tsp']),
f'switch / TSP in cycle {i + 1}'])
cf.writerow([f'Sleep fragmentation (c{i + 1})', '',
'N', one_cyc['slp_frag'], '', '',
'number of shifts to a lighter stage in cycle '
f'{i + 1}'])
cf.writerow([f'Sleep fragmentation index (c{i + 1})',
f'SFI (c{i + 1})',
'% epochs', (one_cyc['slp_frag'] /
one_cyc['tsp']),
'% minutes', (one_cyc['slp_frag'] *
n_ep_per_min / one_cyc['tsp']),
f'sleep fragmentation / TSP in cycle {i + 1}'])
return slp_onset_lat, waso, total_slp_time # for testing
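# Usage sketch: write the sleep statistics spreadsheet with lights off at 600 s
# and lights on at 28800 s from recording start (file name and times are
# hypothetical):
#     annot.export_sleep_stats('subject1_stats.csv', lights_off=600,
#                              lights_on=28800)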
def export_events(self, filename, evt_type=None, chan=None, stage=None,
cycle=None):
"""Export events to CSV
Parameters
----------
filename : str
path of export file
evt_type : list of str, optional
event types to export
chan : tuple of str, optional
list of channels of interests
stage : tuple of str, optional
list of stages of interest
cycle : list of int, optional
list of cycles of interest, numbered starting at 1
"""
filename = splitext(filename)[0] + '.csv'
headings_row = ['Index',
'Start time',
'End time',
'Stitches',
'Stage',
'Cycle',
'Event type',
'Channel']
events = []
if evt_type is None:
evt_type = self.event_types
for et in evt_type:
events.extend(self.get_events(name=et, chan=chan, stage=stage,
cycle=cycle))
events = sorted(events, key=lambda evt: evt['start'])
if not events:
lg.info('No events found.')
return
with open(filename, 'w', newline='') as f:
lg.info('Writing to ' + str(filename))
csv_file = writer(f)
csv_file.writerow(['Wonambi v{}'.format(__version__)])
csv_file.writerow(headings_row)
for i, ev in enumerate(events):
csv_file.writerow([i + 1,
ev['start'],
ev['end'],
0,
ev['stage'],
ev['cycle'],
ev['name'],
', '.join(ev['chan']),
])
def import_events(self, filename, source='wonambi', rec_start=None,
chan_dict=None, chan_grp_name='eeg', parent=None):
"""Import events from Wonambi CSV event export and write to annot.
Parameters
----------
filename : str
path to file
source : str
source program: 'wonambi', 'remlogic' or 'prana'
rec_start : datetime
Date and time (year, month, day, hour, minute, second) of recording
start. Year is ignored (New Year's Eve celebratory recordings
unsupported.) Only required for remlogic.
chan_dict : dict
for prana. keys are channels as they appear in the prana input
file, and values are channels as they appear in the Wonambi GUI
(with reference and channel group).
chan_grp_name : str
for prana. name of the channel group in which to store events.
parent : QWidget
for GUI progress bar
"""
events = []
if 'wonambi' == source:
with open(filename, 'r', encoding='utf-8') as csvfile:
csv_reader = reader(csvfile, delimiter=',')
for row in csv_reader:
try:
int(row[0])
one_ev = {'name': row[6],
'start': float(row[1]),
'end': float(row[2]),
'chan': row[7].split(', '), # always a list
'stage': row[4],
'quality': 'Good'
}
events.append(one_ev)
except ValueError:
continue
elif 'remlogic' == source:
with open(filename, 'r', encoding='ISO-8859-1') as f:
lines = f.readlines()
time_hdrs = ('Time [hh:mm:ss', 'Heure [hh:mm:ss')
idx_header = lines.index(next(
l for l in lines if any(hdr in l for hdr in time_hdrs)))
header = lines[idx_header].split('\t')
header = [s.strip() for s in header] # remove trailing newline
idx_time = [i for i, s in enumerate(header) if any(
x in s for x in time_hdrs)][0]
idx_evt = [i for i, s in enumerate(header) if any(
x in s for x in (
'Event', 'vènement', 'vénement', 'venement') )][0]
idx_dur = [i for i, s in enumerate(header) if any(
x in s for x in ('Duration', 'Durée') )][0]
# French files are in 24-hour time
hour24 = 'Heure' in header[idx_time]
# Find staging start date
date_line = lines[3].strip()
stage_start_date = _try_parse_datetime(
date_line[date_line.index(':') + 2:],
('%Y/%m/%d', '%d/%m/%Y', '%Y.%m.%d', '%d.%m.%Y'))
# Events loop
for l in lines[idx_header + 1:]:
cells = l.split('\t')
one_evttype = cells[idx_evt]
# skip epoch staging
if 'SLEEP-' in one_evttype:
continue
clock_start = _remlogic_time(cells[idx_time],
stage_start_date,
hour24=hour24)
start = float((clock_start - rec_start).total_seconds())
one_ev = {'name': one_evttype,
'start': start,
'end': start + float(cells[idx_dur]),
'chan': '',
'stage': '',
'quality': 'Good'
}
events.append(one_ev)
elif 'prana' == source:
with open(filename, 'r', encoding='ISO-8859-1') as f:
lines = f.readlines()
header = lines[0].split('\t')
header = [s.strip() for s in header] # remove trailing newline
idx_time = header.index('Start')
idx_evt = header.index('Type')
idx_dur = header.index('Duration')
idx_chan = header.index('Channel')
stage_start_date = self.start_time.date()
for l in lines[1:]:
cells = l.split('\t')
one_evttype = cells[idx_evt]
clock_start = _prana_time(cells[idx_time],
stage_start_date)
start = float((clock_start - rec_start).total_seconds())
chan_label_prana = cells[idx_chan].strip()
if chan_label_prana == 'All channels':
chan = ''
elif chan_dict:
chan = chan_dict[chan_label_prana]
else: # ignores reference
active_chan = chan_label_prana[4:6]
chan = f'{active_chan} ({chan_grp_name})'
one_ev = {'name': one_evttype,
'start': start,
'end': start + float(cells[idx_dur]),
'chan': chan,
'stage': '',
'quality': 'Good'
}
events.append(one_ev)
else:
raise ValueError('Unknown source program for events file')
self.add_events(events, parent=parent)
def to_bids(self, tsv_file=None, json_file=None):
if tsv_file is None:
tsv_file = (splitext(basename(self.xml_file))[0] +
'_annotations.tsv')
if json_file is None:
json_file = (splitext(basename(self.xml_file))[0] +
'_annotations.json')
header = {
'Description': ("Annotations as marked by visual and/or "
"automatic inspection of the data using "
"Wonambi open source software."),
'IntendedFor': self.dataset,
'Sources': self.dataset,
'Author': self.current_rater,
'LabelDescription': {'sleep_wake': 'Wakefulness',
'sleep_N1': 'Sleep stage N1',
'sleep_N2': 'Sleep stage N2',
'sleep_N3': 'Sleep stage N3',
'sleep_REM': 'Sleep stage REM',
'artifact': 'Artifact (unspecified)',
'artifact_motion': ('Artifact due to '
'movement'),
'cycle_start': 'Sleep cycle start',
'cycle_end': 'Sleep cycle end',
},
'RecordingStartTime': _abs_time_str(0, self.start_time),
'EpochDuration': int(self.epoch_length),
}
epochs = self.get_epochs()
events = self.get_events()
cycles = self.rater.find('cycles')
abst = self.start_time
with open(json_file, 'w') as f:
dump(header, f, indent=' ')
with open(tsv_file, 'w') as f:
f.write('onset\tduration\tlabel\tchannels\tabsolute_time\t'
'quality\n')
for e in epochs:
f.write('\t'.join([str(e['start']), str(e['end'] - e['start']),
BIDS_STAGE_KEY[e['stage']], 'n/a',
_abs_time_str(e['start'], abst),
e['quality']]) + '\n')
for e in events:
f.write('\t'.join([str(e['start']), str(e['end'] - e['start']),
e['name'], str(e['chan']),
_abs_time_str(e['start'], abst),
e['quality']]) + '\n')
if cycles is not None:
starts = sorted(
[mrkr.text for mrkr in cycles.findall('cyc_start')])
ends = sorted(
[mrkr.text for mrkr in cycles.findall('cyc_end')])
for s in starts:
f.write('\t'.join([s, '0', 'cycle_start', 'n/a',
_abs_time_str(s, abst), 'n/a']) + '\n')
for e in ends:
f.write('\t'.join([e, '0', 'cycle_end', 'n/a',
_abs_time_str(e, abst), 'n/a']) + '\n')
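# Usage sketch: export the current rater's scoring and events as BIDS-style
# annotation files (output file names are hypothetical):
#     annot.to_bids(tsv_file='sub-01_annotations.tsv',
#                   json_file='sub-01_annotations.json')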
def update_annotation_version(xml_file):
"""Update the fields that have changed over different versions.
Parameters
----------
xml_file : path to file
xml file with the sleep scoring
Notes
-----
new in version 4: use 'marker_name' instead of simply 'name' etc
new in version 5: use 'bookmark' instead of 'marker'
"""
with open(xml_file, 'r') as f:
s = f.read()
m = search('<annotations version="([0-9]*)">', s)
current = int(m.groups()[0])
if current < 4:
s = sub('<marker><name>(.*?)</name><time>(.*?)</time></marker>',
'<marker><marker_name>\g<1></marker_name><marker_start>\g<2></marker_start><marker_end>\g<2></marker_end><marker_chan/></marker>',
s)
if current < 5:
s = s.replace('marker', 'bookmark')
# note indentation
s = sub('<annotations version="[0-9]*">',
'<annotations version="5">', s)
with open(xml_file, 'w') as f:
f.write(s)
def _abs_time_str(delay, abs_start, time_str='%Y-%m-%dT%H:%M:%S'):
return (abs_start + timedelta(seconds=float(delay))).strftime(time_str)
def _try_parse_datetime(text, fmts):
for fmt in fmts:
try:
return datetime.strptime(text, fmt)
except ValueError:
pass
raise ValueError('No valid date found.')
def _remlogic_time(time_cell, date, hour24):
"""Reads RemLogic time string to datetime
Parameters
----------
time_cell : str
entire time cell from text file
date : datetime
start date from text file
hour24 : bool
if True, time will be read on 24-hour clock instead of 12-hour clock
Returns
-------
datetime
date and time
"""
time_str = time_cell[time_cell.index(':') - 2:]
if hour24:
stage_start_time = _try_parse_datetime(time_str,
('%H:%M:%S', '%H:%M:%S.%f'))
start = datetime.combine(date.date(), stage_start_time.time())
else:
stage_start_time = _try_parse_datetime(time_str,
('%I:%M:%S', '%I:%M:%S.%f'))
start = datetime.combine(date.date(), stage_start_time.time())
if time_cell[1] == 'U':
start = start + timedelta(hours=12)
elif time_cell[-8:-10] == '12':
start = start + timedelta(hours=12)
else:
start = start + timedelta(hours=24)
return start
def _prana_time(time_cell, date):
"""Read a Prana time string into a datetime.
Parameters
----------
time_cell : str
entire time cell from text file
date : date
start date from text file
Returns
-------
datetime
date and time
"""
stage_start_time = datetime.strptime(time_cell[3:], '%H:%M:%S.%f')
start = datetime.combine(date, stage_start_time.time())
if time_cell[1] == '2':
start = start + timedelta(hours=24)
return start