from __future__ import print_function

import cPickle as pickle
import gzip
import itertools
import urllib

import numpy as np
import nntools
import theano
import theano.tensor as T

# Global constants
# Filename of the MNIST pickle; get it from
# http://deeplearning.net/data/mnist/mnist.pkl.gz
DATA_FILENAME = 'mnist.pkl.gz'
# How many epochs must the validation loss be greater than the best so far
# before stopping?
NUM_BAD_EPOCHS = 100
# Size of each minibatch
BATCH_SIZE = 500
# Number of units in the single hidden layer
NUM_HIDDEN_UNITS = 100
# Learning rate (eta)
LEARNING_RATE = 0.005
# Weight decay lambda parameter
DECAY_LAMBDA = 5.


def one_hot(labels, n_classes):
    '''
    Convert an array of label integers to a one-hot matrix encoding

    :parameters:
        - labels : np.ndarray, dtype=int
            Array of integer labels, in {0, ..., n_classes - 1}
        - n_classes : int
            Total number of classes

    :returns:
        - one_hot : np.ndarray, dtype=int, shape=(labels.shape[0], n_classes)
            One-hot matrix of the input
    '''
    one_hot = np.zeros((labels.shape[0], n_classes)).astype(int)
    one_hot[range(labels.shape[0]), labels] = 1
    return one_hot


def load_data():
    '''
    Load in the mnist.pkl data

    :returns:
        - dataset : dict
            A dict containing train/validation/test data/labels/shapes
    '''
    # Load in the pkl.gz
    with gzip.open(DATA_FILENAME, 'rb') as f:
        data = pickle.load(f)
    X_train, y_train = data[0]
    X_valid, y_valid = data[1]
    X_test, y_test = data[2]
    # Get the number of classes in the data (should be 10)
    num_classes = np.unique(y_train).shape[0]
    # Convert class numbers (ints) to one-hot representation (see above)
    y_train = one_hot(y_train, num_classes)
    y_valid = one_hot(y_valid, num_classes)
    y_test = one_hot(y_test, num_classes)
    # Construct a dataset dict, storing the data as Theano shared variables
    return dict(X_train=theano.shared(nntools.utils.floatX(X_train)),
                y_train=theano.shared(nntools.utils.floatX(y_train)),
                X_valid=theano.shared(nntools.utils.floatX(X_valid)),
                y_valid=theano.shared(nntools.utils.floatX(y_valid)),
                X_test=theano.shared(nntools.utils.floatX(X_test)),
                y_test=theano.shared(nntools.utils.floatX(y_test)),
                num_examples_train=X_train.shape[0],
                num_examples_valid=X_valid.shape[0],
                num_examples_test=X_test.shape[0],
                input_dim=X_train.shape[1],
                output_dim=num_classes)
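# load_data assumes mnist.pkl.gz is already sitting next to the notebook as
# DATA_FILENAME. If it isn't, a minimal sketch like the following (reusing the
# urllib module imported above and the URL from the constants section) could
# fetch it first. The helper name and this step are illustrative assumptions;
# nothing else in the notebook calls it.
import os

MNIST_URL = 'http://deeplearning.net/data/mnist/mnist.pkl.gz'


def maybe_download(filename=DATA_FILENAME, url=MNIST_URL):
    '''Download the MNIST pickle to `filename` if it is not already present.'''
    if not os.path.exists(filename):
        urllib.urlretrieve(url, filename)
    return filename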
def create_iter_functions(dataset, output_layer, batch_size=BATCH_SIZE,
                          learning_rate=LEARNING_RATE,
                          decay_lambda=DECAY_LAMBDA):
    '''
    Create functions for training the network and computing
    train/validation/test loss/accuracy

    :parameters:
        - dataset : dict
            Dataset dict, as returned by load_data
        - output_layer : nntools.Layer
            Output layer of a neural network you've constructed
        - batch_size : int
            Mini-batch size
        - learning_rate : float
            Learning rate for SGD optimization
        - decay_lambda : float
            Weight decay lambda hyperparameter

    :returns:
        - iter_funcs : dict
            Dictionary of iterator functions for training/evaluating the network
    '''
    # Mini-batch index, symbolic, for use in theano functions
    batch_index = T.iscalar('batch_index')
    # X (data) and y (output) symbolic matrices
    X_batch = T.matrix('x')
    y_batch = T.matrix('y')
    # Create a slice object for indexing X and y to obtain batches
    batch_slice = slice(batch_index * batch_size,
                        (batch_index + 1) * batch_size)

    # Loss function for the network
    def loss(output):
        # Collect all non-bias parameters
        params = nntools.layers.get_all_non_bias_params(output_layer)
        # Loss = cross-entropy summed over the batch...
        return (T.sum(-y_batch*T.log(output) - (1. - y_batch)*T.log(1. - output))
                # ...plus the weight decay (L2 regularization) term
                + (decay_lambda/y_batch.shape[0])*sum(T.sum(p**2) for p in params))

    # Symbolic loss function for a batch of data
    loss_train = loss(output_layer.get_output(X_batch))
    # When using a dropout layer, we must not drop out units when computing
    # validation/test statistics, so we use this deterministic version instead
    loss_eval = loss(output_layer.get_output(X_batch, deterministic=True))
    # Compute predicted class for a batch
    pred = T.argmax(output_layer.get_output(X_batch, deterministic=True), axis=1)
    # Compute the accuracy - the mean fraction of correctly classified examples
    accuracy = T.mean(T.eq(pred, T.argmax(y_batch, axis=1)))

    # Collect all parameters of the network
    all_params = nntools.layers.get_all_params(output_layer)
    # Compute SGD updates for these parameters
    updates = nntools.updates.sgd(loss_train, all_params, learning_rate)

    # Create training function - includes updates
    iter_train = theano.function([batch_index], loss_train, updates=updates,
                                 givens={X_batch: dataset['X_train'][batch_slice],
                                         y_batch: dataset['y_train'][batch_slice]})
    # Create validation/test functions
    iter_valid = theano.function([batch_index], [loss_eval, accuracy],
                                 givens={X_batch: dataset['X_valid'][batch_slice],
                                         y_batch: dataset['y_valid'][batch_slice]})
    iter_test = theano.function([batch_index], [loss_eval, accuracy],
                                givens={X_batch: dataset['X_test'][batch_slice],
                                        y_batch: dataset['y_test'][batch_slice]})
    return dict(train=iter_train, valid=iter_valid, test=iter_test)


def train(iter_funcs, dataset, batch_size=BATCH_SIZE):
    '''
    Create an iterator for training using iterator functions.

    :parameters:
        - iter_funcs : dict
            Dictionary of iterator functions, as returned by create_iter_functions
        - dataset : dict
            Dataset dictionary, as returned by load_data
        - batch_size : int
            Mini-batch size

    :returns:
        - epoch_result : dict
            Statistics for each epoch, yielded after each epoch
    '''
    # Compute the number of train/validation minibatches
    num_batches_train = dataset['num_examples_train'] // batch_size
    num_batches_valid = dataset['num_examples_valid'] // batch_size

    # Count indefinitely, starting from 1
    for epoch in itertools.count(1):
        # Train for one epoch over all minibatches
        batch_train_losses = []
        for b in range(num_batches_train):
            batch_train_loss = iter_funcs['train'](b)
            batch_train_losses.append(batch_train_loss)
        # Compute the average training loss over all minibatches
        avg_train_loss = np.mean(batch_train_losses)

        # Compute validation loss/accuracy by accumulating over all batches...
        batch_valid_losses = []
        batch_valid_accuracies = []
        for b in range(num_batches_valid):
            batch_valid_loss, batch_valid_accuracy = iter_funcs['valid'](b)
            batch_valid_losses.append(batch_valid_loss)
            batch_valid_accuracies.append(batch_valid_accuracy)
        # ...and taking the mean
        avg_valid_loss = np.mean(batch_valid_losses)
        avg_valid_accuracy = np.mean(batch_valid_accuracies)

        # Yield the epoch result dict
        yield {'number': epoch,
               'train_loss': avg_train_loss,
               'valid_loss': avg_valid_loss,
               'valid_accuracy': avg_valid_accuracy}
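# The training loop at the end of the notebook stops once the validation loss
# has failed to improve on the best value for NUM_BAD_EPOCHS consecutive
# epochs. The helper below is an illustrative sketch of that same check as a
# standalone function; it is an assumption for clarity and is not called by
# the code that follows.
def should_stop(valid_losses, best_valid_loss, patience=NUM_BAD_EPOCHS):
    '''
    Return True when the last `patience` validation losses are all worse than
    the best validation loss seen so far.
    '''
    recent = np.array(valid_losses)[-patience:]
    # Only stop once at least `patience` epochs have passed without improvement
    return recent.shape[0] >= patience and (recent > best_valid_loss).all()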
def test_accuracy(iter_funcs, dataset, batch_size=BATCH_SIZE):
    '''
    Compute accuracy on the test set.

    :parameters:
        - iter_funcs : dict
            Dictionary of iterator functions, as returned by create_iter_functions
        - dataset : dict
            Dataset dictionary, as returned by load_data
        - batch_size : int
            Mini-batch size

    :returns:
        - test_accuracy : float
            Model accuracy on the test set
    '''
    # Compute the number of test batches
    num_batches_test = dataset['num_examples_test'] // batch_size
    # Accumulate test accuracy over all batches
    batch_accuracies = []
    for b in range(num_batches_test):
        batch_loss, batch_accuracy = iter_funcs['test'](b)
        batch_accuracies.append(batch_accuracy)
    # Take the mean over all batches to get the actual test accuracy
    return np.mean(batch_accuracies)


import IPython.display
import matplotlib.pyplot as plt
%matplotlib inline

# Load in the data dict
dataset = load_data()

# Construct the network, starting with the input layer
l_in = nntools.layers.InputLayer(shape=(BATCH_SIZE, dataset['input_dim']))
# One hidden layer
l_hidden1 = nntools.layers.DenseLayer(
    l_in, num_units=NUM_HIDDEN_UNITS,
    # Sigmoidal activation, as in the chapter
    nonlinearity=nntools.nonlinearities.sigmoid,
    # Initialize weights from a normal distribution with std = 1/sqrt(fan-in)
    W=nntools.init.Normal(std=1./np.sqrt(dataset['input_dim'])))
# Output layer
l_out = nntools.layers.DenseLayer(
    l_hidden1, num_units=dataset['output_dim'],
    # Sigmoidal activation, as in the chapter
    nonlinearity=nntools.nonlinearities.sigmoid,
    # Initialize weights from a normal distribution with std = 1/sqrt(fan-in)
    W=nntools.init.Normal(std=1./np.sqrt(NUM_HIDDEN_UNITS)))

# Construct iterator function dictionary
iter_funcs = create_iter_functions(dataset, l_out)

# Keep track of train/validation losses for later plotting
train_losses = []
valid_losses = []
# Keep track of the best validation loss so far for early stopping
best_valid_loss = np.inf

# Try/except is so we can stop training early manually with a keyboard interrupt
try:
    # Iterating over train() trains the network one epoch at a time
    for epoch in train(iter_funcs, dataset):
        # Print statistics of this epoch
        IPython.display.clear_output(wait=True)
        print("Epoch {}".format(epoch['number']))
        print(" training loss:\t\t{}".format(epoch['train_loss']))
        print(" validation loss:\t\t{}".format(epoch['valid_loss']))
        print(" validation accuracy:\t\t{:.3f}%".format(
            epoch['valid_accuracy'] * 100))
        # Store the train/validation loss for this epoch
        train_losses.append(epoch['train_loss'])
        valid_losses.append(epoch['valid_loss'])
        # If this is a new best validation loss, store it
        if epoch['valid_loss'] < best_valid_loss:
            best_valid_loss = epoch['valid_loss']
        # Otherwise, stop once the validation loss has been worse than the best
        # for the last NUM_BAD_EPOCHS epochs
        else:
            if (np.array(valid_losses)[-NUM_BAD_EPOCHS:] > best_valid_loss).all():
                break
except KeyboardInterrupt:
    pass

# Plot train/validation loss curves
plt.plot(train_losses, label='Train loss')
plt.plot(valid_losses, label='Validation loss')
plt.legend()

print('Test accuracy: {:.3f}%'.format(test_accuracy(iter_funcs, dataset)*100))
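# After training, the learned weights only live in the Theano shared variables
# held by the layers. As an illustrative sketch (the filename is an assumption,
# and get_all_params is the same helper used in create_iter_functions above),
# the parameter values could be pickled for later reuse like this:
param_values = [p.get_value() for p in nntools.layers.get_all_params(l_out)]
with open('mnist_mlp_params.pkl', 'wb') as f:
    pickle.dump(param_values, f, protocol=pickle.HIGHEST_PROTOCOL)
# To restore them later into an identically constructed network:
# for p, value in zip(nntools.layers.get_all_params(l_out), param_values):
#     p.set_value(value)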