"""Class to import ABF2.
Adapted from axonrawio.py in python-neo. Strongly simplified.
"""
from datetime import datetime, timedelta
from numpy import c_, empty, float64, NaN, memmap, dtype, newaxis, array
from os import SEEK_SET
from struct import unpack, calcsize
from .utils import DEFAULT_DATETIME
BLOCKSIZE = 512
[docs]class Abf:
"""Class to read abf file.
Only for ABF2, when the data has no gaps (no episodes).
Parameters
----------
filename : path to file
the name of the filename with extension .won
"""
def __init__(self, filename):
self.filename = filename
[docs] def return_hdr(self):
"""Return the header for further use.
Returns
-------
subj_id : str
subject identification code
start_time : datetime
start time of the dataset
s_freq : float
sampling frequency
chan_name : list of str
list of all the channels
n_samples : int
number of samples in the dataset
orig : dict
the full header
"""
with self.filename.open('br') as f:
orig = _read_header(f)
assert orig['protocol']['nOperationMode'] == 3, 'Only continuous no-gap recordings are supported'
assert orig['sections']['SynchArraySection']['llNumEntries'] == 0
assert orig['sections']['SynchArraySection']['uBlockIndex'] == 0
# file format
if orig['nDataFormat'] == 0:
self.dtype = dtype('i2')
elif orig['nDataFormat'] == 1:
self.dtype = dtype('f4')
chan_name = []
offset = []
gain = []
for ch in orig['listADCInfo']:
chan_name.append(ch['ADCChNames'].decode('utf-8').strip())
# compute the gain and offset
ch_gain = orig['protocol']['lADCResolution'] / orig['protocol']['fADCRange']
ch_gain *= (ch['fInstrumentScaleFactor'] *
ch['fSignalGain'] *
ch['fADCProgrammableGain'])
if ch['nTelegraphEnable'] == 1:
ch_gain *= ch['fTelegraphAdditGain']
gain.append(1 / ch_gain)
offset.append(ch['fInstrumentOffset'] -
ch['fSignalOffset'])
self.offset = array(offset)[:, newaxis]
self.gain = array(gain)[:, newaxis]
self.head = orig['sections']['DataSection']['uBlockIndex'] * BLOCKSIZE
self.n_chan = orig['sections']['ADCSection']['llNumEntries']
assert self.n_chan == len(chan_name)
self.n_samples = int(orig['sections']['DataSection']['llNumEntries'] /
self.n_chan)
subj_id = self.filename.stem
try:
start_time = (datetime.strptime(str(orig['uFileStartDate']), '%Y%m%d') +
timedelta(seconds=orig['uFileStartTimeMS'] / 1000))
except ValueError: # no time given, use placeholder
start_time = DEFAULT_DATETIME
s_freq = 1.e6 / orig['protocol']['fADCSequenceInterval']
return subj_id, start_time, s_freq, chan_name, self.n_samples, orig
[docs] def return_dat(self, chan, begsam, endsam):
"""Return the data as 2D numpy.ndarray.
Parameters
----------
chan : int or list
index (indices) of the channels to read
begsam : int
index of the first sample
endsam : int
index of the last sample
Returns
-------
numpy.ndarray
A 2d matrix, with dimension chan X samples. To save memory, the
data are memory-mapped, and you cannot change the values on disk.
Notes
-----
When asking for an interval outside the data boundaries, it returns NaN
for those values.
"""
data = memmap(self.filename, dtype=self.dtype, mode='r', order='F',
shape=(self.n_chan, self.n_samples), offset=self.head)
dat = data[chan, max((begsam, 0)):min((endsam, self.n_samples))].astype(float64)
dat = (dat + self.offset[chan, :]) * self.gain[chan, :]
if begsam < 0:
pad = empty((dat.shape[0], 0 - begsam))
pad.fill(NaN)
dat = c_[pad, dat]
if endsam >= self.n_samples:
pad = empty((dat.shape[0], endsam - self.n_samples))
pad.fill(NaN)
dat = c_[dat, pad]
return dat
[docs] def return_markers(self):
"""I don't know if the ABF contains markers at all.
"""
return []
def _read_header(fid):
"""Based on neo/rawio/axonrawio.py, but I only kept of data with no-gaps
and in one segment.
"""
fid.seek(0, SEEK_SET)
fFileSignature = fid.read(4)
assert fFileSignature == b'ABF2', 'only format ABF2 is currently supported'
header = {}
for key, offset, fmt in headerDescriptionV2:
fid.seek(0 + offset, SEEK_SET)
val = unpack(fmt, fid.read(calcsize(fmt)))
if len(val) == 1:
header[key] = val[0]
else:
header[key] = val
# sections
sections = {}
for s, sectionName in enumerate(sectionNames):
fid.seek(76 + s * 16)
uBlockIndex, uBytes, llNumEntries = unpack('IIl', fid.read(calcsize('IIl')))
sections[sectionName] = {}
sections[sectionName]['uBlockIndex'] = uBlockIndex
sections[sectionName]['uBytes'] = uBytes
sections[sectionName]['llNumEntries'] = llNumEntries
header['sections'] = sections
# strings sections
# hack for reading channels names and units
fid.seek(sections['StringsSection']['uBlockIndex'] * BLOCKSIZE)
big_string = fid.read(sections['StringsSection']['uBytes'])
goodstart = -1
for key in [b'AXENGN', b'clampex', b'Clampex', b'CLAMPEX', b'axoscope', b'Clampfit']:
goodstart = big_string.find(key)
if goodstart != -1:
break
assert goodstart != -1, 'This file does not contain clampex, axoscope or clampfit in the header'
big_string = big_string[goodstart:]
strings = big_string.split(b'\x00')
# ADC sections
header['listADCInfo'] = []
for i in range(sections['ADCSection']['llNumEntries']):
# read ADCInfo
fid.seek(sections['ADCSection']['uBlockIndex'] *
BLOCKSIZE + sections['ADCSection']['uBytes'] * i)
ADCInfo = _read_info_as_dict(fid, ADCInfoDescription)
ADCInfo['ADCChNames'] = strings[ADCInfo['lADCChannelNameIndex'] - 1]
ADCInfo['ADCChUnits'] = strings[ADCInfo['lADCUnitsIndex'] - 1]
header['listADCInfo'].append(ADCInfo)
# protocol sections
fid.seek(sections['ProtocolSection']['uBlockIndex'] * BLOCKSIZE)
header['protocol'] = _read_info_as_dict(fid, protocolInfoDescription)
header['sProtocolPath'] = strings[header['uProtocolPathIndex'] - 1]
# DAC sections
header['listDACInfo'] = []
for i in range(sections['DACSection']['llNumEntries']):
# read DACInfo
fid.seek(sections['DACSection']['uBlockIndex'] *
BLOCKSIZE + sections['DACSection']['uBytes'] * i)
DACInfo = _read_info_as_dict(fid, DACInfoDescription)
DACInfo['DACChNames'] = strings[DACInfo['lDACChannelNameIndex'] - 1]
DACInfo['DACChUnits'] = strings[
DACInfo['lDACChannelUnitsIndex'] - 1]
header['listDACInfo'].append(DACInfo)
""" Not present in test file. No tests, no code.
# tags
listTag = []
for i in range(sections['TagSection']['llNumEntries']):
fid.seek(sections['TagSection']['uBlockIndex'] *
BLOCKSIZE + sections['TagSection']['uBytes'] * i)
tag = _read_info_as_dict(fid, TagInfoDescription)
listTag.append(tag)
header['listTag'] = listTag
# EpochPerDAC sections
# header['dictEpochInfoPerDAC'] is dict of dicts:
# - the first index is the DAC number
# - the second index is the epoch number
# It has to be done like that because data may not exist
# and may not be in sorted order
header['dictEpochInfoPerDAC'] = {}
for i in range(sections['EpochPerDACSection']['llNumEntries']):
# read DACInfo
fid.seek(sections['EpochPerDACSection']['uBlockIndex'] *
BLOCKSIZE +
sections['EpochPerDACSection']['uBytes'] * i)
EpochInfoPerDAC = _read_info_as_dict(fid, EpochInfoPerDACDescription)
DACNum = EpochInfoPerDAC['nDACNum']
EpochNum = EpochInfoPerDAC['nEpochNum']
# Checking if the key exists, if not, the value is empty
# so we have to create empty dict to populate
if DACNum not in header['dictEpochInfoPerDAC']:
header['dictEpochInfoPerDAC'][DACNum] = {}
header['dictEpochInfoPerDAC'][DACNum][EpochNum] =\
EpochInfoPerDAC
"""
return header
def _read_info_as_dict(fid, values):
"""Convenience function to read info in axon data to a nicely organized
dict.
"""
output = {}
for key, fmt in values:
val = unpack(fmt, fid.read(calcsize(fmt)))
if len(val) == 1:
output[key] = val[0]
else:
output[key] = val
return output
headerDescriptionV2 = [
('fFileSignature', 0, '4s'),
('fFileVersionNumber', 4, '4b'),
('uFileInfoSize', 8, 'I'),
('lActualEpisodes', 12, 'I'),
('uFileStartDate', 16, 'I'),
('uFileStartTimeMS', 20, 'I'),
('uStopwatchTime', 24, 'I'),
('nFileType', 28, 'H'),
('nDataFormat', 30, 'H'),
('nSimultaneousScan', 32, 'H'),
('nCRCEnable', 34, 'H'),
('uFileCRC', 36, 'I'),
('FileGUID', 40, 'I'),
('uCreatorVersion', 56, 'I'),
('uCreatorNameIndex', 60, 'I'),
('uModifierVersion', 64, 'I'),
('uModifierNameIndex', 68, 'I'),
('uProtocolPathIndex', 72, 'I'),
]
sectionNames = [
'ProtocolSection',
'ADCSection',
'DACSection',
'EpochSection',
'ADCPerDACSection',
'EpochPerDACSection',
'UserListSection',
'StatsRegionSection',
'MathSection',
'StringsSection',
'DataSection',
'TagSection',
'ScopeSection',
'DeltaSection',
'VoiceTagSection',
'SynchArraySection',
'AnnotationSection',
'StatsSection',
]
protocolInfoDescription = [
('nOperationMode', 'h'),
('fADCSequenceInterval', 'f'),
('bEnableFileCompression', 'b'),
('sUnused1', '3s'),
('uFileCompressionRatio', 'I'),
('fSynchTimeUnit', 'f'),
('fSecondsPerRun', 'f'),
('lNumSamplesPerEpisode', 'i'),
('lPreTriggerSamples', 'i'),
('lEpisodesPerRun', 'i'),
('lRunsPerTrial', 'i'),
('lNumberOfTrials', 'i'),
('nAveragingMode', 'h'),
('nUndoRunCount', 'h'),
('nFirstEpisodeInRun', 'h'),
('fTriggerThreshold', 'f'),
('nTriggerSource', 'h'),
('nTriggerAction', 'h'),
('nTriggerPolarity', 'h'),
('fScopeOutputInterval', 'f'),
('fEpisodeStartToStart', 'f'),
('fRunStartToStart', 'f'),
('lAverageCount', 'i'),
('fTrialStartToStart', 'f'),
('nAutoTriggerStrategy', 'h'),
('fFirstRunDelayS', 'f'),
('nChannelStatsStrategy', 'h'),
('lSamplesPerTrace', 'i'),
('lStartDisplayNum', 'i'),
('lFinishDisplayNum', 'i'),
('nShowPNRawData', 'h'),
('fStatisticsPeriod', 'f'),
('lStatisticsMeasurements', 'i'),
('nStatisticsSaveStrategy', 'h'),
('fADCRange', 'f'),
('fDACRange', 'f'),
('lADCResolution', 'i'),
('lDACResolution', 'i'),
('nExperimentType', 'h'),
('nManualInfoStrategy', 'h'),
('nCommentsEnable', 'h'),
('lFileCommentIndex', 'i'),
('nAutoAnalyseEnable', 'h'),
('nSignalType', 'h'),
('nDigitalEnable', 'h'),
('nActiveDACChannel', 'h'),
('nDigitalHolding', 'h'),
('nDigitalInterEpisode', 'h'),
('nDigitalDACChannel', 'h'),
('nDigitalTrainActiveLogic', 'h'),
('nStatsEnable', 'h'),
('nStatisticsClearStrategy', 'h'),
('nLevelHysteresis', 'h'),
('lTimeHysteresis', 'i'),
('nAllowExternalTags', 'h'),
('nAverageAlgorithm', 'h'),
('fAverageWeighting', 'f'),
('nUndoPromptStrategy', 'h'),
('nTrialTriggerSource', 'h'),
('nStatisticsDisplayStrategy', 'h'),
('nExternalTagType', 'h'),
('nScopeTriggerOut', 'h'),
('nLTPType', 'h'),
('nAlternateDACOutputState', 'h'),
('nAlternateDigitalOutputState', 'h'),
('fCellID', '3f'),
('nDigitizerADCs', 'h'),
('nDigitizerDACs', 'h'),
('nDigitizerTotalDigitalOuts', 'h'),
('nDigitizerSynchDigitalOuts', 'h'),
('nDigitizerType', 'h'),
]
ADCInfoDescription = [
('nADCNum', 'h'),
('nTelegraphEnable', 'h'),
('nTelegraphInstrument', 'h'),
('fTelegraphAdditGain', 'f'),
('fTelegraphFilter', 'f'),
('fTelegraphMembraneCap', 'f'),
('nTelegraphMode', 'h'),
('fTelegraphAccessResistance', 'f'),
('nADCPtoLChannelMap', 'h'),
('nADCSamplingSeq', 'h'),
('fADCProgrammableGain', 'f'),
('fADCDisplayAmplification', 'f'),
('fADCDisplayOffset', 'f'),
('fInstrumentScaleFactor', 'f'),
('fInstrumentOffset', 'f'),
('fSignalGain', 'f'),
('fSignalOffset', 'f'),
('fSignalLowpassFilter', 'f'),
('fSignalHighpassFilter', 'f'),
('nLowpassFilterType', 'b'),
('nHighpassFilterType', 'b'),
('fPostProcessLowpassFilter', 'f'),
('nPostProcessLowpassFilterType', 'c'),
('bEnabledDuringPN', 'b'),
('nStatsChannelPolarity', 'h'),
('lADCChannelNameIndex', 'i'),
('lADCUnitsIndex', 'i'),
]
TagInfoDescription = [
('lTagTime', 'i'),
('sComment', '56s'),
('nTagType', 'h'),
('nVoiceTagNumber_or_AnnotationIndex', 'h'),
]
DACInfoDescription = [
('nDACNum', 'h'),
('nTelegraphDACScaleFactorEnable', 'h'),
('fInstrumentHoldingLevel', 'f'),
('fDACScaleFactor', 'f'),
('fDACHoldingLevel', 'f'),
('fDACCalibrationFactor', 'f'),
('fDACCalibrationOffset', 'f'),
('lDACChannelNameIndex', 'i'),
('lDACChannelUnitsIndex', 'i'),
('lDACFilePtr', 'i'),
('lDACFileNumEpisodes', 'i'),
('nWaveformEnable', 'h'),
('nWaveformSource', 'h'),
('nInterEpisodeLevel', 'h'),
('fDACFileScale', 'f'),
('fDACFileOffset', 'f'),
('lDACFileEpisodeNum', 'i'),
('nDACFileADCNum', 'h'),
('nConditEnable', 'h'),
('lConditNumPulses', 'i'),
('fBaselineDuration', 'f'),
('fBaselineLevel', 'f'),
('fStepDuration', 'f'),
('fStepLevel', 'f'),
('fPostTrainPeriod', 'f'),
('fPostTrainLevel', 'f'),
('nMembTestEnable', 'h'),
('nLeakSubtractType', 'h'),
('nPNPolarity', 'h'),
('fPNHoldingLevel', 'f'),
('nPNNumADCChannels', 'h'),
('nPNPosition', 'h'),
('nPNNumPulses', 'h'),
('fPNSettlingTime', 'f'),
('fPNInterpulse', 'f'),
('nLTPUsageOfDAC', 'h'),
('nLTPPresynapticPulses', 'h'),
('lDACFilePathIndex', 'i'),
('fMembTestPreSettlingTimeMS', 'f'),
('fMembTestPostSettlingTimeMS', 'f'),
('nLeakSubtractADCIndex', 'h'),
('sUnused', '124s'),
]
EpochInfoPerDACDescription = [
('nEpochNum', 'h'),
('nDACNum', 'h'),
('nEpochType', 'h'),
('fEpochInitLevel', 'f'),
('fEpochLevelInc', 'f'),
('lEpochInitDuration', 'i'),
('lEpochDurationInc', 'i'),
('lEpochPulsePeriod', 'i'),
('lEpochPulseWidth', 'i'),
('sUnused', '18s'),
]