#!/usr/bin/env python # coding: utf-8 # ### 随机梯度下降 # 梯度下降计算 Loss: # $$ # L ( W ) = \frac { 1 } { N } \sum _ { i = 1 } ^ { N } L _ { i } \left( x _ { i } , y _ { i } , W \right) # $$ # 梯度下降计算 Loss 关于权重的梯度 # $$ # \nabla _ { W } L ( W ) = \frac { 1 } { N } \sum _ { i = 1 } ^ { N } \nabla _ { W } L _ { i } \left( x _ { i } , y _ { i } , W \right) # $$ # 当 $N$ 非常大时,全批量计算是不可能的,没有这么大的内存和显存可以容纳。这时候使用一个 minibatch 来估计全部数据集,minibatch 通常为 32/64/128。 # SGD 示例: # ![](https://tuchuang-1252747889.cosgz.myqcloud.com/2018-11-29-1601E590-7AEE-4743-B77B-10C783A461A4.png) # 按批读取数据,计算梯度更新参数。 # 接下来模仿 Pytorch 实现一个 Dataloader,重写 `__getitem__`、`__iter__`、`__len__`,因此可以根据下标获取数据、迭代数据和获取数据长度。 # In[2]: class Dataloader(object): def __init__(self, data, labels, batch_size, shuffle=True): self.data = data self.batch_size = batch_size self.shuffle = shuffle self.labels = labels def __getitem__(self, index): return self.data[index], self.labels[index] def __iter__(self): datasize = self.data.shape[0] data_seq = np.arange(datasize) if self.shuffle: np.random.shuffle(data_seq) interval_list = np.append(np.arange(0, datasize, self.batch_size), datasize) for index in range(interval_list.shape[0]-1): s = data_seq[interval_list[index]:interval_list[index+1]] yield self.data[s], self.labels[s] def __len__(self): return self.data.shape[0] # PyTorch 中首先需要重写 [torchvision.datasets](https://pytorch.org/docs/stable/torchvision/datasets.html),然后使用 [torch.utils.data.DataLoader](https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader) 加载数据,支持并行加载和打乱数据。 # SGD 只是最简单的一个优化器,通常不会单独使用,SDG+Momentum 和 Adam 更为常用,几种优化器只是在参数的更新上有所差别。希望能整理一下各种优化器的知识。 # ### 线性层实现 # 在 [Softmax、K-L 散度、交叉熵和 Cross Entropy Loss 推导和实现](https://hzzone.io/3.%20cs231n/Softmax、KL%20散度和%20Cross%20Entropy%20Loss%20推导和实现.html) 推到了交叉熵损失函数,实现了一个线性分类器。 # # 其实线性层的基本算法就是简单的 $y=Wx+b$,这个线性分类器就是单层神经网络。 # # 因此对于参数 $W$ 和 $b$ 求导可得: # $$\frac{\partial y}{\partial W}=x^T$$ # $$\frac{\partial y}{\partial b}=1$$ # 然后再用链式法则乘以上一层的梯度即可。 # In[1]: import numpy as np class Linear(object): def __init__(self, D_in, D_out): self.weight = np.random.randn(D_in, D_out).astype(np.float32)*0.01 self.bias = np.zeros((1, D_out), dtype=np.float32) def forward(self, input): self.data = input return np.dot(self.data, self.weight)+self.bias def backward(self, top_grad, lr): self.grad = np.dot(top_grad, self.weight.T).astype(np.float32) # 更新参数 self.weight -= lr*np.dot(self.data.T, top_grad) self.bias -= lr*np.mean(top_grad, axis=0) # 接下来使用 SGD 重新训练一个单层神经网络分类 mnist。 # In[ ]: from utils import read_mnist from nn import CrossEntropyLossLayer, lr_scheduler # 读取并归一化数据,不归一化会导致 nan test_data = ((read_mnist('../data/mnist/t10k-images.idx3-ubyte').reshape((-1, 784))-127.0)/255.0).astype(np.float32) train_data = ((read_mnist('../data/mnist/train-images.idx3-ubyte').reshape((-1, 784))-127.0)/255.0).astype(np.float32) # 独热编码标签 from sklearn.preprocessing import OneHotEncoder encoder = OneHotEncoder() encoder.fit(np.arange(10).reshape((-1, 1))) train_labels = encoder.transform(read_mnist('../data/mnist/train-labels.idx1-ubyte').reshape((-1, 1))).toarray().astype(np.float32) test_labels = encoder.transform(read_mnist('../data/mnist/t10k-labels.idx1-ubyte').reshape((-1, 1))).toarray().astype(np.float32) loss_layer = CrossEntropyLossLayer() lr = 0.1 D, C = 784, 10 np.random.seed(1) # 固定随机生成的权重 best_acc = -float('inf') max_iter = 900 step_size = 400 scheduler = lr_scheduler(lr, step_size) loss_list = [] batch_size = 120 train_dataloader = Dataloader(train_data, train_labels, batch_size, shuffle=True) test_dataloader = Dataloader(test_data, test_labels, batch_size, shuffle=False) linear_classifer = Linear(D, C) from tqdm import tqdm_notebook for epoch in tqdm_notebook(range(max_iter)): # 测试 correct = 0 for data, labels in test_dataloader: test_pred = linear_classifer.forward(data) pred_labels = np.argmax(test_pred, axis=1) real_labels = np.argmax(labels, axis=1) correct += np.sum(pred_labels==real_labels) acc = correct/len(test_dataloader) if acc>best_acc: best_acc=acc # 训练 total_loss = 0 for data, labels in test_dataloader: train_pred = linear_classifer.forward(data) loss = loss_layer.forward(train_pred, labels) total_loss += loss loss_layer.backward() linear_classifer.backward(loss_layer.grad, scheduler.get_lr()) loss_list.append(total_loss) scheduler.step() # In[18]: best_acc # 绘制 Loss 曲线。 # In[16]: import matplotlib.pyplot as plt plt.plot(np.arange(max_iter), loss_list) plt.show() # 训练速度比全批量训练提升很多倍,且精度达到了 ~97%,应该是我做了打乱训练数据的原因,可能和使用了 SGD 有关。