#!/usr/bin/env python # coding: utf-8 # ## Experiment 3 - Filters - Initial tests # Refs [1]: https://stackoverflow.com/questions/39299838/how-do-i-import-module-in-jupyter-notebook-directory-into-notebooks-in-lower-dir # # [2] https://stackoverflow.com/questions/5364050/reloading-submodules-in-ipython # Firstly we will try to figure out a way to import our implemented modules, which is another directory [1]. And for auto reload of our modules [2]. The following method works and will be used here on # In[1]: import os, sys nb_dir = os.path.split(os.getcwd())[0] if nb_dir not in sys.path: sys.path.append(nb_dir) get_ipython().run_line_magic('matplotlib', 'inline') get_ipython().run_line_magic('load_ext', 'autoreload') get_ipython().run_line_magic('autoreload', '2') # In[2]: from directdemod import filters # ## Introduction # One of the most basic filter is a rolling window averaging filter. It takes window size of n, shifts it by 1 and calculates the average of the elements within. In the following example, we create a sinusoidal signal, add noise to it and pass it through a simple rolling window filter. The results are as follows: # In[3]: import matplotlib.pyplot as plt import numpy as np time = np.linspace(0, 3*np.pi, 500) signal = np.sin(time) noisy_signal = signal + 0.5 * np.random.randn(len(time)) plt.plot(time, signal) plt.plot(time, filters.rollingAverage(5).applyOn(noisy_signal)) plt.plot(time, filters.rollingAverage(25).applyOn(noisy_signal)) plt.plot(time, filters.rollingAverage(50).applyOn(noisy_signal)) plt.legend(["Original", "wSize = 5", "wSize = 25", "wSize = 50"]) # ### Observation 1 # A delay is observed in the filtered output, this is true for any filter and this must be kept in mind while using the designed filters. Hence our implementations should have a version by which we can get rid of this delay (phase error) # # ### Observation 2 # The initial conditions of the filter influence the initial outputs, this also must be kept in mind while using the filters # ## Chunking # When processing large signals, we will chunk the signal into several parts and pass them through the filter one after the other. For example let us consider the following array 'in' which is needed to be passed through the rolling filter. If we pass the whole thing we get, # In[4]: r = filters.rollingAverage(n = 2, storeState = False) print(r.applyOn(range(1,20))) # But if we break it down into pieces and pass it through the filter we get, # In[5]: r = filters.rollingAverage(n = 2, storeState = False) print(r.applyOn([1,2,3,4,5,6,7,8,9])) print(r.applyOn([10,11,12,13,14])) print(r.applyOn([15,16,17,18,19])) # Clearly when we combine the outputs, this does not equal the case when the whole thing was passed at once # ### Observation 3 # When the signal is chunked and passed through the filter, due to border effects the output is not the same. Hence a workaround must be implemented to prevent this. # # Solutions # # The above observed problems: # (a) delay or phase shift # (b) chunking issues # (c) initial conditions # # can be solved as follows: # ## A. Zero phase shift # # In case a zero shift is required by the filter, the filter object must be created with 'zeroPhase = True', from this the filter will try to provide zero shift. This is achieved by using scipys 'filtfilt' function rather than than the 'lfilter' # # The advantages: The phase shift will be zero # # The disadvantages: Cannot be used with chunking (i.e. do not process large signals with this), slower (since the filter is run forward and backwared to get zero shift), filter is applied twice # In[6]: time = np.linspace(0, 3*np.pi, 500) signal = np.sin(time) noisy_signal = signal + 0.5 * np.random.randn(len(time)) plt.plot(time, signal) plt.plot(time, filters.rollingAverage(5, zeroPhase = True).applyOn(noisy_signal)) plt.plot(time, filters.rollingAverage(25, zeroPhase = True).applyOn(noisy_signal)) plt.plot(time, filters.rollingAverage(50, zeroPhase = True).applyOn(noisy_signal)) plt.legend(["Original", "wSize = 5", "wSize = 25", "wSize = 50"]) # We observe that there is no phase shift due to the filter now # ## B. Chunking issues # # Chunking issues can be solved by storing the initial conditions in the filter object and updating it for ever chunk. While initializing the object use the flag 'storeState = True' to enable this. The following experiment proves that it works, # In[7]: r = filters.rollingAverage(n = 2, storeState = False) print(r.applyOn(range(1,20))) r = filters.rollingAverage(n = 2, storeState = True) print(r.applyOn([1,2,3,4,5,6,7,8,9])) print(r.applyOn([10,11,12,13,14])) print(r.applyOn([15,16,17,18,19])) # We can clearly observe that the border issues have now been resolved # ## C. Initial output # # The initial output problem can be corrected by creating an initial condition in such a way that the initial output conditions are met. If we want the filter to start at say -6, we pass the flag 'initOut = [-6]', the following experiment shows the result, # In[8]: time = np.linspace(0, 3*np.pi, 500) signal = np.sin(time) noisy_signal = signal + 0.5 * np.random.randn(len(time)) plt.plot(time, signal) plt.plot(time, filters.rollingAverage(5, initOut = [-6]*5).applyOn(noisy_signal)) plt.plot(time, filters.rollingAverage(25, initOut = [-6]*25).applyOn(noisy_signal)) plt.plot(time, filters.rollingAverage(50, initOut = [-6]*50).applyOn(noisy_signal)) plt.legend(["Original", "wSize = 5", "wSize = 25", "wSize = 50"]) # This is how we can set the initial conditions of the filter # ### Conclusions # Hence with this we conclude that we can solve the discussed problems: (a) delay or phase shift (b) chunking issues (c) initial conditions with the solutions implemented