In this notebook we showcase the use of autoencoders for denoising waveform signals, hopefully improving their Signal-to-Noise Ratio (SNR).
An autoencoder is a particular type of neural network which is trained to reproduce its input at the output, but it is forced to do so by learning an encoding of it.
By reducing the number of nodes in the middle layer, the network is forced to perform an efficient dimensionality reduction of the input data (very similar to Principal Component Analysis, but non-linear), thus ignoring the "noise".
The learning process is performed in an unsupervised or, more accurately, self-supervised manner, since the noisy input can be generated by simply adding noise to the expected output.
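To make the bottleneck idea concrete, below is a minimal sketch of such an architecture; the layer sizes here are illustrative assumptions, not the model used later in this notebook.
# Minimal sketch of a bottleneck autoencoder; layer sizes are
# illustrative assumptions, not this notebook's actual model.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

sketch = Sequential([
    Dense(32, input_dim=100, activation='relu'),  # encoder: compress the 100 input samples
    Dense(8, activation='relu'),                  # bottleneck: forces an efficient encoding
    Dense(32, activation='relu'),                 # decoder: expand the encoding back
    Dense(100)                                    # output: the reconstructed waveform
])
sketch.compile(loss='mean_squared_error', optimizer='adam')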
First we are going to generate a training set composed of random waveforms. The waveforms we generate are a composition of two sine waves of random amplitude and frequency.
A noisy version of the waveforms is then generated by simply adding some Gaussian noise.
The aim of the network is to improve (i.e. increase) the Signal-to-Noise Ratio (SNR) of the input signals, which is defined as:
$$\mathrm{SNR} = 10 \log_{10} \left( \frac{\mathrm{RMS}_{\mathrm{signal}}}{\mathrm{RMS}_{\mathrm{noise}}} \right)^{2} \;\mathrm{dB}$$
Where RMS is the root mean square value of the waveform:
$$\mathrm{RMS}_{\mathrm{signal}} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} y_i^2}$$
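As a quick sanity check of the formula (with made-up values): a signal with RMS 1.0 corrupted by noise with RMS 0.1 has an SNR of 20 dB.
# Sanity check with made-up values: RMS_signal = 1.0, RMS_noise = 0.1
# gives 10 * log10((1.0 / 0.1)**2) = 20 dB.
import math
assert abs(10 * math.log10((1.0 / 0.1) ** 2) - 20.0) < 1e-9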
Some examples of the training set are plotted below.
%matplotlib inline
import numpy as np
import random
import math
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping
SIGNAL_LENGTH = 100   # number of samples per waveform
BATCH_SIZE = 10000    # number of waveforms in the training set
NOISE_FACTOR = 10     # scale of the additive Gaussian noise
def genSinusoid(amplitude, frequency, offset, length):
    # Sample a sine wave of the given amplitude, frequency and phase
    # offset over `length` points.
    return amplitude * np.sin(
        np.linspace(
            length / 15.0 * frequency * 0.0 * math.pi + offset,
            length / 15.0 * frequency * 3.0 * math.pi + offset,
            length
        ) / (length / 5)
    )
def genRndSinusoid(length):
    # Sum two sine waves with random amplitude, frequency and phase.
    rndOffset = random.random() * 2 * math.pi
    rndFreq = (random.random() - 0.5) / 1.5 * 15 + 0.5
    rndAmp = random.random() + 0.1
    sinusoid = genSinusoid(rndAmp, rndFreq, rndOffset, length)
    rndOffset = random.random() * 2 * math.pi
    rndFreq = (random.random() - 0.5) / 1.5 * 15 + 0.5
    rndAmp = random.random() * 1.2
    return sinusoid + genSinusoid(rndAmp, rndFreq, rndOffset, length)
def genNoise(sinusoid, noiseAmount=1):
    # Corrupt the waveform with additive Gaussian noise.
    return sinusoid + np.random.normal(scale=noiseAmount / 10, size=sinusoid.shape[0])

def computeRMS(signal):
    return np.sqrt(np.mean(signal ** 2))

def computeSNR(signal, noise):
    # SNR in dB: 10*log10 of the signal-to-noise power ratio.
    return 10 * math.log10((computeRMS(signal) / computeRMS(noise)) ** 2)
def genSinusoidBatch(length, batchSize, noiseAmount=1):
    # Generate `batchSize` random waveforms together with their noisy versions.
    sins = []
    noisySins = []
    for _ in range(batchSize):
        sin = genRndSinusoid(length)
        sins.append(sin)
        noisySins.append(genNoise(sin, noiseAmount))
    return np.array([sins, noisySins])
sins, noisySins = genSinusoidBatch(SIGNAL_LENGTH, BATCH_SIZE, NOISE_FACTOR)
for x in range(5):
    s = sins[x]
    ns = noisySins[x]
    plt.figure()
    plt.plot(s, label='Signal (RMS=%.3f)' % computeRMS(s))
    plt.plot(ns, label='Noisy signal (RMS=%.3f)' % computeRMS(ns))
    plt.legend()
    plt.title('SNR = %.3f dB' % computeSNR(s, ns - s))
We then proceed to construct the network.
For convenience, input data are sequences of a fixed number of samples of a waveform. Longer waveforms can be split into chunks before being submitted to the network, as sketched below.
The input layer and the output layer must thus have the same number of neurons, corresponding to the number of samples in the training set waveforms.
The hidden layer, conversely, must be composed of very few neurons, so that the network learns an efficient encoding of the meaningful features of the input data rather than merely copying it.
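As a minimal sketch of the chunking mentioned above (the real-dataset section later uses a generator-based variant), assuming the waveform length is an exact multiple of the chunk size; `splitIntoChunks` is a hypothetical helper, not part of this notebook's pipeline.
# Hypothetical helper: split a 1-D waveform into consecutive fixed-size
# chunks (assumes len(waveform) is a multiple of chunkSize).
import numpy as np

def splitIntoChunks(waveform, chunkSize):
    return waveform.reshape(-1, chunkSize)

# e.g. a 1000-sample waveform becomes 10 chunks of 100 samples each
assert splitIntoChunks(np.zeros(1000), 100).shape == (10, 100)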
The Mean Square Error (MSE) is chosen as the loss function for the network.
MSE is the mean of the squared differences between the prediction ($\hat{y}$) and the expected output ($y$). We aim to minimize the MSE.
$$\mathrm{MSE} = \frac{1}{n} \sum_{i=1}^{n} (\hat{y}_i - y_i)^2$$
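As a quick illustration with made-up numbers:
# Made-up example: predictions [1, 2, 3] against targets [1, 2, 5]
# give MSE = (0 + 0 + 4) / 3.
import numpy as np
y_hat = np.array([1.0, 2.0, 3.0])
y = np.array([1.0, 2.0, 5.0])
assert np.isclose(np.mean((y_hat - y) ** 2), 4.0 / 3.0)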
As for the optimization algorithm, Adam is empirically chosen over more classical methods such as Stochastic Gradient Descent.
An early stopping strategy is also adopted, based on the values of the loss function.
model = Sequential()
model.add(Dense(12, input_dim=SIGNAL_LENGTH, activation='relu'))  # bottleneck layer
model.add(Dense(SIGNAL_LENGTH))                                   # reconstruction layer
model.compile(loss='mean_squared_error', optimizer='adam')
model.summary()
monitor = EarlyStopping(monitor='loss', min_delta=1e-3, patience=5,
                        verbose=1, mode='auto', restore_best_weights=True)
model.fit(noisySins, sins, callbacks=[monitor], verbose=1, epochs=50)
Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= dense (Dense) (None, 12) 1212 _________________________________________________________________ dense_1 (Dense) (None, 100) 1300 ================================================================= Total params: 2,512 Trainable params: 2,512 Non-trainable params: 0 _________________________________________________________________ Epoch 1/50 313/313 [==============================] - 0s 1ms/step - loss: 0.3483 Epoch 2/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0965 Epoch 3/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0624 Epoch 4/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0544 Epoch 5/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0515 Epoch 6/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0504 Epoch 7/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0499 Epoch 8/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0496 Epoch 9/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0495 Epoch 10/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0493 Epoch 11/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0493 Epoch 12/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0492 Epoch 13/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0491 Epoch 14/50 313/313 [==============================] - 0s 1ms/step - loss: 0.0491 Epoch 15/50 284/313 [==========================>...] - ETA: 0s - loss: 0.0490Restoring model weights from the end of the best epoch. 313/313 [==============================] - 0s 1ms/step - loss: 0.0490 Epoch 00015: early stopping
<tensorflow.python.keras.callbacks.History at 0x7f56042677b8>
For evaluating the results, a new test set of waveforms is randomly generated. Noisy signals are submitted to the network, and their SNR is compared with the SNR of the network's output.
Both the input and output signals' SNR are computed with respect to the original, noiseless signal.
The average gain (i.e. the increase in SNR) is computed across the batch of waveforms.
A sample of the output data is presented below, compared with the original signals.
testSet = genSinusoidBatch(SIGNAL_LENGTH, BATCH_SIZE, NOISE_FACTOR)
sins = testSet[0]
noisySins = testSet[1]
predictions = model.predict(noisySins)
def previewResults(sins, noisySins, predictions, n):
    # Plot `n` randomly chosen waveforms along with their noisy and
    # denoised versions, annotating each figure with the SNR change.
    for _ in range(n):
        x = random.randrange(sins.shape[0])
        s = sins[x]
        ns = noisySins[x]
        p = predictions[x]
        plt.figure()
        plt.plot(ns, label='distorted signal')
        plt.plot(s, label='original signal')
        plt.plot(p, label='predicted signal')
        plt.legend()
        initSNR = computeSNR(s, ns - s)
        SNR = computeSNR(s, p - s)
        plt.title('SNR went from %.3f dB to %.3f dB' % (initSNR, SNR))
def computeImprovement(sins, noisySins, predictions, batchSize=BATCH_SIZE):
    # Average SNR gain (output SNR minus input SNR) across the batch.
    improvedSNRs = []
    for x in range(batchSize):
        s = sins[x]
        ns = noisySins[x]
        p = predictions[x]
        initSNR = computeSNR(s, ns - s)
        SNR = computeSNR(s, p - s)
        improvedSNRs.append(SNR - initSNR)
    return np.average(np.array(improvedSNRs))
print("Average SNR improvement: %.3f dB" % computeImprovement(sins, noisySins, predictions))
previewResults(sins, noisySins, predictions, 5)
Average SNR improvement: 13.742 dB
We now proceed to retrain and test our network on a real dataset.
The dataset is a subset of DEEPSIG RADIOML 2018.01A available here:
#!wget https://github.com/alessandromartignano/RADIOML-2018.01A-SNR30/raw/master/DEEPSIG_2018_SNR30.hdf5.tar.gz
#!tar zxvf DEEPSIG_2018_SNR30.hdf5.tar.gz
import os.path
import requests
import tarfile

archiveName = 'DEEPSIG_2018_SNR30.hdf5.tar.gz'
datasetName = 'DEEPSIG_2018_SNR30.hdf5'
datasetUrl = 'https://github.com/alessandromartignano/RADIOML-2018.01A-SNR30/raw/master/DEEPSIG_2018_SNR30.hdf5.tar.gz'

if not os.path.isfile(archiveName):
    print('Dataset not found. Downloading dataset...')
    r = requests.get(datasetUrl)
    with open(archiveName, 'wb') as f:
        f.write(r.content)

if not os.path.isfile(datasetName):
    print('Extracting dataset...')
    with tarfile.open(archiveName) as tar:
        tar.extractall()
Dataset not found. Downloading dataset... Extracting dataset...
from sklearn.model_selection import train_test_split
import h5py

with h5py.File("DEEPSIG_2018_SNR30.hdf5", "r") as f:
    data = f['dataset'][:]

train_set, test_set = train_test_split(data, test_size=0.3)
def chunks(l, n):
    # Yield successive n-sized chunks from l.
    for i in range(0, len(l), n):
        yield l[i:i + n]

def decompose(dataset):
    # Split each record into chunks of 100 samples (keeping the first 10,
    # i.e. 1000 samples per record) and pair each chunk with a noisy copy.
    signals = []
    noisySignals = []
    for x in dataset:
        for chunk in list(chunks(x, 100))[:10]:
            noisySignals.append(genNoise(chunk, NOISE_FACTOR))
            signals.append(chunk)
    return (np.array(signals), np.array(noisySignals))
signals, noisySignals = decompose(train_set)
model.fit(noisySignals, signals, callbacks=[monitor], verbose=1, epochs=50)
Epoch 1/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1898
Epoch 2/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1383
Epoch 3/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1315
Epoch 4/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1315
Epoch 5/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1311
Epoch 6/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1308
Epoch 7/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1310
Epoch 8/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1302
Epoch 9/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1302
Epoch 10/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1304
Epoch 11/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1301
Epoch 12/50
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1301
Epoch 13/50
5356/5376 [============================>.] - ETA: 0s - loss: 0.1302
Restoring model weights from the end of the best epoch.
5376/5376 [==============================] - 7s 1ms/step - loss: 0.1302
Epoch 00013: early stopping
<tensorflow.python.keras.callbacks.History at 0x7f55f8e0e1d0>
signals, noisySignals = decompose(test_set)
predictions = model.predict(noisySignals)
def recompose(signals):
    # Reassemble the 100-sample chunks into the original 1000-sample records.
    return signals.reshape((signals.shape[0] // 10, 1000))

signals = recompose(signals)
noisySignals = recompose(noisySignals)
predictions = recompose(predictions)
print("Average SNR improvement: %.3f dB" % computeImprovement(signals, noisySignals, predictions, signals.shape[0]))
previewResults(signals, noisySignals, predictions, 5)
plt.show()
Average SNR improvement: 9.160 dB