#!/usr/bin/env python # coding: utf-8 # In[1]: import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data import numpy as np import math import matplotlib.pyplot as plt import pandas as pd get_ipython().run_line_magic('matplotlib', 'inline') print(tf.__version__) # In[2]: def drawErrorValues(epoch_list, train_error_value_list, validation_error_value_list, test_error_value_list): fig = plt.figure(figsize=(20, 5)) plt.subplot(121) plt.plot(epoch_list, train_error_value_list, 'r', label='Train') plt.plot(epoch_list, validation_error_value_list, 'g', label='Validation') plt.ylabel('Total Error') plt.xlabel('Epochs') plt.grid(True) plt.legend(loc='upper right') plt.subplot(122) plt.plot(epoch_list, test_error_value_list, 'b', label='Test') plt.ylabel('Accuracy') plt.xlabel('Epochs') plt.yticks(np.arange(min(test_error_value_list), max(test_error_value_list), 100)) plt.grid(True) plt.legend(loc='lower right') plt.show() # In[3]: class RNN: def __init__(self, batch_size, n_steps, n_inputs, n_state_units, n_classes): self.batch_size = batch_size self.n_steps = n_steps self.n_inputs = n_inputs self.n_state_units = n_state_units self.n_classes = n_classes def set_data(self, train_data, val_data, test_data): self.train = train_data self.validation = val_data self.test = test_data def make_rnn(self, learning_rate): self.x = tf.placeholder(tf.float32, [None, self.n_steps, self.n_inputs]) xt = tf.transpose(self.x, perm = [1, 0, 2]) xr = tf.reshape(xt, shape = [-1, self.n_inputs]) xs = tf.split(value=xr, num_or_size_splits=self.n_steps, axis=0) self.y = tf.placeholder(tf.float32, [None, n_classes]) self.real_batch_size = tf.placeholder(tf.int32) rnn_cell = tf.contrib.rnn.BasicLSTMCell(num_units = n_state_units) initial_state = rnn_cell.zero_state(batch_size=self.real_batch_size, dtype=tf.float32) self.outputs, self.final_state = tf.contrib.rnn.static_rnn(rnn_cell, xs, initial_state=initial_state, dtype=tf.float32) self.W2 = tf.Variable(tf.random_normal([self.n_state_units, self.n_classes])) self.B = tf.Variable(tf.random_normal([self.n_classes])) self.pred = tf.matmul(self.outputs[-1], self.W2) + self.B self.error = tf.reduce_sum(tf.square(self.pred - self.y), axis=0) self.optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate).minimize(self.error) def learning(self, max_training_epochs): epoch_list = [] train_error_value_list = [] validation_error_value_list = [] test_error_value_list = [] test_accuracy_list = [] init = tf.global_variables_initializer() with tf.Session() as sess: sess.run(init) print("batch_size", self.batch_size) total_batch = int(math.ceil(len(self.train['x']) / float(self.batch_size))) print("Total batch: {0}".format(total_batch)) for epoch in range(max_training_epochs): for i in range(total_batch): batch_x = self.train['x'][i * self.batch_size: (i+1) * self.batch_size] batch_y = self.train['y'][i * self.batch_size: (i+1) * self.batch_size] sess.run((self.optimizer, self.error), feed_dict={ self.x: batch_x, self.y: batch_y, self.real_batch_size: len(batch_x) }) epoch_list.append(epoch) t_error_value = sess.run(self.error, feed_dict={ self.x: self.train['x'], self.y: self.train['y'], self.real_batch_size: len(self.train['x']) }) train_error_value_list.append(t_error_value) v_error_value = sess.run(self.error, feed_dict={ self.x: self.validation['x'], self.y: self.validation['y'], self.real_batch_size: len(self.validation['x']) }) validation_error_value_list.append(v_error_value) error_value = sess.run(self.error, feed_dict={ self.x: self.test['x'], self.y: self.test['y'], self.real_batch_size: len(self.test['x']) }) test_error_value_list.append(error_value) if (epoch % 100 == 0): print("epoch: {0}, test_error_value: {1}".format(epoch, error_value)) print("train complete!") drawErrorValues(epoch_list, train_error_value_list, validation_error_value_list, test_error_value_list) def get_final_test_pred(self, data_x): init = tf.global_variables_initializer() with tf.Session() as sess: sess.run(init) pred = sess.run(self.pred, feed_dict={self.x: data_x}) return pred # In[ ]: def split_data(data, val_size=0.1, test_size=0.2): """ splits data to training, validation and testing parts """ ntest = int(round(len(data) * (1 - test_size))) nval = int(round(len(data) * (1 - (val_size + test_size)))) train_data, val_data, test_data = data[:nval], data[nval:ntest], data[ntest:] return train_data, val_data, test_data def prepare_data(data, n_steps): x = [] y = [] for i in range(len(data) - n_steps - 1): x_item = [] for j in range(n_steps): x_item.append([data[i + j]]) x.append(x_item) y.append([data[i + n_steps]]) x = np.asarray(x) y = np.asarray(y) return x, y def generate_data(func, x, n_steps): """ generates data with based on a function fct creates new data frame based on previous observation * example: l = [1, 2, 3, 4, 5] n_steps = 3 -> [[[1], [2], [3]], [[2], [3], [4]], [[3], [4], [5]]] """ data = func(x) train_data, val_data, test_data = split_data(data, val_size=0.1, test_size=0.2) train = {} val = {} test = {} train['x'], train['y'] = prepare_data(train_data, n_steps) val['x'], val['y'] = prepare_data(val_data, n_steps) test['x'], test['y'] = prepare_data(test_data, n_steps) return train, val, test # In[ ]: batch_size = 128 n_steps = 10 n_inputs = 1 n_state_units = 4 n_classes = 1 learning_rate = 0.01 max_training_epochs = 10000 if __name__ == '__main__': rnn = RNN(batch_size, n_steps, n_inputs, n_state_units, n_classes) train, val, test = generate_data(np.sin, np.linspace(0, 100, 100000), n_steps) rnn.set_data(train, val, test) rnn.make_rnn(learning_rate) rnn.learning(max_training_epochs) # In[ ]: if __name__ == '__main__': pred = rnn.get_final_test_pred(train['x']) fig = plt.figure(figsize=(20, 5)) plt.subplot(111) plt.plot(range(0, len(train['x'])), train['y'], 'b', label='True Sin Curv') plt.plot(range(0, len(train['x'])), pred, 'r', label='Predicted Sin Curv') plt.ylabel('Sine') plt.xlabel('-') plt.grid(True) plt.legend(loc='upper right') # In[ ]: if __name__ == '__main__': pred = rnn.get_final_test_pred(val['x']) fig = plt.figure(figsize=(20, 5)) plt.subplot(111) plt.plot(range(0, len(val['x'])), val['y'], 'b', label='True Sin Curv') plt.plot(range(0, len(val['x'])), pred, 'r', label='Predicted Sin Curv') plt.ylabel('Sine') plt.xlabel('-') plt.grid(True) plt.legend(loc='upper right') # In[ ]: if __name__ == '__main__': pred = rnn.get_final_test_pred(test['x']) fig = plt.figure(figsize=(20, 5)) plt.subplot(111) plt.plot(range(0, len(test['x'])), test['y'], 'b', label='True Sin Curv') plt.plot(range(0, len(test['x'])), pred, 'r', label='Predicted Sin Curv') plt.ylabel('Sine') plt.xlabel('-') plt.grid(True) plt.legend(loc='upper right') # In[ ]: