"""Class to import straight text records.
"""
from logging import getLogger
from numpy import empty, float64, floor
from os import listdir
from os.path import splitext
from pathlib import Path
from .utils import DEFAULT_DATETIME
lg = getLogger(__name__)
[docs]class Text:
"""Class to read text format records. The record consists of a directory
containing txt files. Each file is a channel. The first line of each file
is the sampling rate, and the following lines are single data points,
in scientific notation.
Only supports a single sampling rate for all channels.
Parameters
----------
rec_dir : path to record directory
the folder containing the record
Notes
-----
Text is a very slow format for reading data. It is best to use this class
to import the record, then to export is as a Wonambi (.won) file, and use
that for reading.
"""
def __init__(self, rec_dir):
lg.info('Reading ' + str(rec_dir))
self.filename = rec_dir
self.hdr = self.return_hdr()
# range data are absent
self.dig_min = -0.000512 # estimated from min values
self.dig_max = 0.000512 # estimated from max values
self.phys_min = -800 # estimate
self.phys_max = 800 # estimate
[docs] def return_hdr(self):
"""Return the header for further use.
Returns
-------
subj_id : str
subject identification code
start_time : datetime
start time of the dataset
s_freq : float
sampling frequency
chan_name : list of str
list of all the channels
n_samples : int
number of samples in the dataset
orig : dict
the full header
"""
foldername = Path(self.filename)
chan_files = self.chan_files = []
hdr = {}
hdr['chan_name'] = []
hdr['start_time'] = DEFAULT_DATETIME
for file in listdir(self.filename):
base, suffix = splitext(file)
if suffix == '.txt':
if base[-3:] == 'hyp':
self.hypno_file = file
else:
chan_files.append(foldername / file)
chan_name = base[base.index('_') + 1:]
hdr['chan_name'].append(chan_name)
hdr['subj_id'] = base[:base.index('_')]
if not chan_files:
raise FileNotFoundError('No channel found.')
return
with open(chan_files[0], 'rt') as f:
line0 = f.readline()
hdr['s_freq'] = int(
line0[line0.index('Rate:') + 5:line0.index('Hz')])
for i, _ in enumerate(f):
pass
hdr['n_samples'] = i
output = (hdr['subj_id'], hdr['start_time'], hdr['s_freq'],
hdr['chan_name'], hdr['n_samples'], hdr)
return output
[docs] def return_dat(self, chan, begsam, endsam):
"""Return the data as 2D numpy.ndarray.
Parameters
----------
chan : list of int
index (indices) of the channels to read
begsam : int
index of the first sample (inclusively)
endsam : int
index of the last sample (exclusively)
Returns
-------
numpy.ndarray
A 2d matrix, with dimension chan X samples.
"""
#n_sam = self.hdr[4]
interval = endsam - begsam
dat = empty((len(chan), interval))
#beg_block = floor((begsam / n_sam) * n_block)
#end_block = floor((endsam / n_sam) * n_block)
for i, chan in enumerate(chan):
k = 0
with open(self.chan_files[chan], 'rt') as f:
f.readline()
for j, datum in enumerate(f):
if begsam <= j + 1 < endsam:
dat[i, k] = float64(datum)
k += 1
if k == interval:
break
# calibration
phys_range = self.phys_max - self.phys_min
dig_range = self.dig_max - self.dig_min
gain = phys_range / dig_range
dat *= gain
return dat
[docs] def return_markers(self):
"""There are no markers in this format.
"""
return []
#==============================================================================
# def split_file(filepath, lines_per_file=100):
# """splits file at `filepath` into sub-files of length `lines_per_file`
# """
# lpf = lines_per_file
# path, filename = split(filepath)
# with open(filepath, 'r') as r:
# name, ext = splitext(filename)
# try:
# w = open(join(path, '{}_{}{}'.format(name, 0, ext)), 'w')
# for i, line in enumerate(r):
# if not i % lpf:
# #possible enhancement: don't check modulo lpf on each pass
# #keep a counter variable, and reset on each checkpoint lpf.
# w.close()
# filename = join(path,
# name,
# '{}{}'.format(i, ext))
# w = open(filename, 'w')
# w.write(line)
# finally:
# w.close()
#==============================================================================