#!/usr/bin/env python
# coding: utf-8

# # DS4420: Fun with autoencoders and self-supervision

# *Your name*:

# In[179]:


import numpy as np
import matplotlib.pyplot as plt

import torch
from torch import nn

# conda install -c pytorch torchvision
import torchvision

# note: if you cannot get torchvision installed
# using the above sequence, you can resort to
# the colab version here:
# -- just be sure to download and then upload
# the notebook to blackboard when complete.
fMNIST = torchvision.datasets.FashionMNIST(
            root='./data/FashionMNIST',
            train=True,
            download=True)


# Once again, we are playing with Fashion-MNIST here, following the last few lectures.

# In[180]:


from IPython.display import Image
from matplotlib.pyplot import imshow
get_ipython().run_line_magic('matplotlib', 'inline')

imshow(np.asarray(fMNIST.data[6]), cmap='gray')


# In[181]:


X = fMNIST.data
X = np.array([x_i.flatten().numpy() for x_i in X])
X = X / 255  # normalize
X.shape


# ## A brief detour / torch intro (or refresher)
#
# We're going to implement a few autoencoder (AE) variants in `torch`.
#
# Given that for some of you this may serve as something of an introduction to (or at least a refresher for) `torch`, here is one way to define and train a simple model.
#
# Note that you can also use the simpler `Sequential` pipeline to build straightforward models like this one, but the style below affords more flexibility (though it is overkill for something this small).

# In[182]:


class SimpleMLP(nn.Module):

    def __init__(self, input_size=784, hidden_size=32, n_labels=10):
        '''
        In the initializer we set up model parameters/layers.
        '''
        super(SimpleMLP, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.n_labels = n_labels

        # input layer; from x -> z
        self.i = nn.Linear(self.input_size, self.hidden_size, bias=False)
        # nonlinear activation
        self.a = nn.ReLU()
        # output layer
        self.o = nn.Linear(self.hidden_size, self.n_labels)
        # note: not used in forward(), since nn.CrossEntropyLoss (below)
        # applies (log-)softmax internally
        self.sm = nn.Softmax(dim=1)

    def forward(self, X):
        '''
        The forward pass defines how inputs flow forward
        through the model (linking layers together).
        '''
        z = self.i(X)
        z = self.a(z)
        y_hat = self.o(z)
        return y_hat


# Now to actually train the model, we need to define an `optimizer` and a loss function.

# In[183]:


model = SimpleMLP().float()

from torch import optim
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
loss_function = nn.CrossEntropyLoss()


# In[184]:


y = fMNIST.targets
y


# In[185]:


# convert X to a torch tensor
X = torch.tensor(X)


# Let's take a look at making predictions and calculating a loss.

# In[186]:


# make a prediction for the first 5 instances
# (note that this is "batched"; we are pushing
# through 5 instances at once)
y_hat = model(X[:5, :].float())

# calculate loss
loss = loss_function(y_hat, y[:5])
print(loss)


# And now take some number of passes over our training data, incurring loss and performing backprop.

# In[187]:


EPOCHS = 100

for epoch in range(EPOCHS):

    running_loss = 0.0
    idx, batch_num = 0, 0
    batch_size = 16
    print("")

    while idx < 20000:
        # zero the parameter gradients
        optimizer.zero_grad()

        X_batch = X[idx: idx + batch_size].float()
        y_batch = y[idx: idx + batch_size]
        idx += batch_size

        # now run our X's forward, get preds, incur
        # loss, backprop, and step the optimizer.
        y_hat_batch = model(X_batch)
        loss = loss_function(y_hat_batch, y_batch)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if batch_num % 100 == 0:
            print("epoch: {}, batch: {} // loss: {:.3f}".format(epoch, batch_num, loss.item()))
        batch_num += 1
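# As a quick sanity check, one way to eyeball how well the classifier trained is to score a slice of
# instances the loop above never touched. This is just a sketch: the 20000:25000 slice is an arbitrary
# held-out chunk of the training set, not the official test split.

# In[ ]:


# approximate accuracy on instances beyond the first 20k used in the loop above
with torch.no_grad():
    logits = model(X[20000:25000].float())
    preds = logits.argmax(dim=1)
    acc = (preds == y[20000:25000]).float().mean().item()
print("approx. accuracy on held-out slice: {:.3f}".format(acc))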
# ## OK! Now let's come back to auto-encoders

# ### TODO 1
#
# Implement a simple autoencoder in `torch`. In particular, let's start with a vanilla linear auto-encoder, mapping to two dimensions in the hidden space.

# In[192]:


class AE(nn.Module):

    def __init__(self, input_size=784, hidden_size=2):
        '''
        In the initializer we set up model parameters/layers.
        '''
        super(AE, self).__init__()

        ### REMOVE BELOW
        self.input_size = input_size
        self.hidden_size = hidden_size

        # input layer; from x -> z
        self.i = nn.Linear(self.input_size, self.hidden_size)
        # output layer
        self.o = nn.Linear(self.hidden_size, self.input_size)

    def forward(self, X, return_z=False):
        ### REMOVE BELOW
        z = self.i(X)
        if return_z:
            return z
        return self.o(z)


# In[193]:


auto = AE(hidden_size=50)
X_tilde = auto(X[:5, :].float())
X_tilde.shape


# ### TODO 2
#
# Define a training loop -- follow the above example.
#
# **hint** check out: https://pytorch.org/docs/stable/nn.html#loss-functions

# In[194]:


def train_AE(X_in, X_target, model, optimizer, loss_function, EPOCHS=10):
    for epoch in range(EPOCHS):

        idx, batch_num = 0, 0
        batch_size = 16
        print("")

        while idx < 60000:
            # zero the parameter gradients
            optimizer.zero_grad()

            X_batch = X_in[idx: idx + batch_size].float()
            X_target_batch = X_target[idx: idx + batch_size].float()
            idx += batch_size

            # now run our X's forward, get reconstructions, incur
            # loss, backprop, and step the optimizer.
            X_tilde_batch = model(X_batch)
            loss = loss_function(X_tilde_batch, X_target_batch)
            loss.backward()
            optimizer.step()

            # print out loss
            if batch_num % 100 == 0:
                print("epoch: {}, batch: {} // loss: {:.3f}".format(epoch, batch_num, loss.item()))
            batch_num += 1


# In[195]:


loss_function = nn.MSELoss()
auto = AE()
optimizer = optim.SGD(auto.parameters(), lr=0.01, momentum=0.9)
train_AE(X, X, auto, optimizer, loss_function, EPOCHS=50)


# In[196]:


X_tilde = auto(X[:5000].float()).detach().numpy()


# In[197]:


# an original image
imshow(np.asarray(X[3]).reshape(28, 28), cmap='gray')


# In[198]:


# and its reconstruction under the linear AE
imshow(np.asarray(X_tilde[3]).reshape(28, 28), cmap='gray')


# ### Pull out the hidden representations (for first 5k points) and plot them.

# In[199]:


Zs = auto(X[:5000].float(), return_z=True).detach().numpy()


# In[200]:


colors = ['r', 'g', 'b', 'c', 'm', 'y', 'k', 'w', 'orange', 'purple']
c = [colors[y_i] for y_i in y[:5000]]


# In[201]:


plt.scatter(Zs[:, 0], Zs[:, 1], c=c)


# ### Let's implement `AE2` extending the above by adding a non-linear activation function (try `Sigmoid`).

# In[202]:


class AE2(nn.Module):

    def __init__(self, input_size=784, hidden_size=16):
        '''
        In the initializer we set up model parameters/layers.
        '''
        super(AE2, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size

        # input layer; from x -> z
        self.i = nn.Linear(self.input_size, self.hidden_size)
        # nonlinear activation
        self.a = nn.Sigmoid()
        # output layer
        self.o = nn.Linear(self.hidden_size, self.input_size)

    def forward(self, X, return_z=False):
        z = self.a(self.i(X))
        if return_z:
            return z
        return self.o(z)


# In[203]:


auto2 = AE2()
optimizer = optim.SGD(auto2.parameters(), lr=0.001, momentum=0.9)


# In[204]:


train_AE(X, X, auto2, optimizer, loss_function)


# In[205]:


Zs = auto2(X[:5000].float(), return_z=True).detach().numpy()


# In[206]:


# note: hidden_size is 16 here, so this plots only the first two hidden dimensions
plt.scatter(Zs[:, 0], Zs[:, 1], c=c)


# In[207]:


Zs.shape
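# It can also help to eyeball a few reconstructions from `auto2` directly. The cell below is a small
# sketch plotting five originals (top row) against their reconstructions (bottom row); `recon` is just
# a throwaway variable name for this comparison.

# In[ ]:


# plot a handful of originals (top row) next to their AE2 reconstructions (bottom row)
recon = auto2(X[:5].float()).detach().numpy()
fig, axes = plt.subplots(2, 5, figsize=(10, 4))
for j in range(5):
    axes[0, j].imshow(np.asarray(X[j]).reshape(28, 28), cmap='gray')
    axes[1, j].imshow(recon[j].reshape(28, 28), cmap='gray')
    axes[0, j].axis('off')
    axes[1, j].axis('off')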
# ### TODO 3
#
# *Denoising* auto-encoder. Now let's feed the model *corrupted* versions of the inputs, keeping the clean originals as the reconstruction targets. To create a corrupted version we will perturb the input pixel values with some random noise.

# In[208]:


def corrupt(x, var=0.01):
    # note: np.random.normal takes the *standard deviation* as its scale
    # argument, so `var` here is really the std of the added noise
    return x + np.random.normal(np.zeros(x.shape), var)


# In[209]:


X[0, :10]


# In[210]:


corrupt(X[0])[:10]


# In[211]:


imshow(np.asarray(X[0].reshape((28, 28))), cmap='gray')


# In[212]:


imshow(np.asarray(corrupt(X[0], var=0.1).reshape((28, 28))), cmap='gray')


# In[213]:


X_corrupt = corrupt(X)


# In[214]:


auto3 = AE2(hidden_size=16)
optimizer = optim.SGD(auto3.parameters(), lr=0.01, momentum=0.9)
# corrupted inputs, clean targets
train_AE(X_corrupt, X, auto3, optimizer, loss_function)


# In[215]:


Zs = auto3(X[:5000].float(), return_z=True).detach().numpy()
plt.scatter(Zs[:, 0], Zs[:, 1], c=c)


# In[216]:


X_tilde = auto3(X[:5000].float()).detach().numpy()


# In[222]:


imshow(np.asarray(X_tilde[2]).reshape(28, 28), cmap='gray')


# ### Finally, let's add a regularization penalty on the hidden layer

# First, let's define the model appropriately. Consider: what do we need to change from the above variants? Think about the training loop (below).

# In[223]:


class AE_regularized(nn.Module):

    def __init__(self, input_size=784, hidden_size=2):
        '''
        In the initializer we set up model parameters/layers.
        '''
        super(AE_regularized, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size

        # input layer; from x -> z
        self.i = nn.Linear(self.input_size, self.hidden_size)
        self.a = nn.Sigmoid()
        # output layer
        self.o = nn.Linear(self.hidden_size, self.input_size)

    def forward(self, X):
        z = self.a(self.i(X))
        # Now we always return z along with the output
        return self.o(z), z


# Now update the training loop to incorporate regularization. This will take a parameter `lambda_` that encodes how much weight to put on the regularization penalty (vs the typical/reconstruction loss).
#
# Two hints:
#
# (1) Consider that we want to incur a loss associated with our regularization (an l1 norm); where should we do that?
#
# (2) See `torch.norm` (https://pytorch.org/docs/stable/torch.html#torch.norm).

# In[228]:


def train_regularized_AE(X_in, X_target, model, optimizer, loss_function, lambda_, EPOCHS=10):
    for epoch in range(EPOCHS):

        idx, batch_num = 0, 0
        batch_size = 16
        print("")

        while idx < 60000:
            # zero the parameter gradients
            optimizer.zero_grad()

            X_batch = X_in[idx: idx + batch_size].float()
            X_target_batch = X_target[idx: idx + batch_size].float()
            idx += batch_size

            # now run our X's forward, get reconstructions and codes,
            # incur loss, backprop, and step the optimizer.
            X_tilde_batch, z = model(X_batch)
            output_loss = loss_function(X_tilde_batch, X_target_batch)
            # here is the regularization loss: an l1 penalty on the codes.
            reg_loss = torch.norm(z, 1)
            loss = output_loss + lambda_ * reg_loss
            loss.backward()
            optimizer.step()

            # print out loss
            if batch_num % 100 == 0:
                print("epoch: {}, batch: {} // loss: {:.3f} // reg. loss (* lambda): {:.3f}".format(
                    epoch, batch_num, output_loss.item(), lambda_ * reg_loss.item()))
            batch_num += 1


# In[229]:


AER = AE_regularized(hidden_size=16)
optimizer = optim.SGD(AER.parameters(), lr=0.01, momentum=0.9)
train_regularized_AE(X_corrupt, X, AER, optimizer, loss_function, 1)
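# One quick way to see what the l1 penalty is doing is to check how many hidden activations end up
# near zero. The sketch below uses an arbitrary 0.01 cutoff on the first 5k instances; the exact
# fraction will depend on `lambda_` and the training run.

# In[ ]:


# fraction of (near-)zero hidden activations under the regularized AE;
# the l1 penalty should push many of these toward zero
with torch.no_grad():
    _, Z_reg = AER(X[:5000].float())
frac_zero = (Z_reg.abs() < 0.01).float().mean().item()
print("fraction of near-zero hidden activations: {:.3f}".format(frac_zero))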
# ### Variational auto-encoders
#
# First, let's review on board...

# In[176]:


from torch.nn import functional as F

class VAE(nn.Module):

    def __init__(self, input_size=784, hidden_size1=32, hidden_size2=32):
        '''
        In the initializer we set up model parameters/layers.
        '''
        super(VAE, self).__init__()

        ### encoder layers
        self.fc_e = nn.Linear(input_size, hidden_size1)
        self.fc_mean = nn.Linear(hidden_size1, hidden_size2)
        self.fc_logvar = nn.Linear(hidden_size1, hidden_size2)

        ### decoder layers
        self.fc_d1 = nn.Linear(hidden_size2, hidden_size1)
        self.fc_d2 = nn.Linear(hidden_size1, input_size)

    def encoder(self, x_in):
        x = self.fc_e(x_in)
        mean = self.fc_mean(x)
        logvar = self.fc_logvar(x)
        return mean, logvar

    def decoder(self, z):
        z = F.relu(self.fc_d1(z))
        x_out = torch.sigmoid(self.fc_d2(z))
        #return x_out.view(-1,1,28,28)
        return x_out

    def sample_normal(self, mean, logvar):
        # Using torch.normal(means, sds) returns a stochastic tensor which we cannot backpropagate through.
        # Instead we utilize the 'reparameterization trick'.
        # http://stats.stackexchange.com/a/205336
        # http://dpkingma.com/wordpress/wp-content/uploads/2015/12/talk_nips_workshop_2015.pdf
        sd = torch.exp(logvar * 0.5)
        e = torch.randn(sd.size())  # sample from standard normal
        z = e.mul(sd).add_(mean)
        return z

    def forward(self, x_in):
        z_mean, z_logvar = self.encoder(x_in)
        z = self.sample_normal(z_mean, z_logvar)
        x_out = self.decoder(z)
        return x_out, z_mean, z_logvar


# In[177]:


def train_VAE(X_in, X_target, model, optimizer, loss_function, EPOCHS=10):
    for epoch in range(EPOCHS):

        idx, batch_num = 0, 0
        batch_size = 16
        print("")

        while idx < 60000:
            # zero the parameter gradients
            optimizer.zero_grad()

            X_batch = X_in[idx: idx + batch_size].float()
            X_target_batch = X_target[idx: idx + batch_size].float()
            idx += batch_size

            # now run our X's forward, get reconstructions, incur
            # loss, backprop, and step the optimizer.
            X_tilde_batch, _, _ = model(X_batch)
            loss = loss_function(X_tilde_batch, X_target_batch)
            loss.backward()
            optimizer.step()

            # print out loss
            if batch_num % 100 == 0:
                print("epoch: {}, batch: {} // loss: {:.3f}".format(epoch, batch_num, loss.item()))
            batch_num += 1


# In[178]:


m = VAE()
optimizer = optim.SGD(m.parameters(), lr=0.01, momentum=0.9)
train_VAE(X, X, m, optimizer, loss_function, EPOCHS=20)


# In[169]:


# several stochastic reconstructions of the same input image
samples = [m(X[2, :].float())[0].detach().numpy() for _ in range(5)]


# In[174]:


imshow(np.asarray(samples[4]).reshape(28, 28), cmap='gray')


# In[157]:


X[0, :].shape


# In[156]:


X.shape
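# Note that `train_VAE` above only incurs reconstruction loss. The usual VAE objective also includes a
# KL-divergence term that pulls the approximate posterior toward a standard normal; for a diagonal
# Gaussian posterior this term has a closed form. The cell below is a sketch of what such a combined
# loss could look like (`vae_loss` and `beta` are illustrative names; it is not wired into the training
# loop above).

# In[ ]:


# sketch of a full VAE objective: reconstruction + KL(q(z|x) || N(0, I));
# the KL term below is the closed form for a diagonal Gaussian posterior,
# averaged over the batch and hidden dimensions (a scaling choice)
def vae_loss(x_out, x_target, z_mean, z_logvar, beta=1.0):
    recon = F.mse_loss(x_out, x_target)
    kl = -0.5 * torch.mean(1 + z_logvar - z_mean.pow(2) - z_logvar.exp())
    return recon + beta * kl

x_out, z_mean, z_logvar = m(X[:16].float())
print(vae_loss(x_out, X[:16].float(), z_mean, z_logvar))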