Calculate uncertainty with dropout-based methods on one-dimensional source and target data.
from typing import Callable, List, Optional, Tuple, Union
from glob import glob
import os
import random
import sys
import warnings
gpu_id = 1
os.environ["CUDA_VISIBLE_DEVICES"] = f'{gpu_id}'
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
Support in-notebook plotting
%matplotlib inline
Report versions
print('numpy version: {}'.format(np.__version__))
from matplotlib import __version__ as mplver
print('matplotlib version: {}'.format(mplver))
print(f'pytorch version: {torch.__version__}')
numpy version: 1.17.2
matplotlib version: 3.1.3
pytorch version: 1.5.0
pv = sys.version_info
print('python version: {}.{}.{}'.format(pv.major, pv.minor, pv.micro))
python version: 3.7.7
Reload packages whose content has changed (useful during package development)
%load_ext autoreload
%autoreload 2
Check GPU(s)
!nvidia-smi | head -n 4
Fri Jun 19 12:15:01 2020
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 430.40       Driver Version: 430.40       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
Set seeds for better reproducibility
seed = 1337
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
Make up some toy data for our regression experiments, e.g., $y = \sum_{i=1}^D x_i^2 + \varepsilon$ with $\varepsilon \sim \mathcal{N}(0, \sigma^2)$.
class ToyRegressionData(Dataset):
    def __init__(self, n:int, dim:int, x_params:Tuple[float,float]=(0.,1.),
                 noise_std:float=0.1, dist:Optional[Callable]=torch.distributions.Normal):
        self.n = n
        self.x_params = x_params
        self.dim = dim
        self.noise_std = noise_std
        if dist is None:
            # evenly spaced grid over [x_params[0], x_params[1]] (only supports dim == 1)
            self.x = torch.linspace(x_params[0], x_params[1], n).unsqueeze(1)
        else:
            self.x_dist = dist(*x_params)
            self.x = self.x_dist.sample((n,dim))
        y = torch.sum(self.x**2, dim=1, keepdim=True)
        self.y = y + noise_std * torch.randn_like(y)
def __len__(self):
return self.x.size(0)
def __getitem__(self, idx):
return (self.x[idx], self.y[idx])
dim = 1
N = 2**10
train_dist = None  # None -> evenly spaced grid; use, e.g., torch.distributions.Normal for random samples
valid_dist = None
xt_params, xv_params = (-1.,1.), (-1.1, 1.1)
train_std, valid_std = 0.1, 0.1
print(f'N = {N}')
train_ds_params = dict(x_params=xt_params, noise_std=train_std, dist=train_dist)
valid_ds_params = dict(x_params=xv_params, noise_std=valid_std, dist=valid_dist)
train_ds = ToyRegressionData(N, dim, **train_ds_params)
valid_ds = ToyRegressionData(N//2, dim, **valid_ds_params)
N = 1024
plot_data = N <= 2**12 and dim == 1
if plot_data:
ixs = np.argsort(train_ds.x[:,0])
plt.plot(train_ds.x[ixs],train_ds.y[ixs],lw=1)
plt.title('Training data');
activation = nn.ReLU
class UncertainLinear(nn.Module):
def __init__(self, dim:int, mid:int, n_layers:int=1, dropout_rate:float=0.2,
mid_bias:bool=True, out_bias:bool=False):
super().__init__()
self.p = dropout_rate
layers = [nn.Sequential(nn.Linear(dim, mid, bias=mid_bias), activation())]
for _ in range(n_layers-1):
layers.append(nn.Sequential(nn.Linear(mid,mid,bias=mid_bias), activation()))
self.m = nn.ModuleList(layers)
        self.out = nn.Linear(mid, 1, bias=out_bias)
        self.var = nn.Linear(mid, 1, bias=out_bias)  # predicts log-variance (MSE) or log-scale (L1)
    def forward(self, x:torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        for m in self.m:
            # dropout is always on (training=True), so every forward pass is an MC-dropout sample
            x = F.dropout(m(x), p=self.p, training=True)
        return self.out(x), self.var(x)
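Because `forward` calls `F.dropout` with `training=True`, a fresh dropout mask is sampled on every call, even under `model.eval()`; this is what enables the Monte Carlo (MC) dropout sampling used later. A quick illustrative check (the underscore-prefixed names are ours, not part of the pipeline):
_demo = UncertainLinear(dim=1, mid=8)
_x = torch.full((4, 1), 0.5)
_y1, _ = _demo(_x)
_y2, _ = _demo(_x)
print(torch.allclose(_y1, _y2))  # expect False: each pass samples a new dropout mask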
class MSELoss(nn.Module):
    """ use just MSE loss with the UncertainLinear network (ignores the variance head) """
    def forward(self, out:Tuple[torch.Tensor,torch.Tensor], y:torch.Tensor) -> torch.Tensor:
        yhat, _ = out
        loss = F.mse_loss(yhat, y)
        return loss
class L1Loss(nn.Module):
    """ use just smooth L1 (Huber) loss with the UncertainLinear network (ignores the scale head) """
    def forward(self, out:Tuple[torch.Tensor,torch.Tensor], y:torch.Tensor) -> torch.Tensor:
        yhat, _ = out
        loss = F.smooth_l1_loss(yhat, y)
        return loss
class ExtendedMSELoss(nn.Module):
    """ use modified MSE loss for variance calculation with the UncertainLinear network """
    def forward(self, out:Tuple[torch.Tensor,torch.Tensor], y:torch.Tensor) -> torch.Tensor:
        yhat, s = out  # s is the predicted log-variance
        loss = torch.mean(0.5 * (torch.exp(-s) * F.mse_loss(yhat, y, reduction='none') + s))
        return loss
class ExtendedL1Loss(nn.Module):
    """ use modified L1 loss for scale param. calculation with the UncertainLinear network """
    def forward(self, out:Tuple[torch.Tensor,torch.Tensor], y:torch.Tensor) -> torch.Tensor:
        yhat, s = out  # s is the predicted log-scale
        loss = torch.mean((torch.exp(-s) * F.l1_loss(yhat, y, reduction='none')) + s)
        return loss
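For reference, the two extended losses are (up to additive constants) the negative log-likelihoods of Gaussian and Laplace observation models, in the loss-attenuation style of Kendall & Gal, with the network predicting $s = \log\sigma^2$ (MSE case) or $s = \log b$ (L1 case):
$$\mathcal{L}_{\text{MSE}} = \frac{1}{N}\sum_{i=1}^{N} \frac{1}{2}\left(e^{-s_i}\,(y_i - \hat y_i)^2 + s_i\right), \qquad \mathcal{L}_{\text{L1}} = \frac{1}{N}\sum_{i=1}^{N}\left(e^{-s_i}\,|y_i - \hat y_i| + s_i\right).$$
Predicting the log of the variance (or scale) keeps the loss numerically stable and lets the network down-weight residuals where it predicts high noise, at the price of the $+s_i$ penalty.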
def calc_uncertainty(yhat:torch.Tensor, s:torch.Tensor, l2:bool=True) -> Tuple[torch.Tensor, torch.Tensor]:
    """ calculate epistemic and aleatory uncertainty quantities based on whether MSE or L1 loss was used """
    epistemic = torch.mean(yhat.var(dim=0, unbiased=True), dim=1, keepdim=True)  # variance over MC samples, mean over channels
    # exp(s) is the predicted variance (MSE) or scale b (L1); the variance of Laplace(mu, b) is 2*b**2
    aleatory = torch.mean(torch.exp(s), dim=0) if l2 else torch.mean(2*torch.exp(s)**2, dim=0)
    return epistemic, aleatory
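For intuition, with $T$ MC-dropout samples $\hat y_t$ and predicted log-variances $s_t$, these two quantities approximate the standard decomposition of the predictive variance (the sample variance below uses the Bessel-corrected form, matching `unbiased=True`):
$$\mathrm{Var}[y] \approx \underbrace{\frac{1}{T-1}\sum_{t=1}^{T}\left(\hat y_t - \bar y\right)^2}_{\text{epistemic}} + \underbrace{\frac{1}{T}\sum_{t=1}^{T} e^{s_t}}_{\text{aleatory}}, \qquad \bar y = \frac{1}{T}\sum_{t=1}^{T}\hat y_t,$$
where, in the L1 case, the per-sample variance of a Laplace distribution with scale $b_t = e^{s_t}$ is $2 b_t^2$, which is where the `2*torch.exp(s)**2` term comes from.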
def predict(model, x:torch.Tensor, n_samp:int=25, l2:bool=True) -> Tuple[torch.Tensor,torch.Tensor,torch.Tensor,torch.Tensor]:
    """ return predictions of the model and the predicted model uncertainties """
    out = [model(x) for _ in range(n_samp)]  # n_samp stochastic forward passes (dropout stays on)
    yhat = torch.stack([o[0] for o in out]).detach().cpu()
    s = torch.stack([o[1] for o in out]).detach().cpu()
    e, a = calc_uncertainty(yhat, s, l2)
    return (torch.mean(yhat, dim=0), torch.mean(s, dim=0), e, a)
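A minimal usage sketch of `predict` (the names `m` and `xb` are ours, purely illustrative); every returned tensor has one row per input point:
m = UncertainLinear(dim=1, mid=8)
xb = torch.randn(16, 1)                                    # a batch of 16 one-dimensional inputs
yhat_mean, s_mean, ep, al = predict(m, xb, n_samp=10)
print(yhat_mean.shape, s_mean.shape, ep.shape, al.shape)   # all torch.Size([16, 1])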
def get_metrics(model, x:torch.Tensor, y:torch.Tensor, n_samp:int, use_l2:bool, eps:float):
    """ get uncertainties and other metrics during training for analysis (uses the global `criterion`) """
    state = model.training
    model.eval()
    with torch.no_grad():
        out, s, ep, al = predict(model, x, n_samp, use_l2)
        loss = criterion((out, s), y.detach().cpu())
        sb = ep / (al + eps)  # scibilic uncertainty: epistemic over aleatory
        eu, au = np.mean(ep.numpy()), np.mean(al.numpy())
        su = np.mean(sb.numpy())
    model.train(state)  # restore whichever training/eval state we entered with
    return loss, (out, ep, al, sb), eu, au, su
Set training and model hyperparameters
# system setup
use_jit = False
load_model = False
# logging setup
log_rate = 100 # print losses and uncertainty measures every log_rate epoch
unc_rate = 5 # get uncertainties in training every unc_rate iteration
version = 'toy_regression_v1' # naming scheme of model to load
# model, optimizer, loss, and training parameters
batch_size = 64
n_jobs = 0
n_epochs = 1000
n_samp = 5 # number of samples to take in validation
model_kwargs = dict(dim=dim, mid=1024, n_layers=1, dropout_rate=0.2, mid_bias=True, out_bias=False)
use_adam = True
optim_kwargs = dict(lr=0.001, betas=(0.8,0.99), weight_decay=1e-6) if use_adam else \
dict(lr=0.001, momentum=0., weight_decay=1e-6)
use_scheduler = False
scheduler_kwargs = dict(step_size=1000, gamma=0.5)
extended = True # use modified loss for variance estimation
use_l2 = True # use l2 (MSE) loss or l1 loss function
eps = 1e-6 # add this value to denominator in scibilic uncertainty calc
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,
num_workers=n_jobs, pin_memory=True)
valid_loader = DataLoader(valid_ds, batch_size=batch_size,
num_workers=n_jobs, pin_memory=True)
assert torch.cuda.is_available()
device = torch.device('cuda')
torch.backends.cudnn.benchmark = True
model = UncertainLinear(**model_kwargs)
if use_jit: model = torch.jit.script(model)
def num_params(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Number of trainable parameters in model: {num_params(model)}')
Number of trainable parameters in model: 4096
if load_model:
if use_jit:
model = torch.jit.load(f'trained_jit_{version}.pth')
else:
state_dict = torch.load(f'trained_{version}.pth')
model.load_state_dict(state_dict);
model.cuda(device=device)
optim_cls = torch.optim.AdamW if use_adam else torch.optim.SGD
optimizer = optim_cls(model.parameters(), **optim_kwargs)
if use_scheduler:
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, **scheduler_kwargs)
if extended:
criterion = ExtendedMSELoss() if use_l2 else ExtendedL1Loss()
else:
criterion = MSELoss() if use_l2 else L1Loss()
train_losses, valid_losses = [], []
t_ep_unc, t_al_unc, t_sb_unc = [], [], []
v_ep_unc, v_al_unc, v_sb_unc = [], [], []
n_batches = len(train_loader)
for t in range(1, n_epochs+1):
# training
t_losses, t_ep, t_al, t_sb = [], [], [], []
model.train()
for i, (x, y) in enumerate(train_loader):
x, y = x.to(device), y.to(device)
optimizer.zero_grad()
out = model(x)
loss = criterion(out, y)
t_losses.append(loss.item())
loss.backward()
optimizer.step()
        if i % unc_rate == 0:
            # the last three return values are scalar means of the per-point uncertainties
            _, _, ep, al, sb = get_metrics(model, x, y, n_samp, use_l2, eps)
            t_ep.append(ep); t_al.append(al); t_sb.append(sb)
train_losses.append(t_losses)
t_ep_unc.append(t_ep); t_al_unc.append(t_al); t_sb_unc.append(t_sb)
# validation
v_losses, v_ep, v_al, v_sb = [], [], [], []
model.eval()
with torch.no_grad():
for i, (x, y) in enumerate(valid_loader):
x = x.to(device)
loss, out, ep, al, sb = get_metrics(model, x, y, n_samp, use_l2, eps)
v_losses.append(loss.item())
v_ep.append(ep); v_al.append(al); v_sb.append(sb)
valid_losses.append(v_losses)
v_ep_unc.append(v_ep); v_al_unc.append(v_al); v_sb_unc.append(v_sb)
if not np.all(np.isfinite(t_losses)):
raise RuntimeError('NaN or Inf in training loss, cannot recover. Exiting.')
if t % log_rate == 0:
log = (f'Epoch: {t} - TL: {np.mean(t_losses):.2e}, VL: {np.mean(v_losses):.2e}, '
f'tEU: {np.mean(t_ep):.2e}, vEU: {np.mean(v_ep):.2e}, '
f'tAU: {np.mean(t_al):.2e}, vAU: {np.mean(v_al):.2e}, '
f'tSU: {np.mean(t_sb):.2e}, vSU: {np.mean(v_sb):.2e}')
print(log)
if use_scheduler: scheduler.step()
Epoch: 100 - TL: -1.65e+00, VL: -1.74e+00, tEU: 6.24e-04, vEU: 7.98e-04, tAU: 1.38e-02, vAU: 1.14e-02, tSU: 4.38e-02, vSU: 6.45e-02
Epoch: 200 - TL: -1.69e+00, VL: -1.75e+00, tEU: 5.69e-04, vEU: 7.63e-04, tAU: 1.29e-02, vAU: 1.40e-02, tSU: 4.32e-02, vSU: 5.27e-02
Epoch: 300 - TL: -1.72e+00, VL: -1.79e+00, tEU: 4.98e-04, vEU: 7.43e-04, tAU: 1.24e-02, vAU: 1.33e-02, tSU: 3.81e-02, vSU: 5.15e-02
Epoch: 400 - TL: -1.71e+00, VL: -1.79e+00, tEU: 5.43e-04, vEU: 6.22e-04, tAU: 1.23e-02, vAU: 1.26e-02, tSU: 4.07e-02, vSU: 4.62e-02
Epoch: 500 - TL: -1.71e+00, VL: -1.76e+00, tEU: 4.20e-04, vEU: 5.78e-04, tAU: 1.20e-02, vAU: 1.26e-02, tSU: 3.32e-02, vSU: 4.38e-02
Epoch: 600 - TL: -1.71e+00, VL: -1.80e+00, tEU: 4.93e-04, vEU: 7.06e-04, tAU: 1.26e-02, vAU: 1.21e-02, tSU: 3.87e-02, vSU: 5.92e-02
Epoch: 700 - TL: -1.74e+00, VL: -1.79e+00, tEU: 4.12e-04, vEU: 5.92e-04, tAU: 1.16e-02, vAU: 1.18e-02, tSU: 3.51e-02, vSU: 5.14e-02
Epoch: 800 - TL: -1.74e+00, VL: -1.79e+00, tEU: 4.67e-04, vEU: 5.46e-04, tAU: 1.13e-02, vAU: 1.17e-02, tSU: 4.05e-02, vSU: 4.71e-02
Epoch: 900 - TL: -1.71e+00, VL: -1.77e+00, tEU: 3.80e-04, vEU: 5.85e-04, tAU: 1.25e-02, vAU: 1.24e-02, tSU: 2.94e-02, vSU: 4.54e-02
Epoch: 1000 - TL: -1.74e+00, VL: -1.80e+00, tEU: 3.49e-04, vEU: 5.72e-04, tAU: 1.19e-02, vAU: 1.11e-02, tSU: 2.86e-02, vSU: 5.42e-02
Create tidy datasets from training and validation losses for plotting purposes.
def tidy_losses(train:List[List[float]], valid:List[List[float]]) -> pd.DataFrame:
out = {'epoch': [], 'type': [], 'value': [], 'phase': []}
for i, (tl,vl) in enumerate(zip(train,valid),1):
for tli in tl:
out['epoch'].append(i)
out['type'].append('loss')
out['value'].append(tli)
out['phase'].append('train')
for vli in vl:
out['epoch'].append(i)
out['type'].append('loss')
out['value'].append(vli)
out['phase'].append('valid')
return pd.DataFrame(out)
def tidy_uncertainty(ep:Tuple[List,List], al:Tuple[List,List], sb:Tuple[List,List]) -> pd.DataFrame:
    out = {'epoch': [], 'type': [], 'value': [], 'phase': []}
    for i, (epi, ali, sbi) in enumerate(zip(ep, al, sb)):
        phase = 'train' if i == 0 else 'valid'  # each argument is a (train, valid) pair of per-epoch lists
for j, (epij,alij,sbij) in enumerate(zip(epi,ali,sbi),1):
for epijk in epij:
out['epoch'].append(j)
out['type'].append('epistemic')
out['value'].append(epijk)
out['phase'].append(phase)
for alijk in alij:
out['epoch'].append(j)
out['type'].append('aleatory')
out['value'].append(alijk)
out['phase'].append(phase)
for sbijk in sbij:
out['epoch'].append(j)
out['type'].append('scibilic')
out['value'].append(sbijk)
out['phase'].append(phase)
return pd.DataFrame(out)
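To make the long ("tidy") layout concrete, here is what `tidy_losses` produces for a toy input (this call is ours, purely illustrative): one row per recorded value, with columns epoch, type, value, and phase, which is exactly the format `sns.lineplot` expects below.
print(tidy_losses([[0.5, 0.4]], [[0.45]]))
#    epoch  type  value  phase
# 0      1  loss   0.50  train
# 1      1  loss   0.40  train
# 2      1  loss   0.45  valid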
losses = tidy_losses(train_losses, valid_losses)
uncert = tidy_uncertainty((t_ep_unc, v_ep_unc),
(t_al_unc, v_al_unc),
(t_sb_unc, v_sb_unc))
# reuse `version` (defined above) when naming saved artifacts
load_tidy = False
if load_tidy:
losses = pd.read_csv(f'losses_{version}.csv')
uncert = pd.read_csv(f'uncert_{version}.csv')
f, ax1 = plt.subplots(1,1,figsize=(12, 8),sharey=True)
sns.lineplot(x='epoch',y='value',hue='phase',data=losses,ci='sd',ax=ax1,lw=3);
ax1.set_title('Losses');
ax1.set_ylim((-2.,-0.5))
f.savefig(f'losses_{version}.pdf')
The validation output is averaged over n_samp=5 MC samples before the loss is computed, which is why it is lower than the training loss (which is computed from single, non-averaged forward passes).
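This is Jensen's inequality at work: for a convex loss such as MSE, the loss of an averaged prediction is at most the average of the individual losses. A minimal numeric sketch (names are ours):
preds = torch.randn(5, 100, 1)           # pretend these are 5 MC-dropout predictions
target = torch.zeros(100, 1)
loss_of_mean = F.mse_loss(preds.mean(dim=0), target)
mean_of_loss = torch.stack([F.mse_loss(p, target) for p in preds]).mean()
print(loss_of_mean.item() <= mean_of_loss.item())  # True, by Jensen's inequality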
use_log = False
save_plots = False
f, ax1 = plt.subplots(1,1,figsize=(12,8),sharey=True)
if use_log: ax1.set(yscale='log')
sns.lineplot(x='epoch',y='value',hue='type',style='phase',ci='sd',data=uncert,ax=ax1,lw=3);
ax1.set_title('Uncertainty');
ax1.set_ylim((0, 0.2))
if save_plots: f.savefig(f'uncert_{version}.pdf')
save_tidy = False
if save_tidy:
losses.to_csv(f'losses_{version}.csv')
uncert.to_csv(f'uncert_{version}.csv')
save_model = False
if save_model:
if use_jit:
torch.jit.save(model, f'trained_jit_{version}.pth')
else:
torch.save(model.state_dict(), f'trained_{version}.pth')
xt, yt = train_ds.x, train_ds.y
xv, yv = valid_ds.x, valid_ds.y
xtn, ytn = xt.numpy(), yt.numpy()
xvn, yvn = xv.numpy(), yv.numpy()
def list_to_np(lst:List[torch.Tensor]) -> List[np.ndarray]:
return list(map(lambda x: x.detach().cpu().numpy(), lst))
model.eval()
with torch.no_grad():
yhattn, stn, etn, atn = list_to_np(predict(model, xt.to(device), 50, use_l2))
yhatvn, svn, evn, avn = list_to_np(predict(model, xv.to(device), 50, use_l2))
tss, vss = 1, 1
xtns, xvns = xtn[::tss,0], xvn[::vss,0]
xr = np.abs(train_ds.x_params[1])
xtm = (xtns > -xr) & (xtns < xr)
xvm = (xvns > -xr) & (xvns < xr)
if dim == 2:
    from mpl_toolkits.mplot3d import Axes3D
    fig = plt.figure(figsize=(15,10))
    ss = tss
    ax1 = fig.add_subplot(221, projection='3d')
    ax1.scatter(xtn[::ss,0], xtn[::ss,1], ytn[::ss,0]);
    ax2 = fig.add_subplot(222, projection='3d')
    ax2.scatter(xvn[::ss,0], xvn[::ss,1], yvn[::ss,0]);
    ax3 = fig.add_subplot(223, projection='3d')
    ax3.scatter(xvn[::ss,0], xvn[::ss,1], yhatvn[::ss,0]);  # predicted validation
else:
fig,(ax1,ax2,ax3,ax4) = plt.subplots(1,4,figsize=(15,5),sharex=True)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
sns.regplot(xtns[xtm], ytn[::tss,0][xtm], order=2, ax=ax1)
sns.regplot(xvns[xvm], yvn[::vss,0][xvm], order=2, ax=ax2)
sns.regplot(xtns[xtm], yhattn[::tss,0][xtm], order=2, ax=ax3)
sns.regplot(xvns[xvm], yhatvn[::vss,0][xvm], order=2, ax=ax4)
ax1.set_title('Training data');
ax2.set_title('Validation data');
ax3.set_title('Predicted training');
ax4.set_title('Predicted validation');
ax1.set_xlim((-xr,xr));
for ax in (ax1,ax2,ax3,ax4): ax.set_ylim((-0.1,xr**2))
if plot_data:
fig, (ax1,ax2) = plt.subplots(1,2,figsize=(16,8),sharey=True)
x_range = (-1.5, 1.5)
xs = torch.linspace(*x_range, 100).unsqueeze(1)
with torch.no_grad():
ys, _, ep, al = list_to_np(predict(model, xs.to(device), 1000, use_l2))
x, y, ep, al = xs[:,0], ys[:,0], np.sqrt(ep[:,0]), np.sqrt(al[:,0])
pal = sns.color_palette()
ax1.plot(train_ds.x[ixs],train_ds.y[ixs],alpha=0.5,color=pal[1],label='Orig.')
ax1.plot(x,y,label='Fit',lw=3,color=pal[7])
ax1.fill_between(x, y-al, y+al,alpha=0.5,zorder=3,color=pal[0], label='Al.')
ax1.legend(loc='best');
ax1.set_title('Aleatory');
ax2.plot(train_ds.x[ixs],train_ds.y[ixs],alpha=0.5,color=pal[1],label='Orig.')
ax2.plot(x,y,label='Fit',lw=3,color=pal[7])
ax2.fill_between(x, y-ep, y+ep,alpha=0.5,zorder=3,color=pal[4],label='Ep.')
ax2.legend(loc='best');
ax2.set_ylim(-0.5,2.5)
ax2.set_title('Epistemic');
if plot_data:
    sc = ep / (al + eps)  # scibilic uncertainty (here a ratio of standard deviations)
fig, ax3 = plt.subplots(1,1,figsize=(8,8))
ax3.plot(train_ds.x[ixs],train_ds.y[ixs],alpha=0.5,color=pal[1],label='Orig.')
ax3.plot(x,y,label='Fit',lw=3,color=pal[7])
ax3.fill_between(x, y-sc, y+sc,alpha=0.3,zorder=3,color=pal[2],label='Sc.')
ax3.legend(loc='best');
ax3.set_ylim(-0.5,2.5)
ax3.set_title('Scibilic');
We can see the epistemic (Ep.) and aleatory (Al.) uncertainty plotted over the fitted function. As desired, epistemic uncertainty increases when out-of-distribution data is input to the model (i.e., outside the range of -1 to 1). Recall that epistemic uncertainty reflects what the model does or doesn't know about the true data-generating process, so it is expected to be low on in-distribution data and relatively high on out-of-distribution data, which is exactly what we observe.
Aleatory uncertainty is (approximately) constant throughout the in-distribution data, which is expected since the target data has constant variance and the aleatory uncertainty estimates that variance term. The estimated aleatory uncertainty decreases below -1, but this is only because the network estimates the variance of out-of-distribution data poorly, which is unsurprising.
Scibilic uncertainty (Sc. in the stand-alone plot above) is simply the ratio of epistemic to aleatory uncertainty. It is meant to capture what the model could know, given sufficient representative training data, but currently does not. This quantity is defined and explored further in this paper, where we use it to find anomalies. The plot shows that scibilic uncertainty is large on out-of-distribution data, so, as shown in the second plot below, we can threshold that quantity to detect out-of-distribution samples. Admittedly, in this simple case you can do the same with epistemic uncertainty (shown in the first plot below), but in non-trivial cases epistemic uncertainty is confounded by other factors.
fig, ax1 = plt.subplots(1,1,figsize=(8,8))
ax1.plot(train_ds.x[ixs],train_ds.y[ixs],color=pal[1],label='Orig.')
ax1.fill_between(x, 0., ep > 0.04,alpha=0.5,zorder=3,color=pal[4],label='Thresholded Ep.')
ax1.legend(loc='best');
ax1.set_xlim(-1.5,1.5);
ax1.set_ylim(0, 1.1);
fig, ax2 = plt.subplots(1,1,figsize=(8,8))
ax2.plot(train_ds.x[ixs],train_ds.y[ixs],color=pal[1],label='Orig.')
ax2.fill_between(x, 0., sc > 0.4,alpha=0.75,zorder=3,color=pal[2],label='Thresholded Sc.')
ax2.legend(loc='best');
ax2.set_xlim(-1.5,1.5);
ax2.set_ylim(0, 1.1);