"""create dpss using nitime code. Scipy v1.1 will have dpss
"""
import numpy as np
import scipy.linalg as linalg
import scipy.interpolate as interpolate
import scipy.fftpack as fftpack

def dpss_windows(N, NW, Kmax, interp_from=None, interp_kind='linear'):
    """
    Returns the Discrete Prolate Spheroidal Sequences of orders [0, Kmax-1]
    for a given frequency-spacing multiple NW and sequence length N.

    Parameters
    ----------
    N : int
        Sequence length.
    NW : float, unitless
        Standardized half bandwidth corresponding to 2NW = BW/f0 = BW*N*dt
        but with dt taken as 1.
    Kmax : int
        Number of DPSS windows to return (orders 0 through Kmax-1).
    interp_from : int (optional)
        The dpss can be calculated using interpolation from a set of dpss
        with the same NW and Kmax, but shorter N. This is the length of that
        shorter set of dpss windows.
    interp_kind : str (optional)
        This input variable is passed to scipy.interpolate.interp1d and
        specifies the kind of interpolation as a string ('linear', 'nearest',
        'zero', 'slinear', 'quadratic', 'cubic') or as an integer specifying
        the order of the spline interpolator to use.

    Returns
    -------
    v, e : tuple
        v is an array of DPSS windows shaped (Kmax, N).
        e are the eigenvalues.

    Notes
    -----
    Tridiagonal form of DPSS calculation from:
    Slepian, D. Prolate spheroidal wave functions, Fourier analysis, and
    uncertainty V: The discrete case. Bell System Technical Journal,
    Volume 57 (1978), 1371-1430.
    """
    Kmax = int(Kmax)
    W = float(NW) / N
    nidx = np.arange(N, dtype='d')

    # In this case, we create the dpss windows of the smaller size
    # (interp_from) and then interpolate to the larger size (N)
    if interp_from is not None:
        if interp_from > N:
            e_s = 'In dpss_windows, interp_from is: %s ' % interp_from
            e_s += 'and N is: %s. ' % N
            e_s += 'Please enter interp_from smaller than N.'
            raise ValueError(e_s)
        dpss = []
        d, e = dpss_windows(interp_from, NW, Kmax)
        for this_d in d:
            x = np.arange(this_d.shape[-1])
            I = interpolate.interp1d(x, this_d, kind=interp_kind)
            d_temp = I(np.linspace(0, this_d.shape[-1] - 1, N,
                                   endpoint=False))
            # Rescale:
            d_temp = d_temp / np.sqrt(np.sum(d_temp ** 2))
            dpss.append(d_temp)
        dpss = np.array(dpss)
    else:
        # here we want to set up an optimization problem to find a sequence
        # whose energy is maximally concentrated within band [-W,W].
        # Thus, the measure lambda(T,W) is the ratio between the energy within
        # that band, and the total energy. This leads to the eigen-system
        # (A - (l1)I)v = 0, where the eigenvector corresponding to the largest
        # eigenvalue is the sequence with maximally concentrated energy. The
        # collection of eigenvectors of this system are called Slepian
        # sequences, or discrete prolate spheroidal sequences (DPSS). Only the
        # first K, K = 2NW/dt orders of DPSS will exhibit good spectral
        # concentration
        # [see http://en.wikipedia.org/wiki/Spectral_concentration_problem]

        # Here I set up an alternative symmetric tri-diagonal eigenvalue
        # problem such that
        # (B - (l2)I)v = 0, and v are our DPSS (but eigenvalues l2 != l1)
        # the main diagonal = ([N-1-2*t]/2)**2 cos(2PIW), t=[0,1,2,...,N-1]
        # and the first off-diagonal = t(N-t)/2, t=[1,2,...,N-1]
        # [see Percival and Walden, 1993]
        diagonal = ((N - 1 - 2 * nidx) / 2.) ** 2 * np.cos(2 * np.pi * W)
        off_diag = np.zeros_like(nidx)
        off_diag[:-1] = nidx[1:] * (N - nidx[1:]) / 2.
        # put the diagonals in LAPACK "packed" storage
        ab = np.zeros((2, N), 'd')
        ab[1] = diagonal
        ab[0, 1:] = off_diag[:-1]
        # only calculate the highest Kmax eigenvalues
        w = linalg.eigvals_banded(ab, select='i',
                                  select_range=(N - Kmax, N - 1))
        w = w[::-1]

        # find the corresponding eigenvectors via inverse iteration
        t = np.linspace(0, np.pi, N)
        dpss = np.zeros((Kmax, N), 'd')
        for k in range(Kmax):
            dpss[k] = tridi_inverse_iteration(diagonal, off_diag, w[k],
                                              x0=np.sin((k + 1) * t))
    # By convention (Percival and Walden, 1993 pg 379)
    # * symmetric tapers (k=0,2,4,...) should have a positive average.
    # * antisymmetric tapers should begin with a positive lobe
    fix_symmetric = (dpss[0::2].sum(axis=1) < 0)
    for i, f in enumerate(fix_symmetric):
        if f:
            dpss[2 * i] *= -1
    # rather than test the sign of one point, test the sign of the
    # linear slope up to the first (largest) peak
    pk = np.argmax(np.abs(dpss[1::2, :N // 2]), axis=1)
    for i, p in enumerate(pk):
        if np.sum(dpss[2 * i + 1, :p]) < 0:
            dpss[2 * i + 1] *= -1

    # Now find the eigenvalues of the original spectral concentration problem
    # Use the autocorr sequence technique from Percival and Walden, 1993 pg 390
    dpss_rxx = autocorr(dpss) * N
    r = 4 * W * np.sinc(2 * W * nidx)
    r[0] = 2 * W
    eigvals = np.dot(dpss_rxx, r)

    return dpss, eigvals
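
# Usage sketch (not part of the original module): build a few tapers and
# sanity-check the properties guaranteed by the construction above. The
# helper name _example_dpss_usage is ours, chosen for illustration.
def _example_dpss_usage():
    """Generate 5 DPSS tapers of length 1000 with NW=4 and check basics."""
    tapers, concentrations = dpss_windows(1000, 4., 5)
    assert tapers.shape == (5, 1000)
    # each taper is normalized to unit energy by the inverse iteration
    assert np.allclose(np.sum(tapers ** 2, axis=1), 1.)
    # eigenvalues are in-band energy concentrations; for orders well below
    # 2*NW they should be very close to 1
    assert np.all(concentrations > 0.99)
    return tapers, concentrations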

def tridi_inverse_iteration(d, e, w, x0=None, rtol=1e-8):
    """Perform an inverse iteration to find the eigenvector corresponding
    to the given eigenvalue in a symmetric tridiagonal system.

    Parameters
    ----------
    d : ndarray
        main diagonal of the tridiagonal system
    e : ndarray
        off-diagonal stored in e[:-1]
    w : float
        eigenvalue of the eigenvector
    x0 : ndarray
        initial point to start the iteration
    rtol : float
        tolerance for the norm of the difference of iterates

    Returns
    -------
    x : ndarray
        The converged eigenvector
    """
    eig_diag = d - w
    if x0 is None:
        x0 = np.random.randn(len(d))
    x_prev = np.zeros_like(x0)
    norm_x = np.linalg.norm(x0)
    # the eigenvector is unique up to sign change, so iterate
    # until || |x^(n)| - |x^(n-1)| || < rtol
    x0 /= norm_x
    while np.linalg.norm(np.abs(x0) - np.abs(x_prev)) > rtol:
        x_prev = x0.copy()
        tridisolve(eig_diag, e, x0)
        norm_x = np.linalg.norm(x0)
        x0 /= norm_x
    return x0
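
# Verification sketch (ours, not from the original module): check the inverse
# iteration above against numpy's dense symmetric eigensolver on a small
# random tridiagonal matrix. The helper name is illustrative.
def _check_tridi_inverse_iteration(n=50, seed=0):
    rng = np.random.RandomState(seed)
    d = rng.randn(n)
    e = np.zeros(n)
    e[:-1] = rng.randn(n - 1)          # off-diagonal, stored in e[:-1]
    A = np.diag(d) + np.diag(e[:-1], 1) + np.diag(e[:-1], -1)
    evals = np.linalg.eigvalsh(A)
    # recover the eigenvector of the largest eigenvalue by inverse iteration
    v = tridi_inverse_iteration(d, e, evals[-1])
    # v should be a unit-norm eigenvector with eigenvalue ~ evals[-1]
    assert np.allclose(np.linalg.norm(v), 1.)
    assert np.allclose(A.dot(v), evals[-1] * v, atol=1e-5)
    return v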

def tridisolve(d, e, b, overwrite_b=True):
    """
    Symmetric tridiagonal system solver,
    from Golub and Van Loan, Matrix Computations pg 157.

    Parameters
    ----------
    d : ndarray
        main diagonal stored in d[:]
    e : ndarray
        superdiagonal stored in e[:-1]
    b : ndarray
        RHS vector
    overwrite_b : bool, optional
        If True (default), the solution overwrites b in place.

    Returns
    -------
    x : ndarray
        Solution to Ax = b (if overwrite_b is False). Otherwise the solution
        is stored in the previous RHS vector b.
    """
    N = len(b)
    # work vectors
    dw = d.copy()
    ew = e.copy()
    if overwrite_b:
        x = b
    else:
        x = b.copy()
    for k in range(1, N):
        # e^(k-1) = e(k-1) / d(k-1)
        # d(k) = d(k) - e^(k-1)e(k-1) / d(k-1)
        t = ew[k - 1]
        ew[k - 1] = t / dw[k - 1]
        dw[k] = dw[k] - t * ew[k - 1]
    for k in range(1, N):
        x[k] = x[k] - ew[k - 1] * x[k - 1]
    x[N - 1] = x[N - 1] / dw[N - 1]
    for k in range(N - 2, -1, -1):
        x[k] = x[k] / dw[k] - ew[k] * x[k + 1]
    if not overwrite_b:
        return x

def autocorr(x, **kwargs):
    r"""Returns the autocorrelation of signal x at all lags.

    Parameters
    ----------
    x : ndarray
    axis : time axis
    all_lags : {True/False}
        whether to return all nonzero lags, or to clip the length of r_xy
        to be the length of x and y. If False, then the zero lag correlation
        is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2

    Notes
    -----
    Adheres to the definition

    .. math::

        R_{xx}[k] = E\{X[n+k]X^{*}[n]\}

    where X is a discrete, stationary (ergodic) random process
    """
    # do same computation as autocovariance,
    # but without subtracting the mean
    kwargs['debias'] = False
    return autocov(x, **kwargs)

def autocov(x, **kwargs):
    r"""Returns the autocovariance of signal x at all lags.

    Parameters
    ----------
    x : ndarray
    axis : time axis
    all_lags : {True/False}
        whether to return all nonzero lags, or to clip the length of r_xy
        to be the length of x and y. If False, then the zero lag correlation
        is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2

    Returns
    -------
    cxx : ndarray
        The autocovariance function

    Notes
    -----
    Adheres to the definition

    .. math::

        C_{xx}[k] = E\{(X[n+k] - E\{X\})(X[n] - E\{X\})^{*}\}

    where X is a discrete, stationary (ergodic) random process
    """
    # only remove the mean once, if needed
    debias = kwargs.pop('debias', True)
    axis = kwargs.get('axis', -1)
    if debias:
        x = remove_bias(x, axis)
    kwargs['debias'] = False
    return crosscov(x, x, **kwargs)
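
# Usage sketch (ours): the zero-lag autocorrelation equals the mean power of
# the signal, illustrated here on white noise.
def _example_autocorr():
    rng = np.random.RandomState(2)
    sig = rng.randn(1024)
    rxx = autocorr(sig)
    # zero lag sits at index 0 when all_lags is False (the default)
    assert np.allclose(rxx[0], np.mean(sig ** 2))
    return rxx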

def crosscov(x, y, axis=-1, all_lags=False, debias=True, normalize=True):
    r"""Returns the crosscovariance sequence between two ndarrays.
    This is performed by calling fftconvolve on x, y[::-1].

    Parameters
    ----------
    x : ndarray
    y : ndarray
    axis : time axis
    all_lags : {True/False}
        whether to return all nonzero lags, or to clip the length of s_xy
        to be the length of x and y. If False, then the zero lag covariance
        is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2
    debias : {True/False}
        Always removes an estimate of the mean along the axis, unless
        told not to (e.g. X and Y are known zero-mean)
    normalize : {True/False}
        If True, divide the covariance sequence by the number of samples

    Returns
    -------
    cxy : ndarray
        The crosscovariance function

    Notes
    -----
    The cross covariance of processes x and y is defined as

    .. math::

        C_{xy}[k] = E\{(X(n+k) - E\{X\})(Y(n) - E\{Y\})^{*}\}

    where X and Y are discrete, stationary (or ergodic) random processes.

    Also note that this routine is the workhorse for all auto/cross/cov/corr
    functions.
    """
    if x.shape[axis] != y.shape[axis]:
        raise ValueError(
            'crosscov() only works on same-length sequences for now'
        )
    if debias:
        x = remove_bias(x, axis)
        y = remove_bias(y, axis)
    slicing = [slice(d) for d in x.shape]
    slicing[axis] = slice(None, None, -1)
    cxy = fftconvolve(x, y[tuple(slicing)].conj(), axis=axis, mode='full')
    N = x.shape[axis]
    if normalize:
        cxy /= N
    if all_lags:
        return cxy
    slicing[axis] = slice(N - 1, 2 * N - 1)
    return cxy[tuple(slicing)]
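
# autocov and crosscov above call remove_bias, which is not included in this
# excerpt. A minimal sketch, assuming it only subtracts an estimate of the
# mean along the given axis (as in nitime); treat it as a placeholder.
def remove_bias(x, axis):
    """Subtracts an estimate of the mean from signal x at axis."""
    padded_slice = [slice(d) for d in x.shape]
    padded_slice[axis] = np.newaxis
    mn = np.mean(x, axis=axis)
    return x - mn[tuple(padded_slice)]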

def fftconvolve(in1, in2, mode="full", axis=None):
    """Convolve two N-dimensional arrays using FFT. See convolve.

    This is a fix of scipy.signal.fftconvolve, adding an axis argument and
    importing locally the stuff only needed for this function.
    """
    # local import, only needed for the 'same' and 'valid' modes
    from scipy.signal import signaltools

    s1 = np.array(in1.shape)
    s2 = np.array(in2.shape)
    complex_result = (np.issubdtype(in1.dtype, np.complexfloating) or
                      np.issubdtype(in2.dtype, np.complexfloating))
    if axis is None:
        size = s1 + s2 - 1
        fslice = tuple([slice(0, int(sz)) for sz in size])
    else:
        equal_shapes = s1 == s2
        # allow equal_shapes[axis] to be False
        equal_shapes[axis] = True
        assert equal_shapes.all(), 'Shape mismatch on non-convolving axes'
        size = s1[axis] + s2[axis] - 1
        fslice = [slice(sz) for sz in s1]
        fslice[axis] = slice(0, int(size))
        fslice = tuple(fslice)
    # Always use 2**n-sized FFT(s); size is an array when axis is None
    fsize = 2 ** np.ceil(np.log2(size)).astype(int)
    if axis is None:
        IN1 = fftpack.fftn(in1, fsize)
        IN1 *= fftpack.fftn(in2, fsize)
        ret = fftpack.ifftn(IN1)[fslice].copy()
    else:
        IN1 = fftpack.fft(in1, fsize, axis=axis)
        IN1 *= fftpack.fft(in2, fsize, axis=axis)
        ret = fftpack.ifft(IN1, axis=axis)[fslice].copy()
    del IN1
    if not complex_result:
        ret = ret.real
    if mode == "full":
        return ret
    elif mode == "same":
        if np.prod(s1) > np.prod(s2):
            osize = s1
        else:
            osize = s2
        return signaltools._centered(ret, osize)
    elif mode == "valid":
        return signaltools._centered(ret, abs(s2 - s1) + 1)