Implementar uma Rede Neural Recorrente que aprenda $y_{t} = sin(y_{t-1}) + cos(y_{t-4}) + u_{t}$, onde $u_{t} \sim N\left(0, 0.09\right)$.
Importamos os pacotes que serão utilizados e configuramos o matplotlib.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import optimize
import matplotlib
%matplotlib inline
matplotlib.rcParams['figure.figsize'] = (14, 8)
Fixamos o seed para manter os resultados entre as execuções e definimos a função que gera $u_{t}$.
np.random.seed(seed=1)
sigma = 0.09
def noise(m = 0):
return np.random.normal(m, sigma)
Definimos o número de amostras e o número de elementos na sequência para o treinamento e para a validação da rede neural.
n_of_samples = 100000 # number of samples
n_per_sample = 6 # number per sample
Criamos duas sequências: Uma para treinamento e outra para validação posterior da rede (validação cruzada).
def next(y_1, y_4):
return np.sin(y_1) + np.cos(y_4) + noise()
_x = [noise() for _ in range(4)] # Treinamento
_x_val = [noise(np.pi/4) for _ in range(4)] # Validação
for i in range(n_per_sample + n_of_samples):
_x.append(next(_x[-1], _x[-4]))
_x_val.append(next(_x_val[-1], _x_val[-4]))
# Retiramos o estado transiente das séries
_x = _x[4:]
_x_val = _x_val[4:]
Os dados para treinamento e validação serão armezenados nas variáveis "X" e "X_val", respectivamente.
X = np.zeros((n_of_samples, n_per_sample))
X_val = np.zeros((n_of_samples, n_per_sample))
target = np.zeros(n_of_samples)
target_val = np.zeros(n_of_samples)
for row in range(n_of_samples):
X[row, :] = _x[row: row + n_per_sample]
target[row] = _x[row + n_per_sample]
X_val[row, :] = _x_val[row: row + n_per_sample]
target_val[row] = _x_val[row + n_per_sample]
Definimos a função de ativação utilizada pela rede, bem como a derivada da função de ativação.
Para a nossa rede, escolhemos a função $tanh(z)$.
def g(z):
return np.tanh(z)
def dg(z):
return 1 - np.tanh(z) ** 2
Definimos os métodos para a evolução dos estados da rede neural.
É importante destacar que o último estágio não deverá passar pela função de ativação para não restringir nosso intervalo de saída.
Definimos $S_{0}=0$.
def forward_nn(X, W, U):
m, n = X.shape[0], X.shape[1]
S = np.zeros((m, n+1))
for k in range(n-1): # state[0] inicia com 0
S[:, k+1] = g(X[:, k] * W[k] + S[:, k] * U)
S[:, n] = X[:, n-1] * W[n-1] + S[:, n-1] * U # Último estado não ativa
return S
def output_nn(S):
return S[:, -1]
Definimos os métodos para calcular a função custo e o gradiente do custo.
$$cost (y, target) = \frac{\sum (y - target)^{2}}{n} $$$$\frac{\partial cost}{\partial y} ( y, target ) = \frac{2 * (y - target)}{n}$$def cost(y, target):
return np.sum((y - target) ** 2) / len(y)
def gradient_cost(y, target):
return 2 * (y - target) / len(y)
Definimos o método que calcula o BPTT - Backpropagation Through Time.
def back_propagation(X, target, S, W, U):
m = X.shape[0]
n = X.shape[1]
y = output_nn(S)
grad_s = np.zeros((m, n+1))
grad_s[:, -1] = gradient_cost(y, target)
grad_w = np.zeros(n)
grad_u = 0
for k in range(n):
_x = X[:, n-k-1]
_s = S[:, n-k-1]
if k == 0:
_g = 1 # O último estado não tem ativação
else:
_g = dg(_x * W[n-k-1] + _s * U)
grad_s[:, n-k-1] = grad_s[:, n-k] * U * _g
grad_w[n-k-1] = np.sum(grad_s[:, n-k] * _x * _g)
grad_u += np.sum(grad_s[:, n-k] * _s * _g)
return grad_s, grad_w, grad_u
Definimos as funções para o treinamento da rede utilizando otimização (Nonlinear Conjugate Gradient Algorithm).
# Função custo com parâmetros w, u
def f(wu):
w = wu[0:-1]
u = wu[-1]
y = output_nn(forward_nn(X, w, u))
return cost(y, target)
# Gradiente da função custo em relação aos parâmetros w, u (Utilizando Backpropagation)
def fprime(wu):
w = wu[0:-1]
u = wu[-1]
S = forward_nn(X, w, u)
_, grad_w, grad_u = back_propagation(X, target, S, w, u)
return np.append(grad_w, grad_u)
def train(X, W_initial = [noise() for _ in range(n_per_sample)], U_initial = noise()):
wu = np.append(W_initial, U_initial)
return optimize.fmin_cg(f, wu, fprime=fprime)
Faremos a checagem numérica para verificar se o cálculo do gradiente utilizando o BPTT está correto.
Assim, esperamos que o erro seja um número próximo de zero.
print("Erro: {:.2}".format(optimize.check_grad(f, fprime, [1 for _ in range(n_per_sample + 1)])))
Erro: 6e-08
Com a checagem indicando que os cálculos estão corretos, faremos o treinamento da rede para saber os parâmetros W e U que minimizam nossa função de custo.
t = train(X)
w_opt = t[0:-1]
u_opt = t[-1]
print("\nW: {} \nU: {}".format(w_opt, u_opt))
Optimization terminated successfully. Current function value: 0.039141 Iterations: 107 Function evaluations: 200 Gradient evaluations: 200 W: [ 0.13641299 0.44448911 -0.61303398 0.244305 -0.65330933 1.32986464] U: 1.1895773760817256
Com os parâmetros W e U calculados pela otimização, faremos a evolução dos dados com as sequências de treinamento e de validação (aquelas que não foram utilizadas no treinamento).
y_opt = output_nn(forward_nn(X, w_opt, u_opt))
y_val = output_nn(forward_nn(X_val, w_opt, u_opt))
Comparação da função custo (média do quadrado dos erros) e coeficiente de determinação.
def r2_score(y, target):
m = np.average(target)
return 1 - (np.sum((y - target) ** 2) / np.sum((m - target) ** 2))
print('Training')
print(' Cost: {:.3}' .format(cost(y_opt, target)))
print(' R\u00b2: {:.1f}%' .format(100 * r2_score(y_opt, target)))
print('Validation')
print(' Cost: {:.3}' .format(cost(y_val, target_val)))
print(' R\u00b2: {:.1f}%' .format(100 * r2_score(y_val, target_val)))
Training Cost: 0.0391 R²: 93.1% Validation Cost: 0.0393 R²: 93.1%
Vemos que o custo final da amostra de validação foi próximo ao da amostra de treinamento. Isso sugere que a rede conseguiu aprender sem fazer overfitting.
df = pd.DataFrame()
df['Target'] = target_val[0:100]
df['Predicted'] = y_val[0:100]
df.plot()
plt.show()
def scatterPlot(actual, predicted):
plt.scatter(actual, predicted)
range = np.array([actual.min(), actual.max()])
plt.plot(range, range, 'white')
plt.plot(range, range + sigma, 'orange')
plt.plot(range, range - sigma, 'orange')
plt.xlabel("Target")
plt.ylabel("Predicted")
plt.show()
scatterPlot(target_val, y_val)
Verificação do quadrado do erro ao longo da sequência de validação:
df = pd.DataFrame()
df['Erro\u00b2'] = ((y_val - target_val)**2)
axes = df.plot()
plt.show()
Verificaremos agora o poder da rede de prever os demais valores da sequência, comparando com uma seqência gerada através da regra $y_{t} = sin(y_{t-1}) + cos(y_{t-4}) + u_{t}$
y_real = [noise(np.pi/2) for _ in range(n_per_sample)]
y_rede_neural = y_real.copy()
for i in range(90):
y_real.append(next(y_real[-1], y_real[-4]))
l = np.asmatrix(y_rede_neural[i: i + n_per_sample])
out = output_nn(forward_nn(l, w_opt, u_opt)) # calcula o próximo y utilizando a rede neural
y_rede_neural.append(out[0])
df = pd.DataFrame()
df['y_real'] = y_real
df['y_rede_neural'] = y_rede_neural
df.plot()
plt.show()