Source code for wonambi.trans.extern.dpss

"""create dpss using nitime code. Scipy v1.1 will have dpss
"""
import numpy as np
import scipy.linalg as linalg
import scipy.interpolate as interpolate
import scipy.fftpack as fftpack

[docs]def dpss_windows(N, NW, Kmax, interp_from=None, interp_kind='linear'): """ Returns the Discrete Prolate Spheroidal Sequences of orders [0,Kmax-1] for a given frequency-spacing multiple NW and sequence length N. Parameters ---------- N : int sequence length NW : float, unitless standardized half bandwidth corresponding to 2NW = BW/f0 = BW*N*dt but with dt taken as 1 Kmax : int number of DPSS windows to return is Kmax (orders 0 through Kmax-1) interp_from : int (optional) The dpss can be calculated using interpolation from a set of dpss with the same NW and Kmax, but shorter N. This is the length of this shorter set of dpss windows. interp_kind : str (optional) This input variable is passed to scipy.interpolate.interp1d and specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', 'quadratic, 'cubic') or as an integer specifying the order of the spline interpolator to use. Returns ------- v, e : tuple, v is an array of DPSS windows shaped (Kmax, N) e are the eigenvalues Notes ----- Tridiagonal form of DPSS calculation from: Slepian, D. Prolate spheroidal wave functions, Fourier analysis, and uncertainty V: The discrete case. Bell System Technical Journal, Volume 57 (1978), 1371430 """ Kmax = int(Kmax) W = float(NW) / N nidx = np.arange(N, dtype='d') # In this case, we create the dpss windows of the smaller size # (interp_from) and then interpolate to the larger size (N) if interp_from is not None: if interp_from > N: e_s = 'In dpss_windows, interp_from is: %s ' % interp_from e_s += 'and N is: %s. ' % N e_s += 'Please enter interp_from smaller than N.' raise ValueError(e_s) dpss = [] d, e = dpss_windows(interp_from, NW, Kmax) for this_d in d: x = np.arange(this_d.shape[-1]) I = interpolate.interp1d(x, this_d, kind=interp_kind) d_temp = I(np.linspace(0, this_d.shape[-1] - 1, N, endpoint=False)) # Rescale: d_temp = d_temp / np.sqrt(np.sum(d_temp ** 2)) dpss.append(d_temp) dpss = np.array(dpss) else: # here we want to set up an optimization problem to find a sequence # whose energy is maximally concentrated within band [-W,W]. # Thus, the measure lambda(T,W) is the ratio between the energy within # that band, and the total energy. This leads to the eigen-system # (A - (l1)I)v = 0, where the eigenvector corresponding to the largest # eigenvalue is the sequence with maximally concentrated energy. The # collection of eigenvectors of this system are called Slepian # sequences, or discrete prolate spheroidal sequences (DPSS). Only the # first K, K = 2NW/dt orders of DPSS will exhibit good spectral # concentration # [see http://en.wikipedia.org/wiki/Spectral_concentration_problem] # Here I set up an alternative symmetric tri-diagonal eigenvalue # problem such that # (B - (l2)I)v = 0, and v are our DPSS (but eigenvalues l2 != l1) # the main diagonal = ([N-1-2*t]/2)**2 cos(2PIW), t=[0,1,2,...,N-1] # and the first off-diagonal = t(N-t)/2, t=[1,2,...,N-1] # [see Percival and Walden, 1993] diagonal = ((N - 1 - 2 * nidx) / 2.) ** 2 * np.cos(2 * np.pi * W) off_diag = np.zeros_like(nidx) off_diag[:-1] = nidx[1:] * (N - nidx[1:]) / 2. # put the diagonals in LAPACK "packed" storage ab = np.zeros((2, N), 'd') ab[1] = diagonal ab[0, 1:] = off_diag[:-1] # only calculate the highest Kmax eigenvalues w = linalg.eigvals_banded(ab, select='i', select_range=(N - Kmax, N - 1)) w = w[::-1] # find the corresponding eigenvectors via inverse iteration t = np.linspace(0, np.pi, N) dpss = np.zeros((Kmax, N), 'd') for k in range(Kmax): dpss[k] = tridi_inverse_iteration( diagonal, off_diag, w[k], x0=np.sin((k + 1) * t) ) # By convention (Percival and Walden, 1993 pg 379) # * symmetric tapers (k=0,2,4,...) should have a positive average. # * antisymmetric tapers should begin with a positive lobe fix_symmetric = (dpss[0::2].sum(axis=1) < 0) for i, f in enumerate(fix_symmetric): if f: dpss[2 * i] *= -1 # rather than test the sign of one point, test the sign of the # linear slope up to the first (largest) peak pk = np.argmax(np.abs(dpss[1::2, :N//2]), axis=1) for i, p in enumerate(pk): if np.sum(dpss[2 * i + 1, :p]) < 0: dpss[2 * i + 1] *= -1 # Now find the eigenvalues of the original spectral concentration problem # Use the autocorr sequence technique from Percival and Walden, 1993 pg 390 dpss_rxx = autocorr(dpss) * N r = 4 * W * np.sinc(2 * W * nidx) r[0] = 2 * W eigvals = np.dot(dpss_rxx, r) return dpss, eigvals
[docs]def tridi_inverse_iteration(d, e, w, x0=None, rtol=1e-8): """Perform an inverse iteration to find the eigenvector corresponding to the given eigenvalue in a symmetric tridiagonal system. Parameters ---------- d : ndarray main diagonal of the tridiagonal system e : ndarray offdiagonal stored in e[:-1] w : float eigenvalue of the eigenvector x0 : ndarray initial point to start the iteration rtol : float tolerance for the norm of the difference of iterates Returns ------- e : ndarray The converged eigenvector """ eig_diag = d - w if x0 is None: x0 = np.random.randn(len(d)) x_prev = np.zeros_like(x0) norm_x = np.linalg.norm(x0) # the eigenvector is unique up to sign change, so iterate # until || |x^(n)| - |x^(n-1)| ||^2 < rtol x0 /= norm_x while np.linalg.norm(np.abs(x0) - np.abs(x_prev)) > rtol: x_prev = x0.copy() tridisolve(eig_diag, e, x0) norm_x = np.linalg.norm(x0) x0 /= norm_x return x0
[docs]def tridisolve(d, e, b, overwrite_b=True): """ Symmetric tridiagonal system solver, from Golub and Van Loan, Matrix Computations pg 157 Parameters ---------- d : ndarray main diagonal stored in d[:] e : ndarray superdiagonal stored in e[:-1] b : ndarray RHS vector Returns ------- x : ndarray Solution to Ax = b (if overwrite_b is False). Otherwise solution is stored in previous RHS vector b """ N = len(b) # work vectors dw = d.copy() ew = e.copy() if overwrite_b: x = b else: x = b.copy() for k in range(1, N): # e^(k-1) = e(k-1) / d(k-1) # d(k) = d(k) - e^(k-1)e(k-1) / d(k-1) t = ew[k - 1] ew[k - 1] = t / dw[k - 1] dw[k] = dw[k] - t * ew[k - 1] for k in range(1, N): x[k] = x[k] - ew[k - 1] * x[k - 1] x[N - 1] = x[N - 1] / dw[N - 1] for k in range(N - 2, -1, -1): x[k] = x[k] / dw[k] - ew[k] * x[k + 1] if not overwrite_b: return x
[docs]def autocorr(x, **kwargs): """Returns the autocorrelation of signal s at all lags. Parameters ---------- x : ndarray axis : time axis all_lags : {True/False} whether to return all nonzero lags, or to clip the length of r_xy to be the length of x and y. If False, then the zero lag correlation is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2 Notes ----- Adheres to the definition .. math:: R_{xx}[k]=E\{X[n+k]X^{*}[n]\} where X is a discrete, stationary (ergodic) random process """ # do same computation as autocovariance, # but without subtracting the mean kwargs['debias'] = False return autocov(x, **kwargs)
[docs]def autocov(x, **kwargs): """Returns the autocovariance of signal s at all lags. Parameters ---------- x : ndarray axis : time axis all_lags : {True/False} whether to return all nonzero lags, or to clip the length of r_xy to be the length of x and y. If False, then the zero lag correlation is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2 Returns ------- cxx : ndarray The autocovariance function Notes ----- Adheres to the definition .. math:: C_{xx}[k]=E\{(X[n+k]-E\{X\})(X[n]-E\{X\})^{*}\} where X is a discrete, stationary (ergodic) random process """ # only remove the mean once, if needed debias = kwargs.pop('debias', True) axis = kwargs.get('axis', -1) if debias: x = remove_bias(x, axis) kwargs['debias'] = False return crosscov(x, x, **kwargs)
[docs]def crosscov(x, y, axis=-1, all_lags=False, debias=True, normalize=True): """Returns the crosscovariance sequence between two ndarrays. This is performed by calling fftconvolve on x, y[::-1] Parameters ---------- x : ndarray y : ndarray axis : time axis all_lags : {True/False} whether to return all nonzero lags, or to clip the length of s_xy to be the length of x and y. If False, then the zero lag covariance is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2 debias : {True/False} Always removes an estimate of the mean along the axis, unless told not to (eg X and Y are known zero-mean) Returns ------- cxy : ndarray The crosscovariance function Notes ----- cross covariance of processes x and y is defined as .. math:: C_{xy}[k]=E\{(X(n+k)-E\{X\})(Y(n)-E\{Y\})^{*}\} where X and Y are discrete, stationary (or ergodic) random processes Also note that this routine is the workhorse for all auto/cross/cov/corr functions. """ if x.shape[axis] != y.shape[axis]: raise ValueError( 'crosscov() only works on same-length sequences for now' ) if debias: x = remove_bias(x, axis) y = remove_bias(y, axis) slicing = [slice(d) for d in x.shape] slicing[axis] = slice(None, None, -1) cxy = fftconvolve(x, y[tuple(slicing)].conj(), axis=axis, mode='full') N = x.shape[axis] if normalize: cxy /= N if all_lags: return cxy slicing[axis] = slice(N - 1, 2 * N - 1) return cxy[tuple(slicing)]
[docs]def fftconvolve(in1, in2, mode="full", axis=None): """ Convolve two N-dimensional arrays using FFT. See convolve. This is a fix of scipy.signal.fftconvolve, adding an axis argument and importing locally the stuff only needed for this function """ s1 = np.array(in1.shape) s2 = np.array(in2.shape) complex_result = (np.issubdtype(in1.dtype, np.complexfloating) or np.issubdtype(in2.dtype, np.complexfloating)) if axis is None: size = s1 + s2 - 1 fslice = tuple([slice(0, int(sz)) for sz in size]) else: equal_shapes = s1 == s2 # allow equal_shapes[axis] to be False equal_shapes[axis] = True assert equal_shapes.all(), 'Shape mismatch on non-convolving axes' size = s1[axis] + s2[axis] - 1 fslice = [slice(l) for l in s1] fslice[axis] = slice(0, int(size)) fslice = tuple(fslice) # Always use 2**n-sized FFT fsize = 2 ** int(np.ceil(np.log2(size))) if axis is None: IN1 = fftpack.fftn(in1, fsize) IN1 *= fftpack.fftn(in2, fsize) ret = fftpack.ifftn(IN1)[fslice].copy() else: IN1 = fftpack.fft(in1, fsize, axis=axis) IN1 *= fftpack.fft(in2, fsize, axis=axis) ret = fftpack.ifft(IN1, axis=axis)[fslice].copy() del IN1 if not complex_result: ret = ret.real if mode == "full": return ret elif mode == "same": if np.product(s1, axis=0) > np.product(s2, axis=0): osize = s1 else: osize = s2 return signaltools._centered(ret, osize) elif mode == "valid": return signaltools._centered(ret, abs(s2 - s1) + 1)