The signal of interest is
\begin{align}
z(t)=A e^{-\frac{(t-t_c)^2}{2b_w^2}}\sin(2\pi f t)
\end{align}
of width $b_w$ and frequency $f$. In this example we will take $A=b_w=f=1$ and set $t_c=0$.
The probability density function (PDF),
In this example, we are interested in estimating the time delay ($\tau$) and linear dispersion ($\omega$) parameters, i.e. $g_p(t) = \omega t - \tau$. Therefore $z_g(t)=z(\omega t - \tau)$ and $s_g(t) = B(z_g)(t)$.
Normalized measured signal with noise $\eta \sim \mathcal{N}(0,\sigma^2)$ is given by, \begin{align} r(t) = B(z_g + \eta)(t) \end{align}
Let, $\alpha = \omega$, $\beta = -\tau$, and $\hat{s}$ and $\hat{r}$ be the CDTs of $s(t)$ and $r(t)$, respectively. The Wasserstein distance between $(g_p^{-1})'r\circ g_p^{-1}$ and $s$ is given by,
Let, $\vec{p} = \left[\begin{array}& \alpha\\ \beta\end{array}\right]$. The estimates are calculated as, \begin{align} \widetilde{\vec{p}}= \left[\begin{array}& \widetilde{\alpha}\\ \widetilde{\beta}\end{array}\right] =\left({\bf X}^T{\bf X}\right)^{-1}{\bf X}^T\hat{s} \end{align} where ${\bf X}\equiv \left[\vec{\hat{r}}, \vec{1}\right]$.
import numpy as np
import matplotlib.pyplot as plt
# To use the CDT first install the PyTransKit package using:
# pip install pytranskit
from pytranskit.optrans.continuous.cdt import CDT
N = 400
dt = 0.025
t = np.linspace(-N/2*dt, (N/2-1)*dt, N)
f = 1
epsilon = 1e-8
# Original signal
gwin = np.exp(-t**2/2)
z = gwin*np.sin(2*np.pi*f*t) + epsilon
s = z**2/np.linalg.norm(z)**2
# zero mean additive Gaussian noise
sigma = 0.1 # standard deviation
SNRdb = 10*np.log10(np.mean(z**2)/sigma**2)
print('SNR: {} dB'.format(SNRdb))
noise = np.random.normal(0,sigma,N)
# Signal after delay and dispersion
omega = 0.8
tau = 10.3*dt
gwin = np.exp(-(omega*t - tau)**2/2)
zg = gwin*np.sin(2*np.pi*f*(omega*t - tau)) + noise + epsilon
r = zg**2/np.linalg.norm(zg)**2
# Plot s and sg
fontSize=14
fig, ax = plt.subplots(1, 2, sharex=True, sharey=True, figsize=(10,5))
ax[0].plot(t, s, 'b-',linewidth=2)
ax[0].set_title('$s(t)$',fontsize=fontSize)
ax[1].plot(t, r, 'r-',linewidth=2)
ax[1].set_title('$r(t)$',fontsize=fontSize)
plt.show()
SNR: 9.47544940682685 dB
Expression of noise-corrected CDF is given by, \begin{align} \tilde{S}_g(t) = \frac{E[R(t)]\{\mathcal{E}_z+\sigma^2(t_N-t_1)\}-\sigma^2(t-t_1)}{\mathcal{E}_z}, \quad t_1\leq t \leq t_N \end{align} where $R(t)$ is the CDF associated with $r(t)$, $\sigma$ is the standard deviation of noise, and $\mathcal{E}_z$ is the total energy of the noise free signal.
# Calculate the noise-corrected CDF
R = np.cumsum(r) # CDF of r(t)
Ez = np.mean(z**2)*(t[-1] - t[0]) # Energy of the signal
Stilde = (R*(Ez+sigma**2*(t[-1]-t[0])) - sigma**2*(t-t[0]))/Ez
# Preserve the non-decreasing property of CDFs
mind = np.argmin(Stilde)
Mind = np.argmax(Stilde)
Stilde[0:mind] = Stilde[mind]
Stilde[Mind:] = Stilde[Mind]
for i in range(len(Stilde)-1):
if Stilde[i+1]<=Stilde[i]:
Stilde[i+1] = Stilde[i] + epsilon
Stilde = (Stilde - np.min(Stilde))/(np.max(Stilde) - np.min(Stilde))
# Plot R and Stilde
fontSize=14
fig, ax = plt.subplots(1, 2, sharex=True, sharey=True, figsize=(10,5))
ax[0].plot(t, R, 'r-',linewidth=2)
ax[0].set_title('$R(t)$',fontsize=fontSize)
ax[1].plot(t, Stilde, 'k--',linewidth=2)
ax[1].set_title('$\widetilde{S}(t)$',fontsize=fontSize)
plt.show()
# Calculate the noise-corrected PDF
sg = Stilde
sg[1:] -= sg[:-1].copy()
sg += epsilon
# Plot R and Stilde
fontSize=14
fig, ax = plt.subplots(1, 2, sharex=True, sharey=True, figsize=(10,5))
ax[0].plot(t, r, 'r-',linewidth=2)
ax[0].set_title('$r(t)$',fontsize=fontSize)
ax[1].plot(t, sg, 'k--',linewidth=2)
ax[1].set_title('$\widetilde{s}_g(t)$',fontsize=fontSize)
plt.show()
# Reference signal
t0 = np.linspace(0, 1, N)
z0= np.ones(t0.size)+epsilon
s0 = z0**2/np.linalg.norm(z0)**2
# Create a CDT object
cdt = CDT()
# Compute the forward transform
s_hat, s_hat_old, xtilde = cdt.forward(t0, s0, t, s, rm_edge=False)
sg_hat, sg_hat_old, xtilde = cdt.forward(t0, s0, t, sg, rm_edge=False)
# remove the edges
s_hat = s_hat[25:N-25]
sg_hat = sg_hat[25:N-25]
xtilde = xtilde[25:N-25]
# Plot s_hat and sg_hat
fontSize=14
fig, ax = plt.subplots(1, 2, sharex=True, sharey=True, figsize=(10,5))
ax[0].plot(xtilde, s_hat, 'b-',linewidth=2)
ax[0].set_title('$\widehat{s}(t)$',fontsize=fontSize)
ax[1].plot(xtilde, sg_hat, 'r-',linewidth=2)
ax[1].set_title('$\widehat{s}_g(t)$',fontsize=fontSize)
plt.show()
X = -1*np.ones([len(s_hat),2])
X[:,0] = np.transpose(sg_hat)
estimates = np.linalg.solve(np.matmul(np.transpose(X),X), np.matmul(np.transpose(X),np.transpose(s_hat)))
print('\nTrue value of time delay: '+str(tau) + ' seconds')
print('Estimated value of time delay: '+str(estimates[1]) + ' seconds\n')
print('True value of linear dispersion: '+str(omega))
print('Estimated value of linear dispersion: '+str(estimates[0]) +'\n')
True value of time delay: 0.2575 seconds Estimated value of time delay: 0.2694287224582905 seconds True value of linear dispersion: 0.8 Estimated value of linear dispersion: 0.8408065181070804