This notebook outlines the steps required to perform changepoint analysis on a set of data using the PELT algorithm. The PELT algorithm is included in the NAG Library and we will use NAG's Python Wrappers to access the algorithm.
The particular example examines the Volkswagen stock price downloaded from Yahoo's historic stock prices.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import nag4py.util as util
import nag4py.e01 as e01
import nag4py.g13 as g13
%matplotlib inline
stock_name = 'volkswagen'
data = pd.read_csv('{}.csv'.format(stock_name))
data['Date'] = pd.to_datetime(data['Date'], format="%d/%m/%Y")
data = data.set_index('Date')
The PELT algorithm assumes that the data is univariate (evenly spaced).
The stock prices have gaps for holidays and other reasons.
Therefore, the gaps must be filled, by interpolated data, before performing the change point analysis.
known_dates = data.index.sort_values()
price_type = 'High'
known_prices = getattr(data, price_type).sort_index().values
all_dates = pd.date_range(known_dates[0], known_dates[-1])
all_data = data.reindex(all_dates)
missing_dates = all_dates.difference(known_dates)
def mono_inter(x, y, missing_dates):
"""This function calcuates the missing prices from the known dates, known prices and missing dates.
x is the known dates
y is the known prices
md is the missing dates"""
d = np.zeros(len(x))
fail_int = util.noisy_fail()
e01.nag_monotonic_interpolant(len(x), x, y, d, fail_int)
missing_prices = np.zeros(len(missing_dates))
fail_eva = util.noisy_fail()
e01.nag_monotonic_evaluate(len(x), x, y, d, len(missing_dates), missing_dates, missing_prices, fail_eva)
if fail_eva.iflag != 0:
print(fail_eva.message)
return missing_prices
missing_prices = mono_inter(known_dates.values.astype(float), known_prices, missing_dates.values.astype(float))
missing_prices_list = missing_prices.tolist()
for row in all_data.iterrows():
if pd.isnull(row[1][price_type]):
all_data.loc[row[0], price_type] = missing_prices_list.pop(0)
fig = plt.figure()
fig.set_size_inches(16, 10)
plt.plot(all_data.index, all_data[price_type], 'g', label = 'Volkswagen')
plt.title('Volkswagen Stock Price with Interpolated Values')
plt.legend(loc = 2)
plt.ylabel('Euro '+u"(\u20AC)")
plt.xlabel('Date')
plt.show()
start = 4000
end = 4100
s = np.where(all_dates==known_dates[start])[0][0]
e = np.where(all_dates==known_dates[end])[0][0]
fig = plt.figure()
fig.set_size_inches(16, 10)
plt.plot(all_data.index[s:e], all_data[price_type][s:e], 'gs-', label = 'Interpolated')
plt.plot(known_dates[start:end], known_prices[start:end], 'rs-', label = 'Known Values')
plt.title('Volkswagen Stock Price with Interpolated Values and Known Values')
plt.legend(loc = 2)
plt.ylabel('Euro '+u"(\u20AC)")
plt.xlabel('Date')
plt.show()
dist = util.Nag_TS_ChangeType()
dist_change_point_type_dict = {'Nag_NormalMean': util.Nag_NormalMean,
'Nag_NormalStd': util.Nag_NormalStd,
'Nag_NormalMeanStd': util.Nag_NormalMeanStd,
'Nag_GammaScale': util.Nag_GammaScale,
'Nag_ExponentialLambda': util.Nag_ExponentialLambda,
'Nag_PoissonLambda': util.Nag_PoissonLambda}
dist_change_point_type = 'Nag_NormalMean'
dist = dist_change_point_type_dict[dist_change_point_type]
def penalty_term(dist_change_point_type, time_series_len, penalty_method = 'SIC'):
if dist_change_point_type == 'Nag_NormalMeanStd':
p = 2
else:
p = 1
penalty_method_dict = {'SIC': p * np.log(time_series_len),
'BIC': p * np.log(time_series_len),
'AIC': p * 2,
'Hannan-Quinn': 2 * p * np.log(np.log(time_series_len))}
return penalty_method_dict[penalty_method]
n = len(all_data[price_type])
m = n
std = np.zeros(1, dtype=np.float64)
mean = np.zeros(1, dtype=np.float64)
fail = util.noisy_fail()
g13.nag_tsa_mean_range(len(all_data[price_type]),
np.array(all_data[price_type].values),
m, util.Nag_UseSD, std, mean, fail)
if fail.iflag != 0:
print(fail.message)
beta = penalty_term(dist_change_point_type, n, 'SIC')
print('Pentalty Term', beta)
minss = 2
param = np.ndarray(1)
if dist_change_point_type == 'Nag_NormalMean':
param[0] = std[0]
elif dist_change_point_type == 'Nag_NormalMeanStd':
param[0] = mean[0]
else:
param[0] = 0
ntau = np.zeros(n, dtype=util.nag_int_type)
tau = np.zeros(n, dtype=util.nag_int_type)
sparam = np.zeros(2*n+2)
fail = util.noisy_fail()
g13.nag_tsa_cp_pelt(dist, n, np.array(all_data[price_type].values), beta, minss, param, ntau, tau, sparam, fail)
if fail.iflag != 0:
print(fail.message)
Pentalty Term 8.93115542978
m = ntau[0]
change_points = [0] + tau[0:m-1].tolist()
mean = []
std = []
for i in range(0, m):
mean.append(sparam[2*(i+1)-2])
std.append(sparam[2*(i+1)-1])
#This is to plot the final mean bar
change_points.append(len(all_data[price_type].values)-1)
std.append(std[-1])
mean.append(mean[-1])
fig = plt.figure()
fig.set_size_inches(16, 10)
plt.plot(all_dates, all_data[price_type].values, 'g', label = 'Stock Price - {}'.format(price_type))
for change_point in change_points:
plt.axvline(all_dates[change_point], color = 'r')
plt.axvline(all_dates[change_point], color = 'r', label = 'Changepoints')
plt.step(all_dates[change_points], mean, where='post', label = 'Mean')
plt.title('Volkswagen Stock Price with Interpolated Values and Changepoints')
plt.legend()
plt.ylabel('Euro '+u"(\u20AC)")
plt.xlabel('Date')
plt.show()
fig = plt.figure()
fig.set_size_inches(16, 10)
plt.plot(all_dates[change_points[-3]:], all_data[price_type].values[change_points[-3]:], 'g', label = 'Stock Price - {}'.format(price_type))
for change_point in change_points[-3:]:
plt.axvline(all_dates[change_point], color = 'r')
plt.axvline(all_dates[change_point], color = 'r', label = 'Changepoints')
plt.step(all_dates[change_points][-3:], mean[-3:], where='post', label='Mean')
plt.title('Volkswagen Stock Price with Interpolated Values and Changepoints')
plt.legend()
plt.ylabel('Euro '+u"(\u20AC)")
plt.xlabel('Date')
plt.show()
print("The change point occurs on the {}.".format(all_dates[change_points][-3:][1]))
The change point occurs on the 2015-09-21 00:00:00.