In [1]:
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np
import math
import matplotlib.pyplot as plt
import pandas as pd
%matplotlib inline
print(tf.__version__)
1.0.1
In [2]:
def drawErrorValues(epoch_list, train_error_value_list, validation_error_value_list, test_error_value_list):
    fig = plt.figure(figsize=(20, 5))
    plt.subplot(121)
    plt.plot(epoch_list, train_error_value_list, 'r', label='Train')
    plt.plot(epoch_list, validation_error_value_list, 'g', label='Validation')
    plt.ylabel('Total Error')
    plt.xlabel('Epochs')
    plt.grid(True)
    plt.legend(loc='upper right')

    plt.subplot(122)
    plt.plot(epoch_list, test_error_value_list, 'b', label='Test')
    plt.ylabel('Accuracy')
    plt.xlabel('Epochs')
    plt.yticks(np.arange(min(test_error_value_list), max(test_error_value_list), 100))
    plt.grid(True)
    plt.legend(loc='lower right')            
    plt.show()
In [3]:
class RNN:
    def __init__(self, batch_size, n_steps, n_inputs, n_state_units, n_classes):
        self.batch_size = batch_size
        self.n_steps = n_steps
        self.n_inputs = n_inputs
        self.n_state_units = n_state_units
        self.n_classes = n_classes
        
    def set_data(self, train_data, val_data, test_data):
        self.train      = train_data
        self.validation = val_data
        self.test       = test_data
        
    def make_rnn(self, learning_rate):
        self.x = tf.placeholder(tf.float32, [None, self.n_steps, self.n_inputs])
        xt = tf.transpose(self.x, perm = [1, 0, 2])
        xr = tf.reshape(xt, shape = [-1, self.n_inputs])
        xs = tf.split(value=xr, num_or_size_splits=self.n_steps, axis=0)
        self.y = tf.placeholder(tf.float32, [None, n_classes])
        
        self.real_batch_size = tf.placeholder(tf.int32)
        
        rnn_cell = tf.contrib.rnn.BasicLSTMCell(num_units = n_state_units)
        initial_state = rnn_cell.zero_state(batch_size=self.real_batch_size, dtype=tf.float32)
        self.outputs, self.final_state = tf.contrib.rnn.static_rnn(rnn_cell, xs, initial_state=initial_state, dtype=tf.float32)
        
        self.W2 = tf.Variable(tf.random_normal([self.n_state_units, self.n_classes]))
        self.B = tf.Variable(tf.random_normal([self.n_classes]))

        self.pred = tf.matmul(self.outputs[-1], self.W2) + self.B
        self.error = tf.reduce_sum(tf.square(self.pred - self.y), axis=0)
        self.optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate).minimize(self.error)
   
    def learning(self, max_training_epochs):
        epoch_list                  = []
        train_error_value_list      = []
        validation_error_value_list = []
        test_error_value_list       = []
        test_accuracy_list          = []

        init = tf.global_variables_initializer()

        with tf.Session() as sess:
            sess.run(init)

            print("batch_size", self.batch_size)
            total_batch = int(math.ceil(len(self.train['x']) / float(self.batch_size)))
            print("Total batch: {0}".format(total_batch))

            for epoch in range(max_training_epochs):
                for i in range(total_batch):
                    batch_x = self.train['x'][i * self.batch_size: (i+1) * self.batch_size]
                    batch_y = self.train['y'][i * self.batch_size: (i+1) * self.batch_size]
                    sess.run((self.optimizer, self.error), feed_dict={
                        self.x: batch_x, 
                        self.y: batch_y,
                        self.real_batch_size: len(batch_x)
                    })

                epoch_list.append(epoch)
                t_error_value = sess.run(self.error, feed_dict={
                    self.x: self.train['x'], 
                    self.y: self.train['y'],
                    self.real_batch_size: len(self.train['x'])
                })
                train_error_value_list.append(t_error_value)

                v_error_value = sess.run(self.error, feed_dict={
                    self.x: self.validation['x'], 
                    self.y: self.validation['y'],
                    self.real_batch_size: len(self.validation['x'])
                })
                validation_error_value_list.append(v_error_value)

                error_value = sess.run(self.error, feed_dict={
                    self.x: self.test['x'], 
                    self.y: self.test['y'],
                    self.real_batch_size: len(self.test['x'])                    
                })
                test_error_value_list.append(error_value)
                if (epoch % 100 == 0):
                   print("epoch: {0}, test_error_value: {1}".format(epoch, error_value))

            print("train complete!")
            drawErrorValues(epoch_list, train_error_value_list, validation_error_value_list, test_error_value_list)
            
    def get_final_test_pred(self, data_x):
        init = tf.global_variables_initializer()
        with tf.Session() as sess:
            sess.run(init)
            pred = sess.run(self.pred, feed_dict={self.x: data_x})
            return pred
In [ ]:
def split_data(data, val_size=0.1, test_size=0.2):
    """
    splits data to training, validation and testing parts
    """
    ntest = int(round(len(data) * (1 - test_size)))
    nval = int(round(len(data) * (1 - (val_size + test_size))))

    train_data, val_data, test_data = data[:nval], data[nval:ntest], data[ntest:]
    return train_data, val_data, test_data

def prepare_data(data, n_steps):
    x = []
    y = []
    for i in range(len(data) - n_steps - 1):
        x_item = []
        for j in range(n_steps):
            x_item.append([data[i + j]])
        x.append(x_item)
        y.append([data[i + n_steps]])
    x = np.asarray(x)
    y = np.asarray(y)
    return x, y    

def generate_data(func, x, n_steps):
    """
    generates data with based on a function fct
    creates new data frame based on previous observation
       * example:
        l = [1, 2, 3, 4, 5]
        n_steps = 3
        -> [[[1], [2], [3]], [[2], [3], [4]], [[3], [4], [5]]]
    """
    data = func(x)
    train_data, val_data, test_data = split_data(data, val_size=0.1, test_size=0.2)
    train = {}
    val = {}
    test = {}
    train['x'], train['y'] = prepare_data(train_data, n_steps)
    val['x'], val['y'] = prepare_data(val_data, n_steps)
    test['x'], test['y'] = prepare_data(test_data, n_steps)
    return train, val, test
In [ ]:
batch_size = 128
n_steps = 10
n_inputs = 1
n_state_units = 4
n_classes = 1
learning_rate = 0.01
max_training_epochs = 10000

if __name__ == '__main__':
    rnn = RNN(batch_size, n_steps, n_inputs, n_state_units, n_classes)
    train, val, test = generate_data(np.sin, np.linspace(0, 100, 100000), n_steps)
    rnn.set_data(train, val, test)
    rnn.make_rnn(learning_rate)
    rnn.learning(max_training_epochs)
In [ ]:
if __name__ == '__main__':
    pred = rnn.get_final_test_pred(train['x'])
    fig = plt.figure(figsize=(20, 5))
    plt.subplot(111)
    plt.plot(range(0, len(train['x'])), train['y'], 'b', label='True Sin Curv')
    plt.plot(range(0, len(train['x'])), pred, 'r', label='Predicted Sin Curv')
    plt.ylabel('Sine')
    plt.xlabel('-')
    plt.grid(True)
    plt.legend(loc='upper right')
In [ ]:
if __name__ == '__main__':
    pred = rnn.get_final_test_pred(val['x'])
    fig = plt.figure(figsize=(20, 5))
    plt.subplot(111)
    plt.plot(range(0, len(val['x'])), val['y'], 'b', label='True Sin Curv')
    plt.plot(range(0, len(val['x'])), pred, 'r', label='Predicted Sin Curv')
    plt.ylabel('Sine')
    plt.xlabel('-')
    plt.grid(True)
    plt.legend(loc='upper right')
In [ ]:
if __name__ == '__main__':
    pred = rnn.get_final_test_pred(test['x'])
    fig = plt.figure(figsize=(20, 5))
    plt.subplot(111)
    plt.plot(range(0, len(test['x'])), test['y'], 'b', label='True Sin Curv')
    plt.plot(range(0, len(test['x'])), pred, 'r', label='Predicted Sin Curv')
    plt.ylabel('Sine')
    plt.xlabel('-')
    plt.grid(True)
    plt.legend(loc='upper right')
In [ ]: