%reload_ext autoreload
%autoreload 2
#export
from nb_007a import *
Data has been prepared in csv files at the beginning of 007a; we will use it now.
PATH = Path('../data/aclImdb/')
CLAS_PATH = PATH/'clas'
LM_PATH = PATH/'lm'
MODEL_PATH = LM_PATH/'models'
os.makedirs(CLAS_PATH, exist_ok=True)
os.makedirs(LM_PATH, exist_ok=True)
os.makedirs(MODEL_PATH, exist_ok=True)
tokenizer = Tokenizer(rules=default_rules, special_cases=[BOS, FLD, UNK, PAD])
bs,bptt = 50,70
data = data_from_textcsv(LM_PATH, tokenizer, data_func=lm_data, max_vocab=60000, bs=bs, bptt=bptt)
Download the pretrained model and the corresponding itos dictionary here and put them in the MODEL_PATH folder.
itos_wt = pickle.load(open(MODEL_PATH/'itos_wt103.pkl', 'rb'))
stoi_wt = {v:k for k,v in enumerate(itos_wt)}
#export
Weights = Dict[str,Tensor]
def convert_weights(wgts:Weights, stoi_wgts:Dict[str,int], itos_new:Collection[str]) -> Weights:
"Converts the model weights to go with a new vocabulary."
dec_bias, enc_wgts = wgts['1.decoder.bias'], wgts['0.encoder.weight']
bias_m, wgts_m = dec_bias.mean(0), enc_wgts.mean(0)
    new_w = enc_wgts.new_zeros((len(itos_new),enc_wgts.size(1)))
    new_b = dec_bias.new_zeros((len(itos_new),))
for i,w in enumerate(itos_new):
        r = stoi_wgts.get(w, -1)
new_w[i] = enc_wgts[r] if r>=0 else wgts_m
new_b[i] = dec_bias[r] if r>=0 else bias_m
wgts['0.encoder.weight'] = new_w
wgts['0.encoder_dp.emb.weight'] = new_w.clone()
wgts['1.decoder.weight'] = new_w.clone()
wgts['1.decoder.bias'] = new_b
return wgts
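A minimal sketch on toy tensors with a hypothetical three-word pretrained vocab: a token shared with the new vocab keeps its embedding row, while an unseen token is initialized to the mean of the pretrained embeddings.
old_itos = ['the', 'cat', 'sat']                     # hypothetical pretrained vocab
old_stoi = {w:i for i,w in enumerate(old_itos)}
enc = torch.randn(3, 4)
toy_wgts = {'0.encoder.weight': enc.clone(), '1.decoder.bias': torch.randn(3)}
new_wgts = convert_weights(toy_wgts, old_stoi, ['the', 'dog'])
assert torch.equal(new_wgts['0.encoder.weight'][0], enc[0])          # 'the' keeps its row
assert torch.allclose(new_wgts['0.encoder.weight'][1], enc.mean(0))  # 'dog' gets the mean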
wgts = torch.load(MODEL_PATH/'lstm_wt103.pth', map_location=lambda storage, loc: storage)
wgts['1.decoder.bias'][:10]
itos_wt[:10]
wgts = convert_weights(wgts, stoi_wt, data.train_ds.vocab.itos)
wgts['1.decoder.bias'][:10]
data.train_ds.vocab.itos[:10]
#export
def lm_split(model:Model) -> List[Model]:
"Splits a RNN model in groups for differential learning rates."
groups = [nn.Sequential(rnn, dp) for rnn, dp in zip(model[0].rnns, model[0].hidden_dps)]
groups.append(nn.Sequential(model[0].encoder, model[0].encoder_dp, model[1]))
return groups
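For the 3-layer language model used below this gives four groups: one per (LSTM, dropout) pair, plus a last group holding the embeddings and the decoder. A quick sanity check with tiny hypothetical sizes (assuming `get_language_model` from nb_007 is in scope via the import above):
m = get_language_model(100, 20, 50, 3, 1)  # vocab_sz=100, emb_sz=20, nh=50, nl=3, pad_token=1
assert len(lm_split(m)) == 4               # 3 RNN groups + (encoder, encoder_dp, decoder)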
SplitFunc = Callable[[Model], List[Model]]
OptSplitFunc = Optional[SplitFunc]
OptStrTuple = Optional[Tuple[str,str]]
class RNNLearner(Learner):
"Basic class for a Learner in RNN"
def __init__(self, data:DataBunch, model:Model, bptt:int=70, split_func:OptSplitFunc=None, clip:float=None,
adjust:bool=False, alpha:float=2., beta:float=1., **kwargs):
super().__init__(data, model)
self.callbacks.append(RNNTrainer(self, bptt, alpha=alpha, beta=beta, adjust=adjust))
if clip: self.callback_fns.append(partial(GradientClipping, clip=clip))
if split_func: self.split(split_func)
self.metrics = [accuracy]
def save_encoder(self, name:str):
"Saves the encoder to the model directory"
torch.save(self.model[0].state_dict(), self.path/self.model_dir/f'{name}.pth')
    def load_encoder(self, name:str):
"Loads the encoder from the model directory"
self.model[0].load_state_dict(torch.load(self.path/self.model_dir/f'{name}.pth'))
def load_pretrained(self, wgts_fname:str, itos_fname:str):
"Loads a pretrained model and adapts it to the data vocabulary."
old_itos = pickle.load(open(self.path/self.model_dir/f'{itos_fname}.pkl', 'rb'))
old_stoi = {v:k for k,v in enumerate(old_itos)}
wgts = torch.load(self.path/self.model_dir/f'{wgts_fname}.pth', map_location=lambda storage, loc: storage)
wgts = convert_weights(wgts, old_stoi, self.data.train_ds.vocab.itos)
self.model.load_state_dict(wgts)
@classmethod
def language_model(cls, data:DataBunch, bptt:int=70, emb_sz:int=400, nh:int=1150, nl:int=3, pad_token:int=1,
drop_mult:float=1., tie_weights:bool=True, bias:bool=True, qrnn:bool=False,
pretrained_fnames:OptStrTuple=None, **kwargs) -> 'RNNLearner':
"Creates a `Learner` with a language model."
dps = np.array([0.25, 0.1, 0.2, 0.02, 0.15]) * drop_mult
vocab_size = len(data.train_ds.vocab.itos)
model = get_language_model(vocab_size, emb_sz, nh, nl, pad_token, input_p=dps[0], output_p=dps[1],
weight_p=dps[2], embed_p=dps[3], hidden_p=dps[4], tie_weights=tie_weights, bias=bias, qrnn=qrnn)
learn = cls(data, model, bptt, split_func=lm_split, **kwargs)
if pretrained_fnames is not None: learn.load_pretrained(*pretrained_fnames)
return learn
data = data_from_textcsv(LM_PATH, Tokenizer(), data_func=lm_data, bs=bs)
learn = RNNLearner.language_model(data, drop_mult=0.3, pretrained_fnames=['lstm_wt103', 'itos_wt103'])
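We first freeze the body so that only the last layer group (the freshly adapted embeddings and the decoder) gets trained.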
learn.freeze()
lr_find(learn)
learn.recorder.plot()
learn.fit_one_cycle(1, 1e-2, moms=(0.8,0.7), wd=0.03)
learn.save('fit_head')
learn.load('fit_head')
learn.unfreeze()
learn.fit_one_cycle(10, 1e-3, moms=(0.8,0.7), wd=0.03, pct_start=0.25)
learn.save('fine_tuned60kb')
learn.save_encoder('fine_tuned_enc60kb')
#export
from torch.utils.data import Sampler, BatchSampler
NPArrayList = Collection[np.ndarray]
KeyFunc = Callable[[int], int]
class SortSampler(Sampler):
"Go through the text data by order of length"
def __init__(self, data_source:NPArrayList, key:KeyFunc): self.data_source,self.key = data_source,key
def __len__(self) -> int: return len(self.data_source)
def __iter__(self):
return iter(sorted(range(len(self.data_source)), key=self.key, reverse=True))
class SortishSampler(Sampler):
"Go through the text data by order of length with a bit of randomness"
def __init__(self, data_source:NPArrayList, key:KeyFunc, bs:int):
self.data_source,self.key,self.bs = data_source,key,bs
def __len__(self) -> int: return len(self.data_source)
def __iter__(self):
idxs = np.random.permutation(len(self.data_source))
sz = self.bs*50
ck_idx = [idxs[i:i+sz] for i in range(0, len(idxs), sz)]
sort_idx = np.concatenate([sorted(s, key=self.key, reverse=True) for s in ck_idx])
sz = self.bs
ck_idx = [sort_idx[i:i+sz] for i in range(0, len(sort_idx), sz)]
max_ck = np.argmax([self.key(ck[0]) for ck in ck_idx]) # find the chunk with the largest key,
ck_idx[0],ck_idx[max_ck] = ck_idx[max_ck],ck_idx[0] # then make sure it goes first.
        sort_idx = np.concatenate(np.random.permutation(ck_idx[1:])) if len(ck_idx) > 1 else np.array([],dtype=np.int64)
sort_idx = np.concatenate((ck_idx[0], sort_idx))
return iter(sort_idx)
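A quick illustration on hypothetical dummy data: within a batch the indices point to texts of similar lengths (so little compute is wasted on padding), batches come in a shuffled order, and the batch holding the longest text goes first so memory problems show up immediately.
lengths = np.random.randint(1, 500, size=2000)
texts = [np.zeros(l) for l in lengths]  # stand-ins for tokenized texts
sampler = SortishSampler(texts, key=lambda i: len(texts[i]), bs=8)
idxs = list(iter(sampler))
print([len(texts[i]) for i in idxs[:8]])      # first batch contains the longest texts
print([len(texts[i]) for i in idxs[96:104]])  # a later batch: similar, random lengths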
#export
BatchSamples = Collection[Tuple[Collection[int], int]]
def pad_collate(samples:BatchSamples, pad_idx:int=1, pad_first:bool=True) -> Tuple[LongTensor, LongTensor]:
"Function that collect samples and adds padding"
max_len = max([len(s[0]) for s in samples])
res = torch.zeros(max_len, len(samples)).long() + pad_idx
    for i,s in enumerate(samples):
        if pad_first: res[-len(s[0]):,i] = LongTensor(s[0])
        else:         res[:len(s[0]),i]  = LongTensor(s[0])
return res, LongTensor([s[1] for s in samples]).squeeze()
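A small example with two hypothetical samples: with the default `pad_first=True` the shorter text is padded at the front, so the last time step (whose hidden state feeds the classifier) always holds a real token.
x, y = pad_collate([([5, 6, 7], 0), ([8, 9], 1)], pad_idx=1)
print(x)  # tensor([[5, 1], [6, 8], [7, 9]]) -- column 1 is front-padded
print(y)  # tensor([0, 1])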
#export
def classifier_data(datasets:Collection[TextDataset], path:PathOrStr, **kwargs) -> DataBunch:
"Function that transform the `datasets` in a `DataBunch` for classification"
    bs = kwargs.pop('bs', 64)
    pad_idx = kwargs.pop('pad_idx', 1)
train_sampler = SortishSampler(datasets[0].ids, key=lambda x: len(datasets[0].ids[x]), bs=bs//2)
train_dl = DeviceDataLoader.create(datasets[0], bs//2, sampler=train_sampler, collate_fn=pad_collate)
dataloaders = [train_dl]
for ds in datasets[1:]:
sampler = SortSampler(ds.ids, key=lambda x: len(ds.ids[x]))
dataloaders.append(DeviceDataLoader.create(ds, bs, sampler=sampler, collate_fn=pad_collate))
return DataBunch(*dataloaders, path=path)
We need to use the same vocab as for the LM.
vocab = Vocab(LM_PATH/'tmp')
data = data_from_textcsv(CLAS_PATH, Tokenizer(), vocab=vocab, data_func=classifier_data, bs=50)
data.train_ds.vocab.itos[40:60]
vocab.itos[40:60]
x,y = next(iter(data.train_dl))
vocab.textify(x[:,15]), y[15]
#export
class MultiBatchRNNCore(RNNCore):
"Creates a RNNCore module that can process a full sentence."
def __init__(self, bptt:int, max_seq:int, *args, **kwargs):
self.max_seq,self.bptt = max_seq,bptt
super().__init__(*args, **kwargs)
    def concat(self, arrs:Collection[Tensor]) -> List[Tensor]:
        "Concatenates the outputs of each chunk along the sequence dimension."
        return [torch.cat([l[si] for l in arrs]) for si in range(len(arrs[0]))]
    def forward(self, input:LongTensor) -> Tuple[List[Tensor],List[Tensor]]:
sl,bs = input.size()
self.reset()
raw_outputs, outputs = [],[]
for i in range(0, sl, self.bptt):
r, o = super().forward(input[i: min(i+self.bptt, sl)])
if i>(sl-self.max_seq):
raw_outputs.append(r)
outputs.append(o)
return self.concat(raw_outputs), self.concat(outputs)
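The forward pass walks the full sequence in `bptt`-sized chunks, carrying the hidden state across chunks, and only keeps the outputs of the chunks that start within the last `max_seq` time steps for the classifier head. A minimal sketch with tiny hypothetical sizes (it assumes the `RNNCore` constructor from nb_007):
enc = MultiBatchRNNCore(5, 12, 100, 8, 16, 2, pad_token=1)  # bptt=5, max_seq=12, tiny RNNCore
x = torch.randint(2, 100, (23, 4))                          # seq_len=23, batch_size=4
raw_outputs, outputs = enc(x)
print(outputs[-1].shape)  # (kept_steps, batch, emb_sz): only the trailing chunks survive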
#export
class PoolingLinearClassifier(nn.Module):
"Creates a linear classifier with pooling."
def __init__(self, layers:Collection[int], drops:Collection[float]):
super().__init__()
mod_layers = []
activs = [nn.ReLU(inplace=True)] * (len(layers) - 2) + [None]
for n_in,n_out,p,actn in zip(layers[:-1],layers[1:], drops, activs):
mod_layers += bn_drop_lin(n_in, n_out, p=p, actn=actn)
self.layers = nn.Sequential(*mod_layers)
def pool(self, x:Tensor, bs:int, is_max:bool):
"Pools the tensor along the seq_len dimension."
f = F.adaptive_max_pool1d if is_max else F.adaptive_avg_pool1d
return f(x.permute(1,2,0), (1,)).view(bs,-1)
    def forward(self, input:Tuple[List[Tensor],List[Tensor]]) -> Tuple[Tensor,List[Tensor],List[Tensor]]:
raw_outputs, outputs = input
output = outputs[-1]
sl,bs,_ = output.size()
avgpool = self.pool(output, bs, False)
mxpool = self.pool(output, bs, True)
x = torch.cat([output[-1], mxpool, avgpool], 1)
x = self.layers(x)
return x, raw_outputs, outputs
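The head uses concat pooling: the hidden state of the last time step is concatenated with a max pool and an average pool over all time steps, which is why the first linear layer of the classifier below takes an input of size `3*emb_sz`. In isolation, on a hypothetical activation tensor:
h = torch.randn(10, 4, 8)                       # (seq_len, batch, emb_sz)
avgp = F.adaptive_avg_pool1d(h.permute(1,2,0), (1,)).view(4,-1)
maxp = F.adaptive_max_pool1d(h.permute(1,2,0), (1,)).view(4,-1)
print(torch.cat([h[-1], maxp, avgp], 1).shape)  # torch.Size([4, 24]) == (batch, 3*emb_sz)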
#export
def rnn_classifier_split(model:Model) -> List[Model]:
"Splits a RNN model in groups."
groups = [nn.Sequential(model[0].encoder, model[0].encoder_dp)]
groups += [nn.Sequential(rnn, dp) for rnn, dp in zip(model[0].rnns, model[0].hidden_dps)]
groups.append(model[1])
return groups
#export
def get_rnn_classifier(bptt:int, max_seq:int, n_class:int, vocab_sz:int, emb_sz:int, n_hid:int, n_layers:int,
pad_token:int, layers:Collection[int], drops:Collection[float], bidir:bool=False, qrnn:bool=False,
hidden_p:float=0.2, input_p:float=0.6, embed_p:float=0.1, weight_p:float=0.5) -> Model:
"Creates a RNN classifier model"
rnn_enc = MultiBatchRNNCore(bptt, max_seq, vocab_sz, emb_sz, n_hid, n_layers, pad_token=pad_token, bidir=bidir,
qrnn=qrnn, hidden_p=hidden_p, input_p=input_p, embed_p=embed_p, weight_p=weight_p)
return SequentialRNN(rnn_enc, PoolingLinearClassifier(layers, drops))
#export
SplitFunc = Callable[[Model], List[Model]]
OptSplitFunc = Optional[SplitFunc]
OptStrTuple = Optional[Tuple[str,str]]
class RNNLearner(Learner):
"Basic class for a Learner in RNN"
def __init__(self, data:DataBunch, model:Model, bptt:int=70, split_func:OptSplitFunc=None, clip:float=None,
adjust:bool=False, alpha:float=2., beta:float=1., **kwargs):
super().__init__(data, model)
self.callbacks.append(RNNTrainer(self, bptt, alpha=alpha, beta=beta, adjust=adjust))
if clip: self.callback_fns.append(partial(GradientClipping, clip=clip))
if split_func: self.split(split_func)
self.metrics = [accuracy]
def save_encoder(self, name:str):
"Saves the encoder to the model directory"
torch.save(self.model[0].state_dict(), self.path/self.model_dir/f'{name}.pth')
def load_encoder(self, name:str):
"Loads the encoder from the model directory"
self.model[0].load_state_dict(torch.load(self.path/self.model_dir/f'{name}.pth'))
def load_pretrained(self, wgts_fname:str, itos_fname:str):
"Loads a pretrained model and adapts it to the data vocabulary."
old_itos = pickle.load(open(self.path/self.model_dir/f'{itos_fname}.pkl', 'rb'))
old_stoi = {v:k for k,v in enumerate(old_itos)}
wgts = torch.load(self.path/self.model_dir/f'{wgts_fname}.pth', map_location=lambda storage, loc: storage)
wgts = convert_weights(wgts, old_stoi, self.data.train_ds.vocab.itos)
self.model.load_state_dict(wgts)
@classmethod
def language_model(cls, data:DataBunch, bptt:int=70, emb_sz:int=400, nh:int=1150, nl:int=3, pad_token:int=1,
drop_mult:float=1., tie_weights:bool=True, bias:bool=True, qrnn:bool=False,
pretrained_fnames:OptStrTuple=None, **kwargs) -> 'RNNLearner':
"Creates a `Learner` with a language model."
dps = np.array([0.25, 0.1, 0.2, 0.02, 0.15]) * drop_mult
vocab_size = len(data.train_ds.vocab.itos)
model = get_language_model(vocab_size, emb_sz, nh, nl, pad_token, input_p=dps[0], output_p=dps[1],
weight_p=dps[2], embed_p=dps[3], hidden_p=dps[4], tie_weights=tie_weights, bias=bias, qrnn=qrnn)
learn = cls(data, model, bptt, split_func=lm_split, **kwargs)
if pretrained_fnames is not None: learn.load_pretrained(*pretrained_fnames)
return learn
@classmethod
def classifier(cls, data:DataBunch, bptt:int=70, max_len:int=70*20, emb_sz:int=400, nh:int=1150, nl:int=3,
layers:Collection[int]=None, drops:Collection[float]=None, pad_token:int=1,
drop_mult:float=1., qrnn:bool=False, **kwargs) -> 'RNNLearner':
"Creates a RNN classifier."
dps = np.array([0.4,0.5,0.05,0.3,0.4]) * drop_mult
if layers is None: layers = [50]
if drops is None: drops = [0.1]
vocab_size = len(data.train_ds.vocab.itos)
n_class = len(data.train_ds.classes)
layers = [emb_sz*3] + layers + [n_class]
drops = [dps[4]] + drops
model = get_rnn_classifier(bptt, max_len, n_class, vocab_size, emb_sz, nh, nl, pad_token,
layers, drops, input_p=dps[0], weight_p=dps[1], embed_p=dps[2], hidden_p=dps[3], qrnn=qrnn)
learn = cls(data, model, bptt, split_func=rnn_classifier_split, **kwargs)
return learn
data = data_from_textcsv(CLAS_PATH, Tokenizer(), vocab=Vocab(LM_PATH/'tmp'), data_func=classifier_data, bs=50)
learn = RNNLearner.classifier(data, drop_mult=0.5)
learn.load_encoder('fine_tuned_enc60kb')
learn.freeze()
learn.lr_find()
learn.recorder.plot()
learn.fit_one_cycle(1, 2e-2, moms=(0.8,0.7))
learn.save('first')
learn.load('first')
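We then follow the ULMFiT schedule: gradually unfreeze one layer group at a time and train with discriminative learning rates, dividing the learning rate by 2.6 from each group to the one below it.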
learn.freeze_to(-2)
learn.fit_one_cycle(1, slice(1e-2/2.6,1e-2), moms=(0.8,0.7), pct_start=0.1)
learn.save('second')
learn.load('second')
learn.freeze_to(-3)
learn.fit_one_cycle(1, slice(5e-3/(2.6**2),5e-3), moms=(0.8,0.7), pct_start=0.1)
learn.save('third')
learn.load('third')
learn.unfreeze()
learn.fit_one_cycle(2, slice(1e-3/(2.6**4),1e-3), moms=(0.8,0.7), pct_start=0.1)