In the previous lectures, we learned that a time series can be described in terms of trend, seasonality, and noise. In this lecture, we look at smoothing techniques and at the simple forecasts they provide. Let's start by setting up the environment.
import sys
# Install dependencies if the notebook is running in Colab
if 'google.colab' in sys.modules:
    !pip install -U -qq tsa-course
# Imports
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import requests
from io import BytesIO
from tsa_course.lecture2 import run_sequence_plot
np.random.seed(0) # reproducibility
We imported the `run_sequence_plot` function to visualize our data.
# Generate stationary data
time = np.arange(100)
stationary = np.random.normal(loc=0, scale=1.0, size=len(time))
run_sequence_plot(time, stationary, title="Stationary time series");
# find mean of series
stationary_time_series_avg = np.mean(stationary)
# create array composed of mean value and equal to length of time array
sts_avg = np.full(shape=len(time), fill_value=stationary_time_series_avg, dtype='float')
ax = run_sequence_plot(time, stationary, title="Stationary Data")
ax.plot(time, sts_avg, 'tab:red', label="mean")
plt.legend();
Example: let's write a function that computes the Mean Squared Error (MSE) between observed and estimated values.
def mse(observations, estimates):
    """
    INPUT:
    observations - numpy array of observed values
    estimates - numpy array of estimated values
    OUTPUT:
    Mean Squared Error value
    """
    # check argument types
    assert isinstance(observations, np.ndarray), "'observations' must be a numpy array"
    assert isinstance(estimates, np.ndarray), "'estimates' must be a numpy array"
    # check that the arrays have equal length
    assert len(observations) == len(estimates), "Arrays must be of equal length"
    # calculations
    difference = observations - estimates
    sq_diff = difference ** 2
    return np.mean(sq_diff)
Let's test the `mse` function.
error_1 = mse(np.array([0, 1, 3, 2]), np.array([1, 1, 2, 4]))
print(error_1)
1.5
error_2 = mse(np.array([0, 1, 3, 2]), np.array([0, 0, 1, 0]))
print(error_2)
2.25
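As a quick sanity check: for the first pair the differences are -1, 0, 1, -2, so the squared differences are 1, 0, 1, 4 and their mean is 1.5; for the second pair the squared differences are 0, 1, 4, 4, with mean 2.25.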
trend = (time * 2.0) + stationary
run_sequence_plot(time, trend, title="Time series with trend");
# find mean of series
trend_time_series_avg = np.mean(trend)
# create array of mean value equal to length of time array
avg_trend = np.full(shape=len(time), fill_value=trend_time_series_avg, dtype='float')
run_sequence_plot(time, trend, title="Time series with trend")
plt.plot(time, avg_trend, 'tab:red', label="mean")
plt.legend();
def moving_average(observations, window=3, forecast=False):
    # Cumulative-sum trick: the sum of each window is the difference of two cumulative sums
    cumulative_sum = np.cumsum(observations, dtype=float)
    cumulative_sum[window:] = cumulative_sum[window:] - cumulative_sum[:-window]
    ma = cumulative_sum[window - 1:] / window
    if forecast:
        # Shift the MA forward by one window so each value acts as a 1-step-ahead forecast
        observations = np.append(observations, np.nan)
        ma_forecast = np.insert(ma, 0, np.nan*np.ones(window))
        return observations, ma_forecast
    else:
        return ma
MA_trend = moving_average(trend, window=3)
print(f"MSE:\n--------\nsimple average: {mse(trend, avg_trend):.2f}\nmoving_average: {mse(trend[2:], MA_trend):.2f}")
MSE:
--------
simple average: 3324.01
moving_average: 4.57
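The global mean is a poor estimate for a trending series, since the series keeps drifting away from it; the moving average instead tracks the local level and therefore achieves a much lower MSE.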
run_sequence_plot(time, trend, title="Time series with trend")
plt.plot(time[1:-1], MA_trend, 'tab:red', label="MA")
plt.legend();
x = np.arange(20)
print(len(moving_average(x, window=9)))
12
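In general, the moving average has `len(x) - window + 1` elements, because the first full window is only available after `window - 1` observations: here `20 - 9 + 1 = 12`.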
seasonality = 10 + np.sin(time) * 10 + stationary
MA_seasonality = moving_average(seasonality, window=3)
run_sequence_plot(time, seasonality, title="Seasonality")
plt.plot(time[1:-1], MA_seasonality, 'tab:red', label="MA")
plt.legend(loc='upper left');
trend_seasonality = trend + seasonality + stationary
MA_trend_seasonality = moving_average(trend_seasonality, window=3)
run_sequence_plot(time, trend_seasonality, title="Trend, seasonality, and noise")
plt.plot(time[1:-1], MA_trend_seasonality, 'tab:red', label="MA")
plt.legend(loc='upper left');
📝Note: a moving-average forecast is just the mean of the most recent window, so it lags behind a series that keeps growing. The next example shows this on nonlinear data.
x = np.array([1, 2, 4, 8, 16, 32, 64])
ma_x, ma_forecast = moving_average(x, window=3, forecast=True)
t = np.arange(len(ma_x))
run_sequence_plot(t[:-4], ma_x[:-4], title="Nonlinear data")
plt.plot(t[-5:], ma_x[-5:], 'k', label="True values", linestyle='--')
plt.plot(t, ma_forecast, 'tab:red', label="MA forecast", linestyle='--')
plt.legend(loc='upper left');
noisy_noise = np.random.normal(loc=0, scale=8.0, size=len(time))
noisy_trend = time * 2.75
noisy_seasonality = 10 + np.sin(time * 0.25) * 20
noisy_data = noisy_trend + noisy_seasonality + noisy_noise
run_sequence_plot(time, noisy_data, title="Noisy data");
# Compute MA with different window sizes
lag_2 = moving_average(noisy_data, window=3)
lag_3 = moving_average(noisy_data, window=5)
lag_5 = moving_average(noisy_data, window=9)
lag_10 = moving_average(noisy_data, window=19)
_, axes = plt.subplots(2,2, figsize=(11,6))
axes[0,0] = run_sequence_plot(time, noisy_data, title="MA with lag 2", ax=axes[0,0])
axes[0,0].plot(time[1:-1], lag_2, color='tab:blue', linewidth=2.5)
axes[0,1] = run_sequence_plot(time, noisy_data, title="MA with lag 3", ax=axes[0,1])
axes[0,1].plot(time[2:-2], lag_3, color='tab:orange', linewidth=2.5)
axes[1,0] = run_sequence_plot(time, noisy_data, title="MA with lag 5", ax=axes[1,0])
axes[1,0].plot(time[4:-4], lag_5, color='tab:green', linewidth=2.5)
axes[1,1] = run_sequence_plot(time, noisy_data, title="MA with lag 10", ax=axes[1,1])
axes[1,1].plot(time[9:-9], lag_10, color='tab:red', linewidth=2.5)
plt.tight_layout();
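A larger window gives a smoother curve but also flattens genuine structure. One way to quantify this trade-off (an illustrative check added here, not part of the lecture code) is to compare each moving average with the noise-free signal `noisy_trend + noisy_seasonality` it is trying to recover:
# Illustrative comparison (added here): MSE between each centered MA and the noise-free signal.
# A window of size w drops (w-1)//2 points at each end, so the signal is trimmed accordingly.
signal = noisy_trend + noisy_seasonality
for label, ma, half in [("window 3", lag_2, 1), ("window 5", lag_3, 2),
                        ("window 9", lag_5, 4), ("window 19", lag_10, 9)]:
    print(f"{label}: MSE = {mse(signal[half:-half], ma):.2f}")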
def weighted_moving_average(observations, weights, forecast=False):
    if len(weights) != len(observations[0:len(weights)]):
        raise ValueError("Length of weights must match the window size")
    # Normalize weights
    weights = np.array(weights) / np.sum(weights)
    # Initialize the result array
    result = np.empty(len(observations)-len(weights)//2)
    result[:] = np.nan
    # Calculate weighted moving average
    for i in range(len(weights)//2, len(result)):
        window = observations[i-(len(weights)//2):i+len(weights)//2+1]
        result[i] = np.dot(window, weights)
    # Handle forecast option
    if forecast:
        result = np.insert(result, 0, np.nan*np.ones(len(weights)//2+1))
        observations = np.append(observations, np.nan*np.ones(len(weights)//2))
        return observations, result
    else:
        return result
For example, we can use the weights `[0.160, 0.294, 0.543]`, which give more importance to the most recent observations.
weights = np.array([0.160, 0.294, 0.543])
wma_x, wma_forecast = weighted_moving_average(x, weights, forecast=True)
t = np.arange(len(wma_x))
run_sequence_plot(t[:-4], ma_x[:-4], title="Nonlinear data")
plt.plot(t[-5:], ma_x[-5:], 'k', label="True values", linestyle='--')
plt.plot(t, ma_forecast, 'tab:red', label="MA forecast", linestyle='--')
plt.plot(t, wma_forecast, 'tab:blue', label="WMA forecast", linestyle='--')
plt.legend(loc='upper left');
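The weights above grow toward the most recent observation. A minimal sketch of how such weights could be generated, assuming a geometric decay (the `decay` value below is an arbitrary choice, not taken from the lecture):
def geometric_weights(window, decay=0.55):
    # Weights proportional to decay**k, ordered oldest to newest and normalized to sum to 1.
    # (Sketch added for illustration; the decay rate is an assumption.)
    w = decay ** np.arange(window - 1, -1, -1)
    return w / w.sum()

print(geometric_weights(3))  # the largest weight is on the most recent observation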
There are three key exponential smoothing techniques:
| Type | Capture trend | Capture seasonality |
|---|---|---|
| Single Exponential Smoothing | ❌ | ❌ |
| Double Exponential Smoothing | ✅ | ❌ |
| Triple Exponential Smoothing | ✅ | ✅ |
The single exponential smoothing recursion is $S(t) = \alpha X(t) + (1-\alpha)S(t-1)$, with $S(0) = \alpha X(0)$. Expanding the formula we get:

$$
\begin{aligned}
S(0) &= \alpha X(0)\\
S(1) &= \alpha X(1) + (1-\alpha)S(0) = \alpha X(1) + \alpha(1-\alpha)X(0)\\
S(2) &= \alpha X(2) + (1-\alpha)S(1) = \alpha X(2) + (1-\alpha)\left[\alpha X(1) + \alpha(1-\alpha)X(0)\right]\\
     &= \alpha X(2) + \alpha(1-\alpha)X(1) + \alpha(1-\alpha)^2 X(0)
\end{aligned}
$$

The formula for a generic time step $\tau$ is:

$$
S(\tau) = \sum_{t=0}^{\tau} \alpha(1-\alpha)^{\tau - t} X(t)
$$
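To make the recursion concrete, here is a minimal hand-rolled version of single exponential smoothing (the function name and the value of `alpha` are our own choices, not part of the lecture code):
def simple_exp_smoothing(observations, alpha=0.5):
    # Minimal sketch: S(t) = alpha*X(t) + (1 - alpha)*S(t-1), with S(0) = alpha*X(0) as above.
    smoothed = np.zeros(len(observations))
    smoothed[0] = alpha * observations[0]
    for t in range(1, len(observations)):
        smoothed[t] = alpha * observations[t] + (1 - alpha) * smoothed[t - 1]
    return smoothed

# Example: smooth the noisy series from the previous section.
smoothed_noisy = simple_exp_smoothing(noisy_data, alpha=0.3)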
`statsmodels` implements all three exponential smoothing variants. There are several additional details that we will not derive here:

- Initialization
- Forecasts beyond 1-step ahead
- Additive seasonality model
- Multiplicative seasonality model

Instead, we will use `statsmodels` to do the heavy lifting for us.
# Train/test split
train = trend_seasonality[:-5]
test = trend_seasonality[-5:]
# find mean of series
trend_seasonal_avg = np.mean(trend_seasonality)
# create array of mean value equal to length of time array
simple_avg_preds = np.full(shape=len(test), fill_value=trend_seasonal_avg, dtype='float')
# mse
simple_mse = mse(test, simple_avg_preds)
# results
print("Predictions: ", simple_avg_preds)
print("MSE: ", simple_mse)
Predictions: [109.21734351 109.21734351 109.21734351 109.21734351 109.21734351]
MSE: 9549.121874364564
ax = run_sequence_plot(time[:-5], train, title=f"Simple average - MSE: {simple_mse:.2f}")
ax.plot(time[-5:], test, color='tab:blue', linestyle="--", label="test")
ax.plot(time[-5:], simple_avg_preds, color='tab:orange', linestyle="--", label="preds");
from statsmodels.tsa.api import SimpleExpSmoothing
single = SimpleExpSmoothing(train).fit(optimized=True)
single_preds = single.forecast(len(test))
single_mse = mse(test, single_preds)
print("Predictions: ", single_preds)
print("MSE: ", single_mse)
Predictions: [196.61657923 196.61657923 196.61657923 196.61657923 196.61657923]
MSE: 136.2446911548019
ax = run_sequence_plot(time[:-5], train, title=f"Simple exponential smoothing - MSE: {single_mse:.2f}")
ax.plot(time[-5:], test, color='tab:blue', linestyle="--", label="test")
ax.plot(time[-5:], single_preds, color='tab:orange', linestyle="--", label="preds");
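Simple exponential smoothing has no trend or seasonal component: every forecast beyond the end of the training data equals the last smoothed level, which is why the five predictions above are identical.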
from statsmodels.tsa.api import Holt
double = Holt(train).fit(optimized=True)
double_preds = double.forecast(len(test))
double_mse = mse(test, double_preds)
print("Predictions: ", double_preds)
print("MSE: ", double_mse)
Predictions: [198.62600614 200.63543303 202.64485991 204.6542868 206.66371368]
MSE: 82.95678016291609
ax = run_sequence_plot(time[:-5], train, title=f"Double exponential smoothing - MSE: {double_mse:.2f}")
ax.plot(time[-5:], test, color='tab:blue', linestyle="--", label="test")
ax.plot(time[-5:], double_preds, color='tab:orange', linestyle="--", label="preds");
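Double exponential smoothing extrapolates the last estimated level and slope, so the h-step-ahead forecast is level + h × slope. The predictions above therefore increase by a constant step (about 2.01 per period) but still ignore the seasonal pattern.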
from statsmodels.tsa.api import ExponentialSmoothing
triple = ExponentialSmoothing(train,
trend="additive",
seasonal="additive",
seasonal_periods=13).fit(optimized=True)
triple_preds = triple.forecast(len(test))
triple_mse = mse(test, triple_preds)
print("Predictions: ", triple_preds)
print("MSE: ", triple_mse)
Predictions: [204.468726 207.03255819 215.74668895 209.88430558 202.24064983]
MSE: 28.93848905442322
ax = run_sequence_plot(time[:-5], train, title=f"Triple exponential smoothing - MSE: {triple_mse:.2f}")
ax.plot(time[-5:], test, color='tab:blue', linestyle="--", label="test")
ax.plot(time[-5:], triple_preds, color='tab:orange', linestyle="--", label="preds");
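The list above also mentioned a multiplicative seasonality model, in which the seasonal effect scales with the level of the series. A sketch of that variant (not part of the lecture code; it requires a strictly positive series, which is the case for this training set):
# Variant added for illustration: multiplicative instead of additive seasonality.
triple_mul = ExponentialSmoothing(train,
                                  trend="additive",
                                  seasonal="multiplicative",
                                  seasonal_periods=13).fit(optimized=True)
triple_mul_preds = triple_mul.forecast(len(test))
print("MSE: ", mse(test, triple_mul_preds))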
data_dict = {'MSE':[simple_mse, single_mse, double_mse, triple_mse]}
df = pd.DataFrame(data_dict, index=['simple', 'single', 'double', 'triple'])
print(df)
              MSE
simple  9549.121874
single   136.244691
double    82.956780
triple    28.938489
In this lecture we learned how to smooth a time series with simple averages, moving averages, weighted moving averages, and single, double, and triple exponential smoothing, and how to compare these techniques with the MSE.
Load the following two time series.
# Load the first time series
response = requests.get("https://zenodo.org/records/10897398/files/smoothing_ts1.npy?download=1")
response.raise_for_status()
smoothing_ts1 = np.load(BytesIO(response.content))
print(len(smoothing_ts1))
# Load the second time series
response = requests.get("https://zenodo.org/records/10897398/files/smoothing_ts4_4.npy?download=1")
response.raise_for_status()
smoothing_ts2 = np.load(BytesIO(response.content))
print(len(smoothing_ts2))
144
1000
Using what you learned in this and in the previous lectures, do the following.

- Create two time arrays, `mytime1` and `mytime2`, that start at 0 and are as long as each dataset.