Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import numpy as np
- from numpy.fft import fft, ifft, ifftshift
- from ssqueezepy.utils import padsignal, process_scales
- from ssqueezepy.algos import find_closest, indexed_sum
- from ssqueezepy.visuals import imshow, plot
- from ssqueezepy import Wavelet, ssq_cwt
- #%%# Helpers #################################################################
- def _t(min, max, N):
- return np.linspace(min, max, N, endpoint=1)
- def cos_f(freqs, N=128, phi=0):
- return np.concatenate([np.cos(2 * np.pi * f * (_t(i, i + 1, N) + phi))
- for i, f in enumerate(freqs)])
- #%%# Define signal & wavelet #################################################
- fs = 32768 // 4 + 1
- x = cos_f([320], fs) + cos_f([640], fs)
- x /= 2
- wavelet = Wavelet(('morlet', {'mu': 4}))
- #%%# CWT #####################################################################
- nv=32; dt=1/fs
- n = len(x) # store original length
- x, nup, n1, n2 = padsignal(x, padtype='reflect')
- x -= x.mean()
- xh = fft(x)
- scales = process_scales(scales='log:maximal', len_x=n, wavelet=wavelet, nv=nv)
- pn = (-1)**np.arange(nup)
- N_orig = wavelet.N
- wavelet.N = nup
- #%%# cwt ####
- Psih = (wavelet(scale=scales, nohalf=False)).astype('complex128')
- dPsih = (1j * wavelet.xi / dt) * Psih
- Wx = ifftshift(ifft(pn * Psih * xh, axis=-1), axes=-1)
- dWx = ifftshift(ifft(pn * dPsih * xh, axis=-1), axes=-1)
- #%%#
- wavelet.N = N_orig
- # shorten to pre-padded size
- Wx = Wx[:, n1:n1 + n]
- dWx = dWx[:, n1:n1 + n]
- #%%# Phase transform #########################################################
- w = np.imag(dWx / Wx) / (2*np.pi)
- #%%
- # clean up tiny-valued Wx that have large `w` values; removing these makes
- # no noticeable difference on `Tx` but allows us to see much better
- w[np.abs(Wx) < np.abs(Wx).mean()*.5] = 0
- #%%# Reassignment frequencies (mapkind='maximal') ############################
- na, N = Wx.shape
- dT = dt * N
- # normalized frequencies to map discrete-domain to physical:
- # f[[cycles/samples]] -> f[[cycles/second]]
- # minimum measurable (fundamental) frequency of data
- fm = 1 / dT
- # maximum measurable (Nyquist) frequency of data
- fM = 1 / (2 * dt)
- ssq_freqs = fm * np.power(fM / fm, np.arange(na) / (na - 1))
- #%%# Reassignment indices
- # This step simply finds the index-equivalent of `w`. E.g., for given
- # `ssq_freqs` ranging from 2 to 48, if w[5, 2] == 5, then
- # `k[5, 2] = np.where(ssq_freqs == 5)` (or if no exact match, then closest to 5)
- # `k` thus ranges from 0 to `len(ssq_freqs) - 1`.
- k = find_closest(np.log2(w), np.log2(ssq_freqs))
- #%%# Synchrosqueeze #########################################################
- Tx = indexed_sum(Wx * np.log(2) / nv, k)
- Tx = np.flipud(Tx) # flip for visual aligned with `Wx`
- #%%# Visualize ##############################################################
- kw = dict(abs=1, cmap='jet', show=1, aspect='auto')
- imshow(Wx, title="abs(CWT)", **kw)
- imshow(Tx, title="abs(SSQ_CWT)", **kw)
- #%%# Zoom ####
- a, b = 65, 155
- c, d = 1000, 1400
- sidxs, tidxs = np.arange(Wx.shape[0]), np.arange(Wx.shape[1])
- tkw = dict(xticks=tidxs[c:d], yticks=sidxs[a:b])
- imshow(Wx[a:b, c:d], title="abs(CWT), zoomed", **kw, **tkw)
- imshow(Tx[a:b, c:d], title="abs(SSQ_CWT), zoomed", **kw, **tkw)
- #%%# Repeat for `w` ####
- imshow(w, title="Phase transform", **kw)
- imshow(w[a:b, c:d], title="Phase transform, zoomed", **kw, **tkw)
- #%%# Show specific row from wavy parts of zoomed
- plot(w[80, c:d], title="w[80, 1000:1400]", xticks=tkw['xticks'], show=1)
- # We thus confirm a sinusoidal reassignment pattern; but what causes it?
- # `w` is `dWx` divided by `Wx`, so let's look to row 80
- #%%# To compare shapes as opposed to raw magnitudes, `dWx` must be rescaled
- r = np.abs(Wx[80, c:d])
- dr = np.abs(dWx[80, c:d])
- dr *= (np.abs(r).max() / np.abs(dr).max())
- plot(dr, title="abs(dWx), abs(Wx), [80, 1000:1400]")
- plot(r, show=1)
- # We thus observe that magnitudes of `r100` and `dr100` are sines
- # of same frequency, but different amplitudes and offsets (means)
- # The ratio of such sines is another sine - thus the reassignment pattern
- # But WHY are they of same frequency but different amplitudes and offsets?
- # and why are magnitudes sinusoidal at all?
- #%%
- g, h = Wx[80, c:d], dWx[80, c:d]
- plot(g, title="Wx[80, 1000:1400]", complex=1, show=1)
- plot(h, title="dWx[80, 1000:1400]", complex=1, show=1)
- plot(h / g, title="(dWx / Wx)[80, 1000:1400]", complex=1, show=1)
- #%%# Design simpler case to illustrate
- def csoid(f, N): # cos + i*sin
- return cos_f([f], N) + 1j * cos_f([f], N, phi=-1/4/f)
- def csoid90(f, N): # -sin + i*cos = cos(+pi/2) + i*sin(+pi/2)
- return cos_f([f], N, phi=+1/4/f) + 1j * cos_f([f], N)
- def viz_csoids(f1, f2, N, ar, abs_only=0):
- h1 = csoid(f1, N)
- h2 = csoid(f2, N)
- h = h1 / ar + h2
- if not abs_only:
- plot(h, complex=1, show=1,
- title="csoid: (f1=%s) + (f1=%s) -- |f2| / |f1| = %s" % (f1, f2, ar))
- plot(h, title="|(f1) + (f2)|", abs=1, ylims=(0, None), show=1)
- viz_csoids(4, 8, 128, 64)
- viz_csoids(4, 8, 128, 8)
- viz_csoids(4, 8, 128, 2)
- viz_csoids(4, 8, 128, 1)
- #%%
- h1 = csoid(4, 128)
- h2 = csoid(8, 128)
- _kw = dict(show=1,
- vlines=([i for i in np.arange(128, step=16)],
- dict(color='k', linestyle='--')))
- plot(np.abs(h1 / 32 + h2))
- plot(np.abs(h1 / 1 + h2), title="|f2| / |f1| = 32 / 1", ylims=(0, None), **_kw)
- #%%
- plot(np.abs(h1 / (32*8) + h2))
- plot(np.abs(h1 / 8 + h2), title="|f2| / |f1| = 256 / 8", **_kw)
- #%%
- _=[plot(w[80 + 4*i, c:d], show=0) for i in range(6)]
- plot([], title="w[80+i, 1000:1400], i=[0,4,...,16]", show=1)
- #%%
- _=[plot(np.abs(w[a-20:b, 1000 + 1*i]), show=0) for i in range(12)]
- plot([], title="w[45:155, 1000 + i], i=[0,1,...,11]", show=1)
- # Here we see peak amplitudes of earlier sinusoids not changing much toward
- # lower scales (in agreement with previous plot), then leveling in an S-shape,
- # indicative of regions where wavelet is more centered between the two
- #%%
- def _t(min, max, N):
- return np.linspace(min, max, N, endpoint=False)
- _kw = dict(color='tab:orange', linestyle='--', xlims=(-3, 130), show=1)
- g = cos_f([2], 128)
- h = cos_f([32], 128)
- plot(np.abs(g) * h)
- plot(np.abs(g), title="|cos(f=2)| * cos(f=32)", **_kw)
- plot((1 + g/5) * h)
- plot(1 + g/5, title="|1 + .2*cos(f=2)| * cos(f=32)", **_kw)
- #%%
- def plot_wavxh_overlap(sidx, b, Psih, name):
- xha = xh.real.max()
- psih = Psih[sidx]
- psih = psih.real if name == 'psih' else psih.imag
- color = 'purple' if name == 'psih' else 'tab:green'
- scale = float(scales[sidx])
- plot(xh.real[:b])
- plot(psih[:b] * (xha / psih.max()), color=color, show=1,
- title="xh, %s (rescaled), [:%d], scale=%.2f" % (name, b, scale))
- plot((xh * psih).real[:b], show=1,
- title="xh * %s, [:%d], scale=%.2f" % (name, b, scale))
- def plot_wav_overlap(sidx, b, Psih, dPsih):
- psih = Psih[sidx].real
- dpsih = dPsih[sidx].imag
- scale = float(scales[sidx])
- plot(psih[:b] * (1 / psih.max()), color='purple')
- plot(dpsih[:b] * (1 / dpsih.max()), color='tab:green', show=1,
- title="psih, dpsih, [:2000], rescaled, scale=%.2f" % scale)
- xha = xh.real.max()
- psih = Psih[80].real
- dpsih = dPsih[80].imag
- #%%
- plot_wavxh_overlap(80, 8000, Psih, 'psih')
- plot_wavxh_overlap(80, 8000, dPsih, 'dpsih')
- #%%
- plot_wavxh_overlap(115, 8000, Psih, 'psih')
- plot_wavxh_overlap(115, 8000, dPsih, 'dpsih')
- #%%
- plot_wav_overlap(80, 2000, Psih, dPsih)
- plot_wav_overlap(115, 2000, Psih, dPsih)
- #%%
- plot(xh.real[:2000])
- plot(psih[:2000] * (xha / psih.max()), color='purple', show=1,
- title="psih (rescaled), xh, [:2000], scale=%.2f" % float(scales[80]))
- plot((xh * psih).real[:2000], show=1, title="psih * xh, [:2000]")
- #%%
- plot(xh.real[:2000])
- plot(dpsih[:2000] * (xha / dpsih.max()), color='purple', show=1,
- title="dpsih (rescaled), xh, [:2000], scale=%.2f" % float(scales[80]))
- plot((xh * dpsih).real[:2000], show=1, title="dpsih * xh, [:2000]")
- #%%
- def w_sim(f1, f2, A, B, C, D, N=128, lp=31, rp=32):
- num = (A * csoid90(f1, N) + B * csoid90(f2, N)) / N
- den = (C * csoid( f1, N) + D * csoid( f2, N)) / N
- _w = num / den
- return _w.imag[lp:-rp] / (2 * np.pi)
- def ptimag(f1, f2, A, B, C, D, N=128, lp=31, rp=32):
- num = cos_f([f1 - f2], N) * (A*D + B*C) + (A*C + B*D)
- den = cos_f([f1 - f2], N) * (2*C*D) + C**2 + D**2
- return (num / den)[lp:-rp] / (2 * np.pi)
- #%%
- psih = Psih[80].real
- dpsih = dPsih[80].imag
- out = xh * dpsih
- f1, f2 = np.where(out > 10)[0][:2]
- A, B = out[f1].real, out[f2].real
- out = xh * psih
- C, D = out[f1].real, out[f2].real
- lp = (len(psih) - fs) // 2 + 1
- rp = lp - 1
- N = len(dpsih)
- _args = (f1, f2, A, B, C, D, N, lp, rp)
- _w = w_sim(*_args)
- _w2 = ptimag(*_args)
- print(np.mean(np.abs(_w - _w2)))
- #%%
- plot(w[80][:200])
- plot(-.5*(_w.max() - _w.min()) * cos_f([320], len(w[80]))[:200] + 1.003*_w.mean(),
- show=1, title="w[80][:200] vs sinusoid at same frequency")
- #%%
- xi = wavelet.xifn(scale=1, N=2048)[:1024]
- plot(xi, Wavelet(('morlet', {'mu': 5}) )(scale=3.02, N=2048)[:1024])
- plot(xi, Wavelet(('morlet', {'mu': 20}))(scale=12.1, N=2048)[:1024],
- title="mu: (5, 20); scale: (3.02, 12.1) | Morlet wavelet", show=1)
- #%%# Double-sinusoidal ssq## #################################################
- fs = 32768 // 4 + 1
- x = cos_f([320*10.6], fs) + cos_f([320*12], fs)
- x /= 2
- wavelet = Wavelet(('morlet', {'mu': 12.5}))
- #%%
- Tx, _, Wx, _, w = ssq_cwt(x, wavelet)
- Tx = np.flipud(Tx)
- kw = dict(abs=1, cmap='jet', show=1, aspect='auto')
- w[np.isinf(w) | np.isnan(w)] = 0
- w[np.where(np.abs(Wx) < np.abs(Wx).mean())] = 0
- imshow(Wx[:60, :200],
- title="(N, f1, f2, mu) = (8193, 3392, 3840, 12.5) | zoomed", **kw)
- imshow(Tx[:60, :200], **kw)
- imshow(w[ :60, :200], **kw)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement