# Importing the libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
plt.style.use('ggplot')
import scipy.stats
class simpleAnomalyDetector():
    r"""Flag one cell of a 2-D sample as anomalous using an n-sigma rule.

    ``fit`` takes a stack of 2-D samples (days x rows x columns), flattens
    every sample into one row of ``self.df`` and learns, for each cell, the
    mean and standard deviation across the stack.  ``predict`` compares a
    single cell of a new sample against the statistics of that same cell.

    Attributes set by ``fit``:
        df: DataFrame with one row per day and one column per cell; cell
            ``(row, col)`` is stored in column ``row * n_columns + col``.
        means: column-wise mean of ``df``.
        stds: column-wise standard deviation of ``df`` (``DataFrame.std``
            defaults, i.e. the sample standard deviation).

    USAGE
        calorieIntake = np.array([
            [[100,20000],[300,400],[500,500]],
            [[200,200],[200,200],[500,600]],
            [[100,0],[100,0],[500,600]],
            [[1000,0],[1000,0],[2000,0]],
            [[100,100],[100,100],[500,500]],
            [[200,200],[200,200],[500,500]],
            [[400,300],[200,100],[500,500]]
        ])
        print("\n\nToy data: calorieIntake")
        print(calorieIntake)

        # Learn model parameters
        model = simpleAnomalyDetector()
        model.fit(calorieIntake)

        # Show data
        print("\n\n# Learn Model Parameters \n# Show Data")
        print(model.df)

        # Create a test case
        print("\n\n# Create a test case")
        x_test = np.array([[1000,400],[200,200],[500,500]])
        print(x_test)

        # For a given time r and event c, show the test value
        r, c = 0,1
        val = x_test[r, c]
        print("\n\n# For a given time r = {} and event c = {}, show test value".format(r,c))
        print(val)

        # Previously known data on that time r and event c
        feature_size = x_test.shape[1]
        idx = r*feature_size + c
        print("\n\n# Previously known data on that time r = {} and event c= {}".format(r,c))
        print("# Look column {} of the dataframe".format(idx))
        print(model.df[idx])

        ## Normal
        x_test[r,c] = 500
        print("\n\nPrediction for {}".format(x_test[r,c]))
        print(model.predict(test_matrix = x_test, row = r, col= c))

        ## Normal (still within 3 standard deviations of the mean)
        x_test[r,c] = 5000
        print("\n\nPrediction for {}".format(x_test[r,c]))
        print(model.predict(test_matrix = x_test, row = r, col= c))

        ## Anomaly
        x_test[r,c] = 50000
        print("\n\nPrediction for {}".format(x_test[r,c]))
        print(model.predict(test_matrix = x_test, row = r, col= c))
    """

    def fit(self, data):
        """Learn the per-cell mean and standard deviation.

        Args:
            data: 3-dimensional array-like; the first dimension is the day,
                the remaining two are the sample's rows and columns.  Nested
                lists are accepted as well as ``numpy`` arrays.

        Raises:
            ValueError: if ``data`` is not 3-dimensional.
        """
        data = np.asarray(data)
        if data.ndim != 3:
            raise ValueError(
                "data must be 3-dimensional (days, rows, columns); "
                "got {} dimension(s)".format(data.ndim)
            )
        n_days, n_rows, n_cols = data.shape

        # Remembered so predict() can reject a test matrix whose width would
        # map (row, col) onto the wrong flattened column.
        self._n_cols = n_cols

        # One row per day, one column per (row, col) cell in row-major order.
        # The explicit copy keeps ``df`` independent of the caller's array.
        flat = data.reshape(n_days, n_rows * n_cols).copy()
        self.df = pd.DataFrame(flat)

        # Learn parameters - central tendency and standard deviation
        self.means = self.df.mean()
        self.stds = self.df.std()

    def predict(self, test_matrix, row, col, n_sigma=3):
        """Test whether one cell of ``test_matrix`` is an anomaly.

        Args:
            test_matrix: 2-dimensional array-like with the same number of
                columns as the samples passed to ``fit``.
            row: row index of the cell to test.
            col: column index of the cell to test.
            n_sigma: how many standard deviations from the mean a value may
                lie before it is flagged.  Defaults to 3.

        Returns:
            Tuple ``(is_anomaly, mu, sigma)`` where ``is_anomaly`` is true
            when ``|value - mu| > n_sigma * sigma`` and ``mu`` / ``sigma`` are
            the learned statistics of that cell.

        Raises:
            ValueError: if the width of ``test_matrix`` differs from the
                width seen by ``fit``.
        """
        test_matrix = np.asarray(test_matrix)
        feature_size = test_matrix.shape[1]

        # The check is skipped when means/stds were assigned without fit().
        trained_width = getattr(self, "_n_cols", None)
        if trained_width is not None and feature_size != trained_width:
            raise ValueError(
                "test_matrix has {} column(s) but the model was fitted on "
                "samples with {} column(s)".format(feature_size, trained_width)
            )

        # Same row-major flattening that fit() applied to each sample.
        idx = row*feature_size + col
        mu = self.means[idx]
        sigma = self.stds[idx]
        val = test_matrix[row,col]
        return np.abs(val - mu) > n_sigma * sigma, mu, sigma
# Toy training stack: one 3x2 matrix (time x event) per day.
calorieIntake = np.array([
    [[100, 20000], [300, 400], [500, 500]],
    [[200, 200], [200, 200], [500, 600]],
    [[100, 0], [100, 0], [500, 600]],
    [[1000, 0], [1000, 0], [2000, 0]],
    [[100, 100], [100, 100], [500, 500]],
    [[200, 200], [200, 200], [500, 500]],
    [[400, 300], [200, 100], [500, 500]],
])
print("\n\nToy data: calorieIntake")
print(calorieIntake)

# Fit the detector, then show the flattened training frame it built.
model = simpleAnomalyDetector()
model.fit(calorieIntake)
print("\n\n# Learn Model Parameters \n# Show Data")
print(model.df)

# A single new sample to probe.
print("\n\n# Create a test case")
x_test = np.array([[1000, 400], [200, 200], [500, 500]])
print(x_test)

# Pick the cell under test: time r, event c.
r = 0
c = 1
val = x_test[r, c]
print("\n\n# For a given time r = {} and event c = {}, show test value".format(r,c))
print(val)

# Column of model.df that holds the history of cell (r, c).
feature_size = x_test.shape[1]
idx = r * feature_size + c
print("\n\n# Previously known data on that time r = {} and event c= {}".format(r,c))
print("# Look column {} of the dataframe".format(idx))
print(model.df[idx])

# Overwrite the cell with increasingly extreme values and query the model:
# 500 and 5000 are reported as normal, 50000 as an anomaly.
for candidate in (500, 5000, 50000):
    x_test[r, c] = candidate
    print("\n\nPrediction for {}".format(x_test[r,c]))
    print(model.predict(test_matrix = x_test, row = r, col= c))
# --- Cell output -----------------------------------------------------------
#
# Toy data: calorieIntake
# [[[  100 20000]
#   [  300   400]
#   [  500   500]]
#
#  [[  200   200]
#   [  200   200]
#   [  500   600]]
#
#  [[  100     0]
#   [  100     0]
#   [  500   600]]
#
#  [[ 1000     0]
#   [ 1000     0]
#   [ 2000     0]]
#
#  [[  100   100]
#   [  100   100]
#   [  500   500]]
#
#  [[  200   200]
#   [  200   200]
#   [  500   500]]
#
#  [[  400   300]
#   [  200   100]
#   [  500   500]]]
#
# # Learn Model Parameters
# # Show Data
#       0      1     2    3     4    5
# 0   100  20000   300  400   500  500
# 1   200    200   200  200   500  600
# 2   100      0   100    0   500  600
# 3  1000      0  1000    0  2000    0
# 4   100    100   100  100   500  500
# 5   200    200   200  200   500  500
# 6   400    300   200  100   500  500
#
# # Create a test case
# [[1000  400]
#  [ 200  200]
#  [ 500  500]]
#
# # For a given time r = 0 and event c = 1, show test value
# 400
#
# # Previously known data on that time r = 0 and event c= 1
# # Look column 1 of the dataframe
# 0    20000
# 1      200
# 2        0
# 3        0
# 4      100
# 5      200
# 6      300
# Name: 1, dtype: int64
#
# Prediction for 500
# (False, 2971.4285714285716, 7509.7080026932)
#
# Prediction for 5000
# (False, 2971.4285714285716, 7509.7080026932)
#
# Prediction for 50000
# (True, 2971.4285714285716, 7509.7080026932)
# ---------------------------------------------------------------------------
# Heatmap of the learned per-cell means, laid out in the shape of one sample.
fig = plt.figure()
fig.set_size_inches(3, 2)
sample_shape = calorieIntake[0].shape
mean_grid = model.means.values.reshape(sample_shape)
sns.heatmap(mean_grid)
# Cell output: <matplotlib.axes._subplots.AxesSubplot at 0x1a19f8f588>
# Locate the flattened column holding the history of cell (r, c).
feature_size = x_test.shape[1]
idx = r * feature_size + c
print("\n\n# Previously known data on that time r = {} and event c= {}".format(r,c))
print("# Look column {} of the dataframe".format(idx))

fig = plt.figure()
fig.set_size_inches(10, 5)

# History and learned statistics of that one cell.
history = model.df[idx]
mu = model.means[idx]
sigma = model.stds[idx]

# Day-by-day values as a line with markers on top.
history.plot()
plt.scatter(model.df.index, history)

# Horizontal bands at mu - k*sigma (drawn first) and then mu + k*sigma for
# k = 0..3; lines get thinner the further they are from the mean.
for sign in (-1, 1):
    for k in range(4):
        plt.axhline(y=mu + sign * k * sigma, linewidth=4 - k, color='b')

plt.xlabel('Days')
plt.ylabel('Count for Time r = {} and event c= {} '.format(r,c))

# Clip the view to four standard deviations either side of the mean.
plt.ylim(mu - 4 * sigma, mu + 4 * sigma)
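# --- Cell output -----------------------------------------------------------
# # Previously known data on that time r = 0 and event c= 1
# # Look column 1 of the dataframe
# (-27067.403439344227, 33010.26058220137)
# ---------------------------------------------------------------------------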