This notebook is an exploraton of gradient descent algorithms. We explore two dimensions of variation here. The first pertains to the calculation of the magnitude of the descent (the learning rate, α) along the gradient at each iteration of the algorithm. The second pertains to variations in the number of training samples utilized in the calculation of the gradient ▽ of the cost J with respect to θj.
Gradient descent is essentially a method for calculating parameters that when applied to each of the observations (also called dependent variables or features) in a data set X results in the lowest total error, or cost.
f(X,θ)=Hwhere H is the estimated value of the output (also called dependent variables or labels) for each set of inputs and
J(θ)=m∑i=1∣hi(θ,xi)−yi∣where J represents the total cost. Different methods of computing the cost are used depending on the application, but the formula above expresses the essence of it, which is that we are trying to predict the actual outcomes as closely as possible.
In order to minimze J with respect to θ we start with an abitrary value of θ and then calculate the direction for each our input variables that would result in the fastest decrease in cost (gradient). Then we calculate a new theta based on the gradient and a model input for the magnitude of the change (the learning rate or α). Then we repeat that process until θ and J stop changing and that should result in a value of θ that minimizes J.
θj:=θj−α▽θjJ(θ)or
θj:=θj−α∂∂θjJ(θ)θ can also be calculated directly using the equation below.
θ=(XTX)−1XTyComplexity for the normal equation, O(n3), is greater than it is it for the basic gradient descent algorithm, O(n2), but has acceptable performance up to approximately n=105 features.
We explore three methods for determining the learning rate.
In order to compare the performance of the algorithms and ensure that they are functioning correctly, we will use a known function as our cost function. This allows us to compare the value of θ produced by each algorithm and its associated J with values we can calculate directly from our known function. Using a known function with two dimensional inputs allows us to plot as a surface all possible values of J for a given range of values for θ and then Jθ for each iteration of the algorithm. We will use the Stablinsky-Tang function, which results in a non-convex cost surface suitable for testing with straightforward gradient and cost computations.
Our gradient descent algorithm has been generalized to accept the following arguments:
Argument | Definition |
---|---|
theta_hist |
Starting value of θ0 and calculated value of Jθ0 |
alpha |
Learning rate, α |
get_gradient |
Function that returns gradient, f(θj)=▽θj |
get_cost |
Function that returns cost, f(θj)=Jθj |
delta_min |
Minimum change in cost to establish convergence |
Throughout the variations we consider, the core algorithm remains the same. Both changes to the calculation of the learning rate, α, and later, the number of training samples used in calculating the gradient, are effected by modifying the function that gets passed in with respect to the get_gradient
argument.
The Styblinski–Tang function with respect to θ is:
J(θ)=∑ni=1θ4i−16θ2i+5θinwhere n is the number of dimensions in the data. For two dimensions, we can also express our cost function as:
J(θ)=θ41−16θ21+5θ1+θ42−16θ22+5θ22The global minimum of this function is −78.33233 at θ=(−2.903534,−2.903534)
The color scale of the surface corresponds to the z-axis value, which represents total cost, J, for all values of theta in the displayed range. The color scale of the points on the surface, which represent the total cost, Jθj, as function of θj at each iteration of the model, corresponds to the iteration.
The plots are also interactive, so you can spin them around and zoom in and out for more detail.
import numpy as np
import numpy as np
# Define general gradient descent algorithm
def gd(theta0, alpha, grad_obj, grad_adapt, delta_min, iters=1000):
# Initialize theta and cost history for convergence testing and plot
theta_hist = np.zeros((iters, theta0.shape[0]+1))
theta_hist[0] = grad_obj.cost(theta0)
# Create theta generator
theta_gen = grad_adapt(alpha, grad_obj.grad)(theta0)
# Initialize iteration variables
delta = float("inf")
i = 1
# Run algorithm
while delta > delta_min:
# Get next theta
theta = next(theta_gen)
# Test for convergence, store cost for plotting
try:
theta_hist[i] = grad_obj.cost(theta)
except:
print('{} minimum change in cost not achieved in {} iterations.'.format(delta_min, theta_hist.shape[0]))
break
delta = abs(theta_hist[i-1,-1] - theta_hist[i,-1])
i += 1
# Trim zeros
theta_hist = theta_hist[np.nonzero(theta_hist[:,-1])]
return theta_hist
# Define Gradient class used to pass gradient and cost generator functions to gd algorithm
class Gradient(object):
def __init__(self, grad_fun, cost_fun, data=np.array([]), size=50):
# Should return an array with the same dimensions as theta
self.grad_fun = grad_fun
# Should return an array with cost appended to theta
self.cost_fun = cost_fun
# Should have ones in first column if required
self.data = data
self.size = size
def batches_gen(data):
data = self.data
size = self.size
i = 0
while True:
index = slice(i*size, min((i+1)*size, data.shape[0]), 1)
if data.shape[0] - i * size > 0:
yield (data[index,:-1], data[index,-1])
i += 1
else:
np.random.shuffle(data)
i = 0
self.batches = batches_gen(self.data)
def grad_from_data(theta):
return self.grad_fun(theta, next(self.batches))
def cost_from_data(theta):
return self.cost_fun(theta, self.data)
if self.data.size > 1:
self.grad = grad_from_data
self.cost = cost_from_data
else:
self.grad = self.grad_fun
self.cost = self.cost_fun
# Define generator function for constant alpha
# Not terribly useful here, but allows other gradient adaptations
def const_alpha(alpha, grad_fun):
def theta_gen_const(theta):
while True:
theta = theta - alpha * grad_fun(theta)
yield theta
return theta_gen_const
# Define gradient and cost functions for testing (Stablyinski-Tang function)
def grad_fun_st(theta):
return np.apply_along_axis(lambda o: 2.5 - 16*o + 2*o**3, 0, theta)
def cost_fun_st(theta):
return np.append(theta, np.sum(5*theta - 16*theta**2 + theta**4) / theta.shape[0])
# Create Gradient instance for test function
stab_tang = Gradient(grad_fun_st, cost_fun_st)
# Initialize model hyperparameters
delta_min = 10**-6
alpha = 0.01
# Initialize starting value of theta
theta0 = np.array([-0.2, -4.4])
# Run algorithm
theta_hist = gd(theta0, alpha, stab_tang, const_alpha, delta_min)
import plotly.offline as py
py.init_notebook_mode()
import plotly.graph_objs as go
# Prepare plot
# Prepare surface
x = np.arange(-4.6, 4.6, 0.1)
y = np.arange(-4.6, 4.6, 0.1)
X, Y = np.meshgrid(x, y)
Z = 1/2.0 * (X**4 - 16*X**2 + 5*X + Y**4 - 16*Y**2 + 5*Y)
# Prepare surface contours
contour = dict(
show = True,
color = 'DodgerBlue', #'#0066FF',
highlightcolor = 'DeepSkyBlue',
highlightwidth = 1.5,
width = 1
)
# Add surface to plot
surface = go.Surface(
name = 'J surface',
x = X,
y = Y,
z = Z,
colorscale = 'Rainbow',
showlegend = False,
contours = dict(
y = contour,
x = contour,
z = dict(
show = False,
color = contour['color'],
highlightcolor = contour['highlightcolor'],
highlightwidth = contour['highlightwidth'],
width = contour['width']
)
)
)
# Add theta_hist to plot - set up as a function for future plots
def spec_theta_hist_trace(theta_hist):
theta_hist_trace = go.Scatter3d(
name = 'theta_hist',
x = theta_hist[:,0],
y = theta_hist[:,1],
z = theta_hist[:,2],
mode = 'markers',
showlegend = False,
marker = dict(
color = np.arange(theta_hist.shape[0]),
colorscale = 'Blackbody',
showscale = False,
size = "5"
)
)
return theta_hist_trace
# Specify layout options
layout = go.Layout(
title='Constant Alpha',
autosize=False,
width=700,
height=700,
scene=dict(
xaxis=dict(
title = 'theta1',
ticks = "outside",
dtick = 0.25,
showticklabels = False
),
yaxis=dict(
title = 'theta2',
ticks = "",
dtick = 0.25,
showticklabels = False
),
zaxis=dict(
title = 'J',
),
camera=dict(
up=dict(x=0, y=0, z=1),
center=dict(x=0, y=0, z=0),
eye=dict(x=0.25, y=1.25, z=1.15)
)
)
)
# Execute plot
fig = go.Figure(data=[surface, spec_theta_hist_trace(theta_hist)], layout=layout)
py.iplot(fig, filename='constant_alpha_gradient_descent')
You can see from this graph that the core gradient descent algorithm is functioning as expected. With a starting value of θ=(−0.3,4.6), it produces a final θ that minimizes total cost, J. If you spin the graph around (it's actually easiest to see looking at the bottom of the surface), you can see the steps in θ the algorithm produced. Visually they appear to be in the direction of the greatest reduction in cost. The steps are also bigger where the gradient is steeper and smaller where it is flatter. The rate of change looks like it changed four times on the way to the global minimum.
This algorithm's ability to solve for the global minimum of J depends on α and the starting value of θ0. You can try different values of α and θ0 to see how they affect the algorithm's ability to successfully solve for θ such that the global minimum cost, Jmin, is achieved. Even in the scenarios where the algorithm fails to minimize cost, it appears to be functioning correctly.
# Define gradient generator function for Adagrad
def adagrad(alpha, grad_fun):
def theta_gen_adagrad(theta):
# Initialize gradient history and hyperparameter epsilon
grad_hist = 0
epsilon = 10**-8
# Generate gradient
while True:
# Get gradient to adapt from gradient function
gradient = grad_fun(theta)
# Perform gradient adaptation
grad_hist += np.square(gradient)
theta = theta - alpha * gradient / (epsilon + np.sqrt(grad_hist))
yield theta
return theta_gen_adagrad
alpha = 0.1
# Run algorithm
theta_hist = gd(theta0, alpha, stab_tang, adagrad, delta_min)
# Execute plot
layout.title = 'Adaptive Gradient Descent'
fig = go.Figure(data=[surface, spec_theta_hist_trace(theta_hist)], layout=layout)
py.iplot(fig, filename='adagrad_gradient_descent')
# Define theta generator function for Adam
def adam(alpha, grad_fun):
# Create theta generator
def theta_gen_adam(theta):
# Initialize moment variables and hyperparameters
moment1 = np.zeros(theta.shape[0])
moment2 = np.zeros(theta.shape[0])
beta1 = 0.9
beta2 = 0.999
epsilon = 10**-8
t = 1
# Generate new theta
while True:
# Get gradient to adapt
gradient = grad_fun(theta)
# Update moment estimates
moment1 = beta1 * moment1 + (1 - beta1) * gradient
moment2 = beta2 * moment2 + (1 - beta2) * np.square(gradient)
moment1_hat = moment1 / (1 - beta1**t)
moment2_hat = moment2 / (1 - beta2**t)
# Yield adapted gradient
theta_new = theta - alpha * moment1_hat / (epsilon + np.sqrt(moment2_hat))
yield theta_new
t += 1
theta = theta_new
return theta_gen_adam
# Run algorithm
theta_hist = gd(theta0, alpha, stab_tang, adam, delta_min)
# Execute plot
layout.title = 'Adaptive Moment Estimation'
fig = go.Figure(data=[surface, spec_theta_hist_trace(theta_hist)], layout=layout)
py.iplot(fig, filename='adam_gradient_descent')
We pass gradient_gen and cost_gen to Gradient object.
We pass Gradient object to gd().
gd() calls Gradient.grad(theta) to get next gradient.
We want to pass data to grad_gen once and then be able to pass theta to it subsequently and have it return a gradient based on batches of the data we originally passed.
Plan would be to set up grad_gen(theta, data) and then have a grad_decorator
# Define gradient generator function
# batch is tuple of (X values, y values)
def grad_fun_linear(theta, batch):
X, y = batch
return X.T.dot(X.dot(theta) - y) / X.shape[0]
def cost_fun_linear(theta, data):
X = data[:,:-1]
y = data[:,-1]
return np.append(theta, np.sum(np.square(X.dot(theta) - y)) / (2 * X.shape[0]))
data = np.ones((100,3))
obj_theta = np.array([100, 10])
np.random.seed(4)
data[:,1:-1] = np.random.randint(101, size=(100,1))
data[:,-1] = data[:,:-1].dot(obj_theta) * np.random.uniform(0.85, 1.15, 100)
traces = []
plot_data = go.Scatter(
name = 'Training data',
x = data[:,1],
y = data[:,-1],
mode="markers"
)
traces.append(plot_data)
# data_raw = np.loadtxt(open("data/test_multi.txt","rb"),delimiter=",")
mu = np.mean(data[:,1:-1], axis=0)
sigma = np.std(data[:,1:-1], axis=0)
data[:,1:-1] = (data[:,1:-1] - mu) / sigma
linear = Gradient(grad_fun_linear, cost_fun_linear, data, 5)
theta0 = np.zeros(2)
alpha = .05
delta_min = 10**-3
# Run algorithm
theta_hist = gd(theta0, alpha, linear, const_alpha, delta_min)
print(theta_hist[-1])
x = np.array([0, 100])
x_norm = np.ones((2, 2))
x_norm[:,1] = (np.array([0, 100]) - mu) / sigma
y = x_norm.dot(theta_hist[-1,:-1])
plot_theta = go.Scatter(
name = 'Model',
x = x_norm[:,1],
y = y
)
layout = go.Layout(
title='Linear Regression'
)
traces.append(plot_theta)
fig = go.Figure(data=traces, layout=layout)
py.iplot(fig, filename='linear-data')
0.001 minimum change in cost not achieved in 1000 iterations. [ 614.85791208 264.89480007 1647.98649304]
import unittest
class TestGradient(unittest.TestCase):
def setUp(self):
# Set up test data and theta
self.test_data = np.ones((100,3))
for i in range(self.test_data.shape[0]):
self.test_data[i,1:] = np.array([i+1, i+1])
self.theta0 = np.zeros(2)
self.with_data = Gradient(grad_fun_linear, cost_fun_linear, self.test_data, 40)
self.with_data_100 = Gradient(grad_fun_linear, cost_fun_linear, self.test_data, 100)
self.with_data_110 = Gradient(grad_fun_linear, cost_fun_linear, self.test_data, 110)
self.test_batches = self.with_data.batches
self.test_batches_100 = self.with_data_100.batches
self.test_batches_110 = self.with_data_110.batches
self.without_data = Gradient(grad_fun_st, cost_fun_st)
def tearDown(self):
self.test_data = None
self.theta0 = None
self.with_data = None
self.with_data_100 = None
self.with_data_110 = None
self.without_data = None
# Test that batches return tuple of (m x n, m)
def test_shape(self):
batch = next(self.test_batches)
self.assertEqual(batch[0].shape[0], 40)
self.assertEqual(batch[0].shape[1], 2)
self.assertEqual(batch[1].shape[0], 40)
with self.assertRaises(IndexError):
batch[1].shape[1]
# Test that batches returns correctly sequenced batches with size > m
def test_sequence(self):
self.assertEqual(next(self.test_batches)[0][-1,-1], 40)
self.assertEqual(next(self.test_batches)[0][-1,-1], 80)
self.assertEqual(next(self.test_batches)[0][-1,-1], 100)
self.assertEqual(next(self.test_batches)[0].shape[0], 40)
self.assertNotEqual(next(self.test_batches)[0][-1, -1], 80)
# Test that batches returns correctly sequenced batches with size = m
def test_sequence_100(self):
self.assertEqual(next(self.test_batches_100)[0].shape[0], 100)
self.assertEqual(next(self.test_batches_100)[0].shape[0], 100)
# Test that batches returns correctly sequenced batches with size > m
def test_sequence_100(self):
self.assertEqual(next(self.test_batches_100)[0].shape[0], 100)
self.assertEqual(next(self.test_batches_100)[0].shape[0], 100)
# Test to make sure Gradient returns correct gradient from data
def test_with_data(self):
self.assertEqual(self.with_data.grad(self.theta0)[0],-20.5)
self.assertEqual(self.with_data.grad(self.theta0)[0],-60.5)
self.assertEqual(self.with_data.grad(self.theta0).size,2)
self.assertEqual(self.with_data.cost(self.theta0).size,3)
self.assertEqual(self.with_data.cost(self.theta0)[-1],1691.75)
# Test to make sure Gradient returns correct gradient without
def test_without_data(self):
theta1 = self.without_data.grad(self.theta0)
self.assertEqual(theta1[0], 2.5)
self.assertEqual(self.without_data.grad(theta1)[0], -6.25)
suite = unittest.TestLoader().loadTestsFromTestCase(TestGradient)
unittest.TextTestRunner(verbosity=2).run(suite)
test_sequence (__main__.TestGradient) ... ok test_sequence_100 (__main__.TestGradient) ... ok test_shape (__main__.TestGradient) ... ok test_with_data (__main__.TestGradient) ... ok test_without_data (__main__.TestGradient) ... ok ---------------------------------------------------------------------- Ran 5 tests in 0.005s OK
<unittest.runner.TextTestResult run=5 errors=0 failures=0>
With logistic regression, we have a depedent variable that is classified, meaning it can only assume a limited number of finite values. In order to account for this we modify our cost function such that our predictions will always range between 0 and 1.
hθ(x)=g(θTx)z=θTxg(z)=11+e−zdef sigmoid(z):
return 1 / (1 + np.exp(-z))
def grad_fun_logistic(theta, batch):
X, y = batch
return (sigmoid(X.dot(theta)) - y).T.dot(X) / X.shape[0]
def cost_fun_logistic(theta, data):
X = data[:,:-1]
y = data[:,-1]
sigm = sigmoid(X.dot(theta))
return np.append(theta, np.sum(-y.dot(np.log(sigm)) - (1 - y).dot(np.log(1 - sigm))) / data.shape[0])
data = np.ones((100,4))
data[:,1:] = np.loadtxt(open("data/test_logistic.txt","rb"),delimiter=",")
mu = np.mean(data[:,1:-1], axis=0)
sigma = np.std(data[:,1:-1], axis=0)
data[:,1:-1] = (data[:,1:-1] - mu) / sigma
logistic = Gradient(grad_fun_logistic, cost_fun_logistic, data, 100)
theta0 = np.zeros(3)
alpha = .05
delta_min = 10**-3
# Run algorithm
theta_hist = gd(theta0, alpha, logistic, const_alpha, delta_min)
print(theta_hist[-1,:-1])
traces = []
plot_data = go.Scatter(
name = 'Training data',
x = data[:,1],
y = data[:,2],
mode="markers",
marker=dict(
color=data[:,-1],
colorscale='Bluered',
)
)
traces.append(plot_data)
# Plot decision boundary
x1 = np.ones((2,2))
x1[:,-1] = np.array([-1.8, 1.8])
x2 = x1.dot((theta_hist[-1,:-2] / -theta_hist[-1,-2]))
plot_db = go.Scatter(
name = 'Decision boundary',
x = x1[:,-1],
y = x2,
mode="lines"
)
traces.append(plot_db)
layout = go.Layout(
title='Logistic Regression'
)
fig = go.Figure(data=traces, layout=layout)
py.iplot(fig, filename='logistic-regression')
[ 0.33398824 0.93569619 0.83717276]
class TestLogistic(unittest.TestCase):
def setUp(self):
# Set up test data and theta
self.data = np.ones((100,4))
self.data[:,1:] = np.loadtxt(open("data/test_logistic.txt","rb"),delimiter=",")
mu = np.mean(self.data[:,1:-1], axis=0)
sigma = np.std(self.data[:,1:-1], axis=0)
data[:,1:-1] = (self.data[:,1:-1] - mu) / sigma
self.logistic = Gradient(grad_fun_logistic, cost_fun_logistic, self.data, 5)
self.batches = self.logistic.batches
self.theta0 = np.zeros(3)
def tearDown(self):
self.data = None
self.logistic = None
self.batches = None
self.theta0 = None
def test_sigmoid(self):
sigm = sigmoid(next(self.batches)[0].dot(theta0))
self.assertEqual(sigm.shape[0], 5)
def test_grad_fun(self):
self.assertEqual(self.logistic.grad(self.theta0).shape[0], 3)
def test_cost_fun(self):
self.assertEqual(self.logistic.cost(self.theta0)[-1], 0.69314718055994518)
suite = unittest.TestLoader().loadTestsFromTestCase(TestLogistic)
unittest.TextTestRunner(verbosity=2).run(suite)
test_cost_fun (__main__.TestLogistic) ... ok test_grad_fun (__main__.TestLogistic) ... ok test_sigmoid (__main__.TestLogistic) ... ok ---------------------------------------------------------------------- Ran 3 tests in 0.005s OK
<unittest.runner.TextTestResult run=3 errors=0 failures=0>