import functools
import itertools
import numpy as np
import sklearn.datasets
import sklearn.preprocessing
import matplotlib.pyplot as plt
%matplotlib inline
The core idea behind deep learning is that "deep" functions should be built using simpler, parameterised, differentiable functions (aka layers). So, let's define some simple layers we can work with.
Each layer needs a forward method -- __call__
and a method to calculate the gradients of the outputs w.r.t the inputs (evaluated at a given input) -- grad
. They also need to save their inputs/activations so we can calculate their gradients at a given input later.
So how can we actually calculate the derivatives of our layers? We can use symbolic differentiation to do the work for us. Done naively this can be quite expensive.
Automatic differentiation is an efficient way to calculate the derivatives of complex functions coposed of simpler ones. More on this later.
When we take the derivative of a function with respect to its inputs we are asking; how (much) do changes in each input effect each of the outputs. Thus grad
should return an array of shape [n_inputs, batch_size, n_outputs, batch_size]
.
However, we can flatten across the 2nd batch dimension as we expect batches to be computed independently of each other, thus the [batch_size , batch_size]
dimensions will be diagonal and flattening across one of them preserves all the info.
class Sigmoid():
def __call__(self, x):
self.x = x
return 1/(1+np.exp(-x))
def grad(self, x=None):
if x is None:
x = self.x
y = self.__call__(x)
g = np.zeros((x.shape[1], x.shape[0], y.shape[1]))
# sigmoids act element wise, thus have a diagonal
# jacobian/grad
i, j = np.diag_indices(x.shape[1])
g[i, :, j] = (y*(1-y)).T
return g
class Linear():
def __init__(self, shape):
# this layer actually has some state.
# its parameters (which we will train later)
self.weights = (2/(shape[0]+shape[1]))*np.random.standard_normal(shape)
self.biases = np.random.standard_normal((1,shape[-1]))
def __call__(self, x):
self.x = x
return np.dot(x, self.weights) + self.biases
def grad(self, x=None):
if x is None:
x = self.x
return np.tile(np.expand_dims(self.weights, axis=1), (1, x.shape[0], 1))
Cool, now that we have some layers, but we would really like to check that we have got them right. We can use an empirical estimate of the gradients and compare them to our grad
s.
Since a gradient asks how each input effects each output, we can literally just change each input and observe the change in each output. This is known as a finite difference approximation, and as the change in inputs approaches zero, the accuracy of this estimate approaches the true derivative.
limh→0∂f(x)∂x=f(x+h)−f(x)hdef finite_difference(func, inputs, direction, epsilon=1e-8):
"""
Calculate a finite difference on the first argument of
`func` according to the direction vector.
"""
return (func(inputs[0]+epsilon*direction, *inputs[1:]) - func(*inputs))/epsilon
def empirical_gradient(func, inputs, targets=None):
"""
Do our gradients match finite difference approximations?
Args:
func (callable): The target function. `func: inputs, targets -> outputs`
inputs (np.array): an array of inputs [batch_size, ...]
targets optional(np.array): an array of targets [batch_size, ...]
Returns:
the gradients of `func` w.r.t inputs summed over the batch.
"""
grads = [None]*inputs.shape[0]
# for each element of the batch
for i, x in enumerate(inputs):
grad = [None]*inputs.shape[-1]
x = x.reshape((1, inputs.shape[-1]))
# for each input variable
for j in range(x.shape[-1]):
# create a direction to perturb along
direction = np.zeros(x.shape)
direction[0,j] = 1
# finite difference to approximate
if targets is None:
grad[j] = finite_difference(func, [x], direction)
else:
grad[j] = finite_difference(func, [x, targets[i, :]], direction)
grads[i] = grad
grads = np.array(grads)
grads = np.squeeze(grads)
grads = np.transpose(grads, [1, 0, 2])
return grads
def sanity_check(fn, *args):
"""
Check that the layers works as expected.
Estimate their symbolic and empirical gradients
and print the difference.
"""
print('### {}'.format(fn.__class__.__name__))
print('Output shape {}'.format(fn(*args).shape))
print('Grad shape {}'.format(fn.grad(*args).shape))
empirical = empirical_gradient(fn, *args)
symbolic = fn.grad(*args)
difference = empirical - symbolic
print('Empirical: {:.3f} {} Symbolic : {:.3f} {}'.format(
np.mean(empirical), empirical.shape, np.mean(symbolic), symbolic.shape))
print('Difference: {}'.format(np.mean(np.abs(difference))))
print('\n')
batch_size = 50
n_inputs = 64
n_classes = 10
# some variables for testing with
x = np.random.random((batch_size, n_inputs))
y = np.random.random((batch_size, n_classes))
T = np.random.randint(0, 2, (batch_size, n_classes)).astype(np.float32)
sanity_check(Linear((64, 10)), x)
sanity_check(Sigmoid(), x)
### Linear Output shape (50, 10) Grad shape (64, 50, 10) Empirical: -0.001 (64, 50, 10) Symbolic : -0.001 (64, 50, 10) Difference: 3.2874463269992963e-09 ### Sigmoid Output shape (50, 64) Grad shape (64, 50, 64) Empirical: 0.004 (64, 50, 64) Symbolic : 0.004 (64, 50, 64) Difference: 7.781569369080744e-11
There are many types of 'simple' layer that can be used, it just needs to be differentiable. You have been started with a few above.
Implement some activation functions;
(what makes a good activation function?).
Implement softmax for classification (layer).
class ReLU():
def __call__(self, x):
self.x = x
return np.maximum(0, x)
def grad(self, x=None):
if x is None:
x = self.x
g = np.zeros((x.shape[1], x.shape[0], x.shape[1]))
i, j = np.diag_indices(x.shape[1])
g[i, :, j] = ((x>0).astype(np.float32)).T
return g
class Softmax():
def __call__(self, x):
self.x = x
return np.exp(x)/np.sum(np.exp(x), axis=1, keepdims=True)
def grad(self, x=None):
if x is None:
x = self.x
y = self.__call__(x)
g = np.zeros((x.shape[1], x.shape[0], y.shape[1]))
i, j = np.diag_indices(x.shape[1])
g[:, :, :] = -np.expand_dims(y, 0)*np.expand_dims(y.T, -1)
g[i, :, j] = y.T*(1-y.T)
return g
sanity_check(Softmax(), x)
sanity_check(ReLU(), x)
### Softmax Output shape (50, 64) Grad shape (64, 50, 64) Empirical: -0.000 (64, 50, 64) Symbolic : -0.000 (64, 50, 64) Difference: 1.1914430442511248e-10 ### ReLU Output shape (50, 64) Grad shape (64, 50, 64) Empirical: 0.016 (64, 50, 64) Symbolic : 0.016 (64, 50, 64) Difference: 4.249758826313857e-11
Ok, now that we have some working layers we can compose them together to build 'deeper' functions. What is the point of this? We want to construct a set of functions that are able to approximate relatively arbitrary target functions of interest. By controlling the type of layers, how many of them, etc... we can control how well our network can approximate certain (more or less complex) functions.
Assuming these layers are linear transformations followed by element-wise non-linearities, this stack of layers is typically refered to as a neural network.
There are many different ways to compose these layers together (connection topologies), however, here we are only going to consider the simplest case, feed-forward architectures.
"""
Now we want to define a neural network that takes inputs to
f: x -> y.
"""
layers = [
Linear((n_inputs, 30)),
Sigmoid(),
Linear((30, 20)),
Sigmoid(),
Linear((20, n_classes)),
Softmax()
]
def compose(f1, f2):
# nested function composition
return lambda x: f2(f1(x))
# you may be interested in `reduce` and `accumulate`
# if you haven't come across them before
forward_prop = functools.reduce(compose, layers)
logits = forward_prop(x)
print('Input shape: {}, Output shape: {}'.format(x.shape, logits.shape))
Input shape: (50, 64), Output shape: (50, 10)
Cool, we can construct classes of functions, but how can we find the one that accurately approximates our target function? This is known as learning. A few problems arise if we want to train these deep functions.
All learning requires that we have some measure of how well we are doing, the loss, or risk, or cost. Using this metric we can get feedback by asking how well we have done on some data. In deep learning we use metrics that are differentiable so we can ask them: how we can we change our outputs to have done better.
We want to estimate ∂L∂y, effectively this asks: how should our outputs have been different to achieve a lower loss, to do a better job?
class CrossEntropy():
def __call__(self, y, T):
self.y = y
self.T = T
return -T*np.log(y+1e-6)
def grad(self, y=None, T=None):
if y is None:
y = self.y
if T is None:
T = self.T
g = np.zeros((y.shape[1], y.shape[0], T.shape[1]))
i, j = np.diag_indices(y.shape[1])
g[i, :, j] = (-T/(y+1e-6)).T
return g
# Check that cross entropy works as expected
ce = CrossEntropy()
# hold T constant over the batch
t = np.random.randint(0, 2, (1, n_classes)).astype(np.float32)
T = np.concatenate([t for n in range(batch_size)], axis=0)
sanity_check(CrossEntropy(), y, T)
### CrossEntropy Output shape (50, 10) Grad shape (10, 50, 10) Empirical: -1.582 (10, 50, 10) Symbolic : -1.582 (10, 50, 10) Difference: 3.306297476259714e-05
In addition of a metric of loss, we may also have other beliefs about what we want.
Implement Weight decay.
class WeightDecay():
def __call__(self, parameters):
self.parameters = parameters
return 0.5*np.sum(np.square(self.parameters))
def grad(self, parameters=None):
if parameters is None:
self.parameters = parameters
return np.expand_dims(parameters, -1)
# Check that weight decay works as expected
wd = WeightDecay()
Awesome, we have some feedback with respect to the output of our approximation, but how did each layer contribute to that feedback? We want to get feedback for each layer (which we use to update our parameters).
We want to propagate feedback to functions nested in our heirarchy. A simple nested function is ddxf(g(x)). But how can we differentiate this with respect to x? We can take a linear approximation of f,g, at x and use that estimate ddxf(g(x)).
So if we let u=g(x) and y=f(u) then we can calculate dydx=dydu⋅dudx. Aka the chain rule.
def chain_rule(x, y, reverse=False):
"""
Nested function composition gives multiplication
of the nested function's gradients.
"""
x = evaluate(x)
y = evaluate(y)
if reverse:
y = y.T
# TODO not sure about this. the sum over axis 1 feels weird
return np.sum(np.tensordot(x, y, axes=(2, 0)), axis=1)
def evaluate(func):
"""
This is just a helper for partial evaluation
so we dont need to store all the gradients in memory.
we can just calculate them as needed.
This relates to why reverse-mode AD is more
efficient (in memory) than forward-mode.
"""
if not isinstance(func, np.ndarray):
return func()
else:
return func
So we know how to propagate feedback/gradients through two functions, use the chain rule. So all that is left is to recursively apply the chain rule and propagate gradients backwards (aka reverse mode) from the gradient of the loss w.r.t outputs through our layers.
[f1,…fL,L][∂z1∂x,…∂y∂zL,∂L∂y][(ninputs×nb×n1),…(nL×nb×noutputs),(noutputs×nb×1)]### Reverse mode automatic differentiation
# need to call forward propagation first to get the activations
# (but we already did it above)
# now go back through the network and propagate the gradients
grads = [layer.grad for layer in reversed(layers)] # get handles to the grad fns
dLdy = np.diag(np.ones(10, dtype=np.float32)) # make up a pseudo feedback vector
dLdy = np.stack([dLdy for _ in range(50)], axis=1) # stack into a batch
chain = functools.partial(chain_rule, reverse=True)
deltas = list(itertools.accumulate([dLdy] + grads, chain))
print([d.T.shape for d in reversed(deltas)])
[(64, 50, 10), (30, 50, 10), (30, 50, 10), (20, 50, 10), (20, 50, 10), (10, 50, 10), (10, 50, 10)]
class Feedforward():
def __init__(self, layers):
self.layers = layers
self.fn = functools.reduce(compose, layers)
self.chain = functools.partial(chain_rule, reverse=True)
def __call__(self, x):
self.x = x
return self.fn(x)
def grad(self, x=None):
if x is None:
x = self.x
# ahh, need this as __call__ might
# not be called directly before grad
self.fn(x)
grads = [layer.grad for layer in reversed(self.layers)]
self.grads = [evaluate(g).shape for g in grads]
return np.array(functools.reduce(self.chain, grads)).T
ff = Feedforward(layers)
sanity_check(ff, x) # something isnt right here...
### Feedforward Output shape (50, 10) Grad shape (64, 50, 10) Empirical: -0.000 (64, 50, 10) Symbolic : -0.000 (64, 50, 10) Difference: 1401.932542531316
Alternatively, we could propagate the gradients forward, aka forward-mode automatic differentiation.
### Forward mode automatic differentiation
# need to call a layers activation before we can evaluate its gradient
def forward_mode(var, layer):
x, delta = var
# calculate the output at x
activation = layer(x)
# calculate the grad at x
gradient = layer.grad(x)
# note that in forward mode we dont want the
# calculation of the gradients to be lazy as
# if it was we would have to wait longer.
return activation, gradient
# propagate inputs forward through the network
# and calculate activations and gradients as you go.
results = itertools.accumulate([(x, None), layers[0]] + layers[1:], forward_mode)
acts, grads = tuple(zip(*results)) # reorganise list of tuples to tuple of lists
# each grad is dz_l/dx
# note the shapes!!!
print('grads')
print([g.shape for g in grads[1:]])
# note the memory footprint here. we needed to construct every gradient matrix
# as we went before we could reduce them back down to [n x m x 1]
# now we can apply chain rule to find dL/dz
print('deltas')
deltas = list(itertools.accumulate([dLdy] + list(reversed(grads[1:])), chain))
print([d.T.shape for d in reversed(list(deltas))])
## TODO want to profile forward vs backward
## TODO this could be done with partial eval!?
grads [(64, 50, 30), (30, 50, 30), (30, 50, 20), (20, 50, 20), (20, 50, 10), (10, 50, 10)] deltas [(64, 50, 10), (30, 50, 10), (30, 50, 10), (20, 50, 10), (20, 50, 10), (10, 50, 10), (10, 50, 10)]
You may have noted that when we did backprop we required a list containing each of the layers. This allowed us to reverse the computational steps and propagate the gradients in the correct order. This concept (gradient propagation through computations) can be far more general by representing the networks as computation graphs, allowing you to propagate gradients here there and everywhere.
Also, for
loops, if
statements, ... can yield differentiable results (wrt to previous operations, not these operations).
Finally, we need to propagate the error from the outputs of a layer into the parameters of a layer. This can be done using an updage
method. For the linear layer, we need to propagate the gradients to the weights and biases, this can be done using the same method as before, chain rule.
class LinearLayer(Linear): # want to write this as a wrapper/decorator!?!
def __init__(self, shape):
super(self.__class__, self).__init__(shape)
def init(self, opt):
self.w_opt = opt(self.weights)
self.b_opt = opt(self.biases)
def update(self, delta):
# need to be able to take a delta from the layers outputs and
# propagate the gradients to the desired parameters
delta_w = np.dot(np.mean(self.x, axis=0, keepdims=True).T, delta)
delta_b = delta
self.weights += self.w_opt(delta_w)
self.biases += self.b_opt(delta_b)
Because we have been provided a gradient of the loss w.r.t our parameters, via feedback and backprop, we can use gradient descent to follow that gradient to a (local) minima.
xt+1=xt−η∇f(x)We define our loss as the mean of the loss over a batch, thus to make an update we take the average of the gradients over that batch. (Alternatively you can view this as make a more accurate estimate of the gradient by taking many samples at the current parameter setting).
class GradientDescent():
def __init__(self, param, lr, momentum=0.0):
self.lr = lr
self.momentum = momentum
# for momentum
if momentum:
self.g = np.zeros_like(param)
def __call__(self, delta):
if self.momentum:
self.g = self.momentum*self.g + delta
return - self.lr*self.g
else:
return - self.lr*delta
# TODO how can we verify our optimisers?
# empirical checks? plots of attention over past grads?
Like the structure of our network, regularisers are also used to enforce priors. However, regularisers allow the learning algorithms to find a balance between regularisation terms and loss. Regularisers are soft priors as opposed to the ones hardwired into the structure of our network.
Implement some regularisers;
(What are some reasonable prior beliefs about our parameters w.r.t the data?)
There is a vast array of optimisers. Here we only consider first-order gradient based optimisers.
Implement some momentum optimisers;
(How should we update our parameters given a noisy estimate of the gradient? How should past estimates influence our current decision?)
class Adam():
def __init__(self, param, lr, d1=0.9, d2=0.999):
self.lr = lr
self.d1 = d1
self.d2 = d2
# for momentum
self.m = np.zeros_like(param)
self.v = np.zeros_like(param)
def __call__(self, delta):
self.m = self.d1*self.m + delta
self.v = self.d2*self.v + delta**2
return - self.lr*self.m/np.sqrt(self.v + 1e-8)
# TODO. these are non trivial in my framework.
# The lack of a graph to help propagate gradients make these
# hard to work with.
class Certainty():
pass
class Sparsity():
pass
class Orthogonal():
pass
# visualisations of these!! vector flow plots?!
The most important part is the data. It is the data that contains all the patterns we are interested in. The approximator is just a way to extract them.
digits = sklearn.datasets.load_digits(n_class=10)
onehot = sklearn.preprocessing.OneHotEncoder(sparse=False)
plt.figure(figsize=(16,4))
for i in range(10):
plt.subplot(1,10,i+1)
plt.imshow(digits.images[i], cmap='gray', interpolation='nearest')
plt.axis('off')
images = digits.images.reshape((-1, 8*8)).astype(np.float32)/np.max(digits.images)
print(images.shape)
labels = digits.target
labels = onehot.fit_transform(labels.reshape((-1, 1))).astype(np.float32)
print(labels.shape)
(1797, 64) (1797, 10)
def make_batches(x, y, batch_size):
n_of_data = x.shape[0]
assert n_of_data == y.shape[0]
for i in range(n_of_data//batch_size):
yield (x[batch_size*i:batch_size*(i+1), ...],
y[batch_size*i:batch_size*(i+1), ...])
def train(inputs, targets, layers, epochs, batch_size, optimiser):
"""
Arg:
inputs (np.array): the input vectors in [N_examples, N_features]
targets (np.array): the target vectors in [N_examples, N_classes]
layers (list): a list of layer objects (must be callable and have
a gradient method defined)
epochs (int): the number of times to pass through the N_examples
batch_size (int): the number of examples to use to estimate the gradient
for each parameter update
optimiser (): such as GradientDescent
Returns:
layers (list): a list of trained layers.
"""
# init an optimiser for each layer
for layer in layers:
if hasattr(layer, "init"):
layer.init(optimiser)
forward_prop = functools.reduce(compose, layers)
loss_fn = CrossEntropy()
for e in range(epochs):
# shuffle the data and make batches
idx = np.random.permutation(range(len(inputs)))
images, labels = inputs[idx, ...], targets[idx, ...]
for x, T in make_batches(images, labels, batch_size):
### Estimate the gradient (dloss/doutput)
preds = forward_prop(x)
dLdy = np.sum(loss_fn.grad(preds, T), axis=0, keepdims=True)
### Assign credit via backprop
grads = [layer.grad for layer in reversed(layers)]
deltas = itertools.accumulate([dLdy] + grads, chain)
### Update the parameters
for layer, delta in zip(reversed(layers), deltas):
if hasattr(layer, "update"):
layer.update(np.mean(delta, axis=1))
loss = loss_fn(preds, T)
print('\repoch: {} loss: {:.4f}'.format(e,
np.mean(np.sum(loss, axis=1))),
end='', flush=True)
return layers
How do we know we have learn anything useful? Prove to us that this network has 'learned'.
(What were we training it to do? How good is it at doing that?)
# Pick a subset of the images/labels for training
# and use the left overs for testing our accuracy.
# It is easy to fool ourselves by memorising/overfitting to the training data
# we need to check that the model generalises to unseen data.
# (this is the difference between optimisation and learning.
# we expect learners to generalise)
idx = np.random.permutation(range(len(images)))
split = int(0.8 * len(images))
train_images, train_labels = images[:split, ...], labels[:split, ...]
test_images, test_labels = images[split:, ...], labels[split:, ...]
n_repeats = 3
def accuracy(images, labels, layers):
forward_prop = functools.reduce(compose, layers)
predictions = forward_prop(images)
return np.mean(np.equal(np.argmax(predictions, axis=1),
np.argmax(labels, axis=1)))
for i in range(n_repeats):
layers = [
LinearLayer((n_inputs, n_classes)),
Softmax()
]
layers = train(images,
labels,
layers,
epochs=50,
batch_size=12,
optimiser=lambda x: GradientDescent(x, 0.001, momentum=0.9))
valid_accuracy = accuracy(test_images, test_labels, layers)
print('\n accuracy: {:.3f}'.format(valid_accuracy))
epoch: 49 loss: 0.5636 accuracy: 0.869 epoch: 49 loss: 0.4582 accuracy: 0.872 epoch: 49 loss: 0.7004 accuracy: 0.881
# lets have a peek at the linear fns layers
plt.figure(figsize=(16,4))
for i in range(10):
plt.subplot(1,10,i+1)
plt.imshow(layers[0].weights[:, i].reshape((8,8)), cmap='gray', interpolation='nearest')
plt.axis('off')
Cool, every thing seems to work. Now we just* need to tinker with some hyper parameters to get everything running smoothly.
for i in range(n_repeats):
layers = [
LinearLayer((n_inputs, 30)),
Sigmoid(),
LinearLayer((30, n_classes)),
Softmax()
]
layers = train(images,
labels,
layers,
epochs=50,
batch_size=4,
optimiser=lambda x: Adam(x, 0.0005))
valid_accuracy = accuracy(test_images, test_labels, layers)
print('\n accuracy: {:.3f}'.format(valid_accuracy))
epoch: 49 loss: 1.3402 accuracy: 0.800 epoch: 49 loss: 1.4363 accuracy: 0.683 epoch: 49 loss: 1.3971 accuracy: 0.789
for i in range(n_repeats):
layers = [
LinearLayer((n_inputs, 30)),
ReLU(),
LinearLayer((30, 30)),
ReLU(),
LinearLayer((30, n_classes)),
Softmax()
]
layers = train(images,
labels,
layers,
epochs=100,
batch_size=4,
optimiser=lambda x: Adam(x, 0.0001))
valid_accuracy = accuracy(test_images, test_labels, layers)
print('\n accuracy: {:.3f}'.format(valid_accuracy))
epoch: 99 loss: 1.9657 accuracy: 0.369 epoch: 99 loss: 1.8371 accuracy: 0.389 epoch: 99 loss: 2.1320 accuracy: 0.319
for i in range(n_repeats):
layers = [
Linear((n_inputs, 128)),
Sigmoid(),
Linear((128, n_classes)),
Softmax()
]
layers = train(images,
labels,
layers,
epochs=20,
batch_size=16,
optimiser=lambda x: GradientDescent(x, 0.0001, momentum=0.9))
valid_accuracy = accuracy(test_images, test_labels, layers)
print('\n accuracy: {:.3f}'.format(valid_accuracy))
epoch: 19 loss: 2.4913 accuracy: 0.100 epoch: 19 loss: 2.5088 accuracy: 0.092 epoch: 19 loss: 2.7148 accuracy: 0.103
for i in range(n_repeats):
width = 8
depth = 8
layers = [Linear((n_inputs, width))]
for _ in range(depth):
layers += [Sigmoid(), Linear((width, width))]
layers += [Linear((width, n_classes)), Softmax()]
layers = train(images,
labels,
layers,
epochs=200,
batch_size=8,
optimiser=lambda x: GradientDescent(x, 0.0001, momentum=0.9))
valid_accuracy = accuracy(test_images, test_labels, layers)
print('\n accuracy: {:.3f}'.format(valid_accuracy))
epoch: 199 loss: 2.9327 accuracy: 0.092 epoch: 199 loss: 3.9877 accuracy: 0.103 epoch: 199 loss: 3.3845 accuracy: 0.103