Source code for wonambi.trans.analyze

"""Analysis and export convenience functions.
"""

from logging import getLogger
from itertools import compress
from csv import writer
from numpy import (amax, amin, asarray, concatenate, in1d, mean, negative, ptp, 
                   reshape, sqrt, square)

try:
    from PyQt5.QtCore import Qt
    from PyQt5.QtWidgets import QProgressDialog
except ImportError:
    Qt = None
    QProgressDialog = None

from .. import __version__
from .math import math, get_descriptives
from .frequency import band_power
from .peaks import get_slopes

lg = getLogger(__name__)


[docs]def event_params(segments, params, band=None, n_fft=None, slopes=None, prep=None, parent=None): """Compute event parameters. Parameters ---------- segments : instance of wonambi.trans.select.Segments list of segments, with time series and metadata params : dict of bool, or str 'dur', 'minamp', 'maxamp', 'ptp', 'rms', 'power', 'peakf', 'energy', 'peakef'. If 'all', a dict will be created with these keys and all values as True, so that all parameters are returned. band : tuple of float band of interest for power and energy n_fft : int length of FFT. if shorter than input signal, signal is truncated; if longer, signal is zero-padded to length slopes : dict of bool 'avg_slope', 'max_slope', 'prep', 'invert' prep : dict of bool same keys as params. if True, segment['trans_data'] will be used as dat parent : QMainWindow for use with GUI only Returns ------- list of dict list of segments, with time series, metadata and parameters """ if parent is not None: progress = QProgressDialog('Computing parameters', 'Abort', 0, len(segments) - 1, parent) progress.setWindowModality(Qt.ApplicationModal) param_keys = ['dur', 'minamp', 'maxamp', 'ptp', 'rms', 'power', 'peakpf', 'energy', 'peakef'] if params == 'all': params = {k: 1 for k in param_keys} if prep is None: prep = {k: 0 for k in param_keys} if band is None: band = (None, None) params_out = [] evt_output = False for i, seg in enumerate(segments): out = dict(seg) dat = seg['data'] if params['dur']: out['dur'] = float(dat.number_of('time')) / dat.s_freq evt_output = True if params['minamp']: dat1 = dat if prep['minamp']: dat1 = seg['trans_data'] out['minamp'] = math(dat1, operator=_amin, axis='time') evt_output = True if params['maxamp']: dat1 = dat if prep['maxamp']: dat1 = seg['trans_data'] out['maxamp'] = math(dat1, operator=_amax, axis='time') evt_output = True if params['ptp']: dat1 = dat if prep['ptp']: dat1 = seg['trans_data'] out['ptp'] = math(dat1, operator=_ptp, axis='time') evt_output = True if params['rms']: dat1 = dat if prep['rms']: dat1 = seg['trans_data'] out['rms'] = math(dat1, operator=(square, _mean, sqrt), axis='time') evt_output = True for pw, pk in [('power', 'peakpf'), ('energy', 'peakef')]: if params[pw] or params[pk]: evt_output = True if prep[pw] or prep[pk]: prep_pw, prep_pk = band_power(seg['trans_data'], band, scaling=pw, n_fft=n_fft) if not (prep[pw] and prep[pk]): raw_pw, raw_pk = band_power(dat, band, scaling=pw, n_fft=n_fft) if prep[pw]: out[pw] = prep_pw else: out[pw] = raw_pw if prep[pk]: out[pk] = prep_pk else: out[pk] = raw_pk if slopes: evt_output = True out['slope'] = {} dat1 = dat if slopes['prep']: dat1 = seg['trans_data'] if slopes['invert']: dat1 = math(dat1, operator=negative, axis='time') if slopes['avg_slope'] and slopes['max_slope']: level = 'all' elif slopes['avg_slope']: level = 'average' else: level = 'maximum' for chan in dat1.axis['chan'][0]: d = dat1(chan=chan)[0] out['slope'][chan] = get_slopes(d, dat.s_freq, level=level) if evt_output: timeline = dat.axis['time'][0] out['start'] = timeline[0] out['end'] = timeline[-1] params_out.append(out) if parent: progress.setValue(i) if progress.wasCanceled(): msg = 'Analysis canceled by user.' parent.statusBar().showMessage(msg) return if parent: progress.close() return params_out
[docs]def export_event_params(filename, params, count=None, density=None): """Write event analysis data to CSV.""" heading_row_1 = ['Segment index', 'Start time', 'End time', 'Stitches', 'Stage', 'Cycle', 'Event type', 'Channel'] spacer = [''] * (len(heading_row_1) - 1) param_headings_1 = ['Min. amplitude (uV)', 'Max. amplitude (uV)', 'Peak-to-peak amplitude (uV)', 'RMS (uV)'] param_headings_2 = ['Power (uV^2)', 'Peak power frequency (Hz)', 'Energy (uV^2s)', 'Peak energy frequency (Hz)'] slope_headings = ['Q1 average slope (uV/s)', 'Q2 average slope (uV/s)', 'Q3 average slope (uV/s)', 'Q4 average slope (uV/s)', 'Q23 average slope (uV/s)', 'Q1 max. slope (uV/s^2)', 'Q2 max. slope (uV/s^2)', 'Q3 max. slope (uV/s^2)', 'Q4 max. slope (uV/s^2)', 'Q23 max. slope (uV/s^2)'] ordered_params_1 = ['minamp', 'maxamp', 'ptp', 'rms'] ordered_params_2 = ['power', 'peakpf', 'energy', 'peakef'] idx_params_1 = in1d(ordered_params_1, list(params[0].keys())) sel_params_1 = list(compress(ordered_params_1, idx_params_1)) heading_row_2 = list(compress(param_headings_1, idx_params_1)) if 'dur' in params[0].keys(): heading_row_2 = ['Duration (s)'] + heading_row_2 idx_params_2 = in1d(ordered_params_2, list(params[0].keys())) sel_params_2 = list(compress(ordered_params_2, idx_params_2)) heading_row_3 = list(compress(param_headings_2, idx_params_2)) heading_row_4 = [] if 'slope' in params[0].keys(): if next(iter(params[0]['slope']))[0]: heading_row_4.extend(slope_headings[:5]) if next(iter(params[0]['slope']))[1]: heading_row_4.extend(slope_headings[5:]) # Get data as matrix and compute descriptives dat = [] if 'dur' in params[0].keys(): one_mat = asarray([seg['dur'] for seg in params \ for chan in seg['data'].axis['chan'][0]]) one_mat = reshape(one_mat, (len(one_mat), 1)) dat.append(one_mat) if sel_params_1: one_mat = asarray([[seg[x](chan=chan)[0] for x in sel_params_1] \ for seg in params for chan in seg['data'].axis['chan'][0]]) dat.append(one_mat) if sel_params_2: one_mat = asarray([[seg[x][chan] for x in sel_params_2] \ for seg in params for chan in seg['data'].axis['chan'][0]]) dat.append(one_mat) if 'slope' in params[0].keys(): one_mat = asarray([[x for y in seg['slope'][chan] for x in y] \ for seg in params for chan in seg['data'].axis['chan'][0]]) dat.append(one_mat) if dat: dat = concatenate(dat, axis=1) desc = get_descriptives(dat) with open(filename, 'w', newline='') as f: lg.info('Writing to ' + str(filename)) csv_file = writer(f) csv_file.writerow(['Wonambi v{}'.format(__version__)]) if count: csv_file.writerow(['Count', count]) if density: csv_file.writerow(['Density', density]) if dat == []: return csv_file.writerow(heading_row_1 + heading_row_2 + heading_row_3 \ + heading_row_4) csv_file.writerow(['Mean'] + spacer + list(desc['mean'])) csv_file.writerow(['SD'] + spacer + list(desc['sd'])) csv_file.writerow(['Mean of ln'] + spacer + list(desc['mean_log'])) csv_file.writerow(['SD of ln'] + spacer + list(desc['sd_log'])) idx = 0 for seg in params: if seg['cycle'] is not None: seg['cycle'] = seg['cycle'][2] for chan in seg['data'].axis['chan'][0]: idx += 1 data_row_1 = [seg[x](chan=chan)[0] for x in sel_params_1] data_row_2 = [seg[x][chan] for x in sel_params_2] if 'dur' in seg.keys(): data_row_1 = [seg['dur']] + data_row_1 if 'slope' in seg.keys(): data_row_3 = [x for y in seg['slope'][chan] for x in y] data_row_2 = data_row_2 + data_row_3 csv_file.writerow([idx, seg['start'], seg['end'], seg['n_stitch'], seg['stage'], seg['cycle'], seg['name'], chan, ] + data_row_1 + data_row_2)
[docs]def export_freq(xfreq, filename, desc=None): """Write frequency analysis data to CSV. Parameters ---------- xfreq : list of dict spectral data, one dict per segment, where 'data' is ChanFreq filename : str output filename desc : dict of ndarray descriptives '""" heading_row_1 = ['Segment index', 'Start time', 'End time', 'Duration', 'Stitches', 'Stage', 'Cycle', 'Event type', 'Channel', ] spacer = [''] * (len(heading_row_1) - 1) freq = list(xfreq[0]['data'].axis['freq'][0]) with open(filename, 'w', newline='') as f: lg.info('Writing to ' + str(filename)) csv_file = writer(f) csv_file.writerow(['Wonambi v{}'.format(__version__)]) csv_file.writerow(heading_row_1 + freq) if desc: csv_file.writerow(['Mean'] + spacer + list(desc['mean'])) csv_file.writerow(['SD'] + spacer + list(desc['sd'])) csv_file.writerow(['Mean of ln'] + spacer + list( desc['mean_log'])) csv_file.writerow(['SD of ln'] + spacer + list(desc['sd_log'])) idx = 0 for seg in xfreq: for chan in seg['data'].axis['chan'][0]: idx += 1 cyc = None if seg['cycle'] is not None: cyc = seg['cycle'][2] data_row = list(seg['data'](chan=chan)[0]) csv_file.writerow([idx, seg['start'], seg['end'], seg['duration'], seg['n_stitch'], seg['stage'], cyc, seg['name'], chan, ] + data_row)
[docs]def export_freq_band(xfreq, bands, filename): """Write frequency analysis data to CSV by pre-defined band.""" heading_row_1 = ['Segment index', 'Start time', 'End time', 'Duration', 'Stitches', 'Stage', 'Cycle', 'Event type', 'Channel', ] spacer = [''] * (len(heading_row_1) - 1) band_hdr = [str(b1) + '-' + str(b2) for b1, b2 in bands] xband = xfreq.copy() for seg in xband: bandlist = [] for i, b in enumerate(bands): pwr, _ = band_power(seg['data'], b) bandlist.append(pwr) seg['band'] = bandlist as_matrix = asarray([ [x['band'][y][chan] for y in range(len(x['band']))] \ for x in xband for chan in x['band'][0].keys()]) desc = get_descriptives(as_matrix) with open(filename, 'w', newline='') as f: lg.info('Writing to ' + str(filename)) csv_file = writer(f) csv_file.writerow(['Wonambi v{}'.format(__version__)]) csv_file.writerow(heading_row_1 + band_hdr) csv_file.writerow(['Mean'] + spacer + list(desc['mean'])) csv_file.writerow(['SD'] + spacer + list(desc['sd'])) csv_file.writerow(['Mean of ln'] + spacer + list(desc['mean_log'])) csv_file.writerow(['SD of ln'] + spacer + list(desc['sd_log'])) idx = 0 for seg in xband: for chan in seg['band'][0].keys(): idx += 1 cyc = None if seg['cycle'] is not None: cyc = seg['cycle'][2] data_row = list( [seg['band'][x][chan] for x in range( len(seg['band']))]) csv_file.writerow([idx, seg['start'], seg['end'], seg['duration'], seg['n_stitch'], seg['stage'], cyc, seg['name'], chan, ] + data_row)
def _amax(x, axis, keepdims=None): return amax(x, axis=axis) def _amin(x, axis, keepdims=None): return amin(x, axis=axis) def _ptp(x, axis, keepdims=None): return ptp(x, axis=axis) def _mean(x, axis, keepdims=None): return mean(x, axis=axis)