Chapter 16 – Natural Language Processing with RNNs and Attention
This notebook contains all the sample code in chapter 16.
First, let's import a few common modules, ensure Matplotlib plots figures inline and prepare a function to save the figures. We also check that Python 3.5 or later is installed (although Python 2.x may work, it is deprecated so we strongly recommend you use Python 3 instead), as well as Scikit-Learn ≥0.20 and TensorFlow ≥2.0.
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)
# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"
try:
# %tensorflow_version only exists in Colab.
%tensorflow_version 2.x
!pip install -q -U tensorflow-addons
!pip install -q -U transformers
IS_COLAB = True
except Exception:
IS_COLAB = False
# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"
if not tf.config.list_physical_devices('GPU'):
print("No GPU was detected. LSTMs and CNNs can be very slow without a GPU.")
if IS_COLAB:
print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")
# Common imports
import numpy as np
import os
# to make this notebook's output stable across runs
# (seeds both NumPy's and TensorFlow's global random generators)
np.random.seed(42)
tf.random.set_seed(42)
# To plot pretty figures
# (IPython magic: only valid when this code runs in a notebook/IPython session)
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "nlp"
# Figures go to <PROJECT_ROOT_DIR>/images/<CHAPTER_ID>; the directory is
# created up front so saving a figure never fails on a missing folder.
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)
def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    """Save the current Matplotlib figure into IMAGES_PATH.

    The file is named "<fig_id>.<fig_extension>". `resolution` is passed to
    `plt.savefig` as the dpi, and `plt.tight_layout()` is applied first
    unless `tight_layout` is false.
    """
    file_name = fig_id + "." + fig_extension
    path = os.path.join(IMAGES_PATH, file_name)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)
For example, let's split the sequence 0 to 14 into windows of length 5, each shifted by 2 (e.g., `[0, 1, 2, 3, 4]`, `[2, 3, 4, 5, 6]`, etc.), then shuffle them, and split them into inputs (the first 4 steps) and targets (the last 4 steps) (e.g., `[2, 3, 4, 5, 6]` would be split into `[[2, 3, 4, 5], [3, 4, 5, 6]]`), then create batches of 3 such input/target pairs:
np.random.seed(42)
tf.random.set_seed(42)

n_steps = 5
# Windows of n_steps consecutive values taken from 0..14, each window
# starting 2 items after the previous one; every window is then split
# into an input (all but the last item) and a target (all but the first).
dataset = (
    tf.data.Dataset.from_tensor_slices(tf.range(15))
    .window(n_steps, shift=2, drop_remainder=True)
    .flat_map(lambda win: win.batch(n_steps))
    .shuffle(10)
    .map(lambda seq: (seq[:-1], seq[1:]))
    .batch(3)
    .prefetch(1)
)

for index, (X_batch, Y_batch) in enumerate(dataset):
    print("_" * 20, "Batch", index, "\nX_batch")
    print(X_batch.numpy())
    print("=" * 5, "\nY_batch")
    print(Y_batch.numpy())
shakespeare_url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
# get_file() returns the local path of the (possibly cached) download.
filepath = keras.utils.get_file("shakespeare.txt", shakespeare_url)
# Decode explicitly as UTF-8 so the text does not depend on the platform's
# default encoding.
with open(filepath, encoding="utf-8") as f:
    shakespeare_text = f.read()
print(shakespeare_text[:148])
# Sorted set of distinct lower-cased characters in the corpus
"".join(sorted(set(shakespeare_text.lower())))
# Character-level tokenizer fitted on the whole corpus
tokenizer = keras.preprocessing.text.Tokenizer(char_level=True)
tokenizer.fit_on_texts(shakespeare_text)
# Round-trip sanity check: text -> IDs, then IDs -> text
tokenizer.texts_to_sequences(["First"])
tokenizer.sequences_to_texts([[20, 6, 9, 8, 3]])
max_id = len(tokenizer.word_index) # number of distinct characters
dataset_size = tokenizer.document_count # total number of characters
# Shift every ID down by 1; the inverse "+ 1" is applied elsewhere in this
# notebook before calling sequences_to_texts().
[encoded] = np.array(tokenizer.texts_to_sequences([shakespeare_text])) - 1
# The first 90% of the characters are used for training
train_size = dataset_size * 90 // 100
n_steps = 100
window_length = n_steps + 1 # target = input shifted 1 character ahead

# Endless stream of overlapping windows (one new window per character)
dataset = (
    tf.data.Dataset.from_tensor_slices(encoded[:train_size])
    .repeat()
    .window(window_length, shift=1, drop_remainder=True)
    .flat_map(lambda win: win.batch(window_length))
)

np.random.seed(42)
tf.random.set_seed(42)

batch_size = 32
# Shuffle, batch, split each window into (input, target) and one-hot encode
# the inputs only; targets stay as integer IDs.
dataset = (
    dataset.shuffle(10000)
    .batch(batch_size)
    .map(lambda windows: (windows[:, :-1], windows[:, 1:]))
    .map(lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
    .prefetch(1)
)

for X_batch, Y_batch in dataset.take(1):
    print(X_batch.shape, Y_batch.shape)
Warning: the following code may take up to 24 hours to run, depending on your hardware. If you use a GPU, it may take just 1 or 2 hours, or less.
Note: the `GRU` class will only use the GPU (if you have one) when using the default values for the following arguments: `activation`, `recurrent_activation`, `recurrent_dropout`, `unroll`, `use_bias` and `reset_after`. This is why I commented out `recurrent_dropout=0.2` (compared to the book).
# Two stacked GRU layers that emit one vector per time step, followed by a
# per-time-step softmax over the max_id character classes.
model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id],
                     #dropout=0.2, recurrent_dropout=0.2),
                     dropout=0.2),
    keras.layers.GRU(128, return_sequences=True,
                     #dropout=0.2, recurrent_dropout=0.2),
                     dropout=0.2),
    keras.layers.TimeDistributed(keras.layers.Dense(max_id,
                                                    activation="softmax"))
])
# Targets are integer character IDs (not one-hot), hence the sparse loss.
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam")
# The dataset was built with repeat(), so steps_per_epoch bounds each epoch.
history = model.fit(dataset, steps_per_epoch=train_size // batch_size,
                    epochs=10)
def preprocess(texts):
    """One-hot encode a list of texts using the fitted character tokenizer.

    Token IDs are shifted down by 1 before being one-hot encoded with
    depth `max_id`.
    """
    char_ids = tokenizer.texts_to_sequences(texts)
    X = np.array(char_ids) - 1
    return tf.one_hot(X, max_id)
Warning: the `predict_classes()` method is deprecated. Instead, we must use `np.argmax(model.predict(X_new), axis=-1)`.
X_new = preprocess(["How are yo"])
# predict_classes() is deprecated: take the argmax over the class axis instead
#Y_pred = model.predict_classes(X_new)
Y_pred = np.argmax(model.predict(X_new), axis=-1)
# "+ 1" undoes the "- 1" shift applied when encoding the text
tokenizer.sequences_to_texts(Y_pred + 1)[0][-1] # 1st sentence, last char
tf.random.set_seed(42)
# Draw 40 class indices from log-probabilities log(0.5), log(0.4), log(0.1)
tf.random.categorical([[np.log(0.5), np.log(0.4), np.log(0.1)]], num_samples=40).numpy()
def next_char(text, temperature=1):
    """Sample the character that follows `text` according to the model.

    The probabilities predicted for the last time step are converted to
    log-probabilities and divided by `temperature` before sampling, then
    the sampled ID is shifted back by 1 and decoded with the tokenizer.
    """
    encoded_text = preprocess([text])
    # Keep the last time step only, as a [1, n_classes] array
    last_step_proba = model.predict(encoded_text)[0, -1:, :]
    logits = tf.math.log(last_step_proba) / temperature
    sampled_id = tf.random.categorical(logits, num_samples=1) + 1
    decoded = tokenizer.sequences_to_texts(sampled_id.numpy())
    return decoded[0]


tf.random.set_seed(42)
next_char("How are yo", temperature=1)
def complete_text(text, n_chars=50, temperature=1):
    """Extend `text` by `n_chars` characters, sampled one at a time.

    Each new character is sampled with `next_char` from the text generated
    so far, using the given `temperature`.
    """
    for _ in range(n_chars):
        text = text + next_char(text, temperature)
    return text


tf.random.set_seed(42)
# Same seed text at increasing temperatures
print(complete_text("t", temperature=0.2))
print(complete_text("t", temperature=1))
print(complete_text("t", temperature=2))
tf.random.set_seed(42)


def _to_sequential_windows(ids):
    """Cut `ids` into windows of window_length items starting n_steps apart."""
    windows = tf.data.Dataset.from_tensor_slices(ids)
    windows = windows.window(window_length, shift=n_steps, drop_remainder=True)
    return windows.flat_map(lambda window: window.batch(window_length))


# First version: consecutive windows, one window per batch
dataset = _to_sequential_windows(encoded[:train_size]).repeat().batch(1)
dataset = dataset.map(lambda windows: (windows[:, :-1], windows[:, 1:]))
dataset = dataset.map(
    lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
dataset = dataset.prefetch(1)

# Second version: split the text into batch_size parts, window each part
# separately, then stack the i-th window of every part into the i-th batch.
batch_size = 32
encoded_parts = np.array_split(encoded[:train_size], batch_size)
datasets = [_to_sequential_windows(part) for part in encoded_parts]
dataset = tf.data.Dataset.zip(tuple(datasets)).map(lambda *windows: tf.stack(windows))
dataset = dataset.repeat().map(lambda windows: (windows[:, :-1], windows[:, 1:]))
dataset = dataset.map(
    lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
dataset = dataset.prefetch(1)
# Stateful version of the char-RNN: batch_input_shape fixes the batch size
# to batch_size on the first layer.
model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, stateful=True,
                     dropout=0.2, recurrent_dropout=0.2,
                     batch_input_shape=[batch_size, None, max_id]),
    keras.layers.GRU(128, return_sequences=True, stateful=True,
                     dropout=0.2, recurrent_dropout=0.2),
    keras.layers.TimeDistributed(keras.layers.Dense(max_id,
                                                    activation="softmax"))
])
class ResetStatesCallback(keras.callbacks.Callback):
    """Keras callback that resets the model's states when an epoch begins."""

    def on_epoch_begin(self, epoch, logs=None):
        # `logs` defaults to None, matching the signature of the Keras
        # Callback hook, so the method also works when called without it.
        self.model.reset_states()
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam")
# Each step consumes batch_size windows that start n_steps characters apart
steps_per_epoch = train_size // batch_size // n_steps
# States are reset at the start of every epoch by the callback
history = model.fit(dataset, steps_per_epoch=steps_per_epoch, epochs=50,
                    callbacks=[ResetStatesCallback()])
To use the model with different batch sizes, we need to create a stateless copy. We can get rid of dropout since it is only used during training:
# Same layer stack as the stateful model, but without stateful=True, dropout
# or a fixed batch size.
stateless_model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id]),
    keras.layers.GRU(128, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(max_id,
                                                    activation="softmax"))
])
To set the weights, we first need to build the model (so the weights get created):
# Build with unknown batch size and sequence length so the weights exist
stateless_model.build(tf.TensorShape([None, None, max_id]))
# Copy the trained weights from the stateful model
stateless_model.set_weights(model.get_weights())
# From here on, `model` (used by next_char) refers to the stateless copy
model = stateless_model
tf.random.set_seed(42)
print(complete_text("t"))
tf.random.set_seed(42)
You can load the IMDB dataset easily:
(X_train, y_train), (X_test, y_test) = keras.datasets.imdb.load_data()
X_train[0][:10]

word_index = keras.datasets.imdb.get_word_index()
# Word IDs are shifted by 3: IDs 0, 1 and 2 are assigned to special tokens
id_to_word = {id_ + 3: word for word, id_ in word_index.items()}
id_to_word.update(enumerate(("<pad>", "<sos>", "<unk>")))
" ".join([id_to_word[id_] for id_ in X_train[0][:10]])
import tensorflow_datasets as tfds
datasets, info = tfds.load("imdb_reviews", as_supervised=True, with_info=True)
datasets.keys()

train_size = info.splits["train"].num_examples
test_size = info.splits["test"].num_examples
train_size, test_size

# Preview the first two reviews (truncated to 200 characters) with labels
for X_batch, y_batch in datasets["train"].batch(2).take(1):
    reviews, labels = X_batch.numpy(), y_batch.numpy()
    for review, label in zip(reviews, labels):
        print("Review:", review.decode("utf-8")[:200], "...")
        print("Label:", label, "= Positive" if label else "= Negative")
        print()
def preprocess(X_batch, y_batch):
    """Clean and tokenize a batch of review strings; labels pass through.

    Each review is cut with substr(0, 300), `<br>`-style tags and every
    character other than letters and apostrophes are replaced by spaces,
    and the result is split on whitespace. The ragged token batch is
    converted to a dense tensor padded with b"<pad>".
    """
    truncated = tf.strings.substr(X_batch, 0, 300)
    without_breaks = tf.strings.regex_replace(truncated, rb"<br\s*/?>", b" ")
    letters_only = tf.strings.regex_replace(without_breaks, b"[^a-zA-Z']", b" ")
    tokens = tf.strings.split(letters_only)
    return tokens.to_tensor(default_value=b"<pad>"), y_batch


preprocess(X_batch, y_batch)
from collections import Counter
# Count every token (including padding tokens) over the whole training set
vocabulary = Counter()
for X_batch, y_batch in datasets["train"].batch(32).map(preprocess):
    for review in X_batch:
        tokens = review.numpy().tolist()
        vocabulary.update(tokens)

vocabulary.most_common()[:3]
len(vocabulary)
vocab_size = 10000
# Keep only the vocab_size most frequent words, most frequent first
truncated_vocabulary = [
    word for word, count in vocabulary.most_common(vocab_size)]
word_to_id = {word: index for index, word in enumerate(truncated_vocabulary)}
for word in b"This movie was faaaaaantastic".split():
    # Pass the fallback to get(): `get(word) or vocab_size` would wrongly
    # report vocab_size for the word whose ID is 0, because 0 is falsy.
    print(word_to_id.get(word, vocab_size))
# Static lookup table: word -> its index in truncated_vocabulary
words = tf.constant(truncated_vocabulary)
word_ids = tf.range(len(truncated_vocabulary), dtype=tf.int64)
vocab_init = tf.lookup.KeyValueTensorInitializer(words, word_ids)
# Number of out-of-vocabulary buckets passed to the table
num_oov_buckets = 1000
table = tf.lookup.StaticVocabularyTable(vocab_init, num_oov_buckets)
table.lookup(tf.constant([b"This movie was faaaaaantastic".split()]))
def encode_words(X_batch, y_batch):
    """Replace each word in the batch by its ID from `table`; keep labels."""
    word_ids = table.lookup(X_batch)
    return word_ids, y_batch


# Endless training stream: batch, tokenize, then map words to IDs
train_set = (
    datasets["train"]
    .repeat()
    .batch(32)
    .map(preprocess)
    .map(encode_words)
    .prefetch(1)
)

for X_batch, y_batch in train_set.take(1):
    print(X_batch)
    print(y_batch)
embed_size = 128
# Embedding -> two GRU layers -> single sigmoid unit (binary sentiment).
# NOTE(review): mask_zero=True masks word ID 0; presumably that is the
# b"<pad>" token (the most frequent vocabulary entry) - confirm.
model = keras.models.Sequential([
    keras.layers.Embedding(vocab_size + num_oov_buckets, embed_size,
                           mask_zero=True, # not shown in the book
                           input_shape=[None]),
    keras.layers.GRU(128, return_sequences=True),
    keras.layers.GRU(128),
    keras.layers.Dense(1, activation="sigmoid")
])
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
# train_set repeats forever, so the epoch length is set explicitly
# (32 is the batch size used to build train_set)
history = model.fit(train_set, steps_per_epoch=train_size // 32, epochs=5)
Or using manual masking:
K = keras.backend
embed_size = 128
inputs = keras.layers.Input(shape=[None])
# Boolean mask that is False wherever the input word ID is 0
mask = keras.layers.Lambda(lambda inputs: K.not_equal(inputs, 0))(inputs)
z = keras.layers.Embedding(vocab_size + num_oov_buckets, embed_size)(inputs)
# The same mask is passed explicitly to both recurrent layers
z = keras.layers.GRU(128, return_sequences=True)(z, mask=mask)
z = keras.layers.GRU(128)(z, mask=mask)
outputs = keras.layers.Dense(1, activation="sigmoid")(z)
model = keras.models.Model(inputs=[inputs], outputs=[outputs])
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
history = model.fit(train_set, steps_per_epoch=train_size // 32, epochs=5)
tf.random.set_seed(42)
# Point TF Hub's cache at a local directory through its environment variable
TFHUB_CACHE_DIR = os.path.join(os.curdir, "my_tfhub_cache")
os.environ["TFHUB_CACHE_DIR"] = TFHUB_CACHE_DIR
import tensorflow_hub as hub
# Hub module taking raw strings (dtype=tf.string, scalar input shape) and
# declared to output 50 values, followed by a small dense classifier
model = keras.Sequential([
    hub.KerasLayer("https://tfhub.dev/google/tf2-preview/nnlm-en-dim50/1",
                   dtype=tf.string, input_shape=[], output_shape=[50]),
    keras.layers.Dense(128, activation="relu"),
    keras.layers.Dense(1, activation="sigmoid")
])
model.compile(loss="binary_crossentropy", optimizer="adam",
              metrics=["accuracy"])
# Print the path of every file found under the TF Hub cache directory
for directory, _subdirs, file_names in os.walk(TFHUB_CACHE_DIR):
    for file_name in file_names:
        print(os.path.join(directory, file_name))
import tensorflow_datasets as tfds
datasets, info = tfds.load("imdb_reviews", as_supervised=True, with_info=True)
train_size = info.splits["train"].num_examples
batch_size = 32
# Raw review strings are fed to the model directly (no preprocess/encode step)
train_set = datasets["train"].repeat().batch(batch_size).prefetch(1)
history = model.fit(train_set, steps_per_epoch=train_size // batch_size, epochs=5)
tf.random.set_seed(42)
vocab_size = 100
embed_size = 10
import tensorflow_addons as tfa
# Three inputs: encoder token IDs, decoder token IDs, and one sequence
# length per instance (scalar shape)
encoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
decoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
sequence_lengths = keras.layers.Input(shape=[], dtype=np.int32)
# A single embedding layer is shared by the encoder and the decoder
embeddings = keras.layers.Embedding(vocab_size, embed_size)
encoder_embeddings = embeddings(encoder_inputs)
decoder_embeddings = embeddings(decoder_inputs)
# return_state=True also returns the final hidden and cell states
encoder = keras.layers.LSTM(512, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_embeddings)
encoder_state = [state_h, state_c]
sampler = tfa.seq2seq.sampler.TrainingSampler()
decoder_cell = keras.layers.LSTMCell(512)
output_layer = keras.layers.Dense(vocab_size)
decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell, sampler,
                                                 output_layer=output_layer)
# The decoder starts from the encoder's final state
final_outputs, final_state, final_sequence_lengths = decoder(
    decoder_embeddings, initial_state=encoder_state,
    sequence_length=sequence_lengths)
# output_layer has no activation, so softmax is applied here
Y_proba = tf.nn.softmax(final_outputs.rnn_output)
model = keras.models.Model(
    inputs=[encoder_inputs, decoder_inputs, sequence_lengths],
    outputs=[Y_proba])
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam")
# Random toy data: 1000 input sequences of length 10, targets of length 15
X = np.random.randint(100, size=10*1000).reshape(1000, 10)
Y = np.random.randint(100, size=15*1000).reshape(1000, 15)
# Decoder inputs are the targets shifted right by one step, starting with 0
X_decoder = np.c_[np.zeros((1000, 1)), Y[:, :-1]]
seq_lengths = np.full([1000], 15)
history = model.fit([X, X_decoder, seq_lengths], Y, epochs=2)
# A plain GRU layer followed by a GRU wrapped in Bidirectional; summary()
# prints the resulting layers and parameter counts.
model = keras.models.Sequential([
    keras.layers.GRU(10, return_sequences=True, input_shape=[None, 10]),
    keras.layers.Bidirectional(keras.layers.GRU(10, return_sequences=True))
])
model.summary()
class PositionalEncoding(keras.layers.Layer):
    """Layer that adds fixed sine/cosine positional encodings to its inputs.

    A table of shape [1, max_steps, max_dims] is precomputed in the
    constructor (max_dims is rounded up to an even number). Even columns
    hold sines and odd columns hold cosines of p / 10000**(2 * i / max_dims),
    where p is the step and i the index of the sine/cosine pair.
    """

    def __init__(self, max_steps, max_dims, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        if max_dims % 2 == 1:
            max_dims += 1  # max_dims must be even
        steps, pairs = np.meshgrid(np.arange(max_steps),
                                   np.arange(max_dims // 2))
        angles = steps / 10000**(2 * pairs / max_dims)
        table = np.empty((1, max_steps, max_dims))
        table[0, :, ::2] = np.sin(angles).T
        table[0, :, 1::2] = np.cos(angles).T
        self.positional_embedding = tf.constant(table.astype(self.dtype))

    def call(self, inputs):
        # Crop the table to the inputs' last two dimensions before adding
        input_shape = tf.shape(inputs)
        n_steps_in, n_dims_in = input_shape[-2], input_shape[-1]
        return inputs + self.positional_embedding[:, :n_steps_in, :n_dims_in]
max_steps = 201
max_dims = 512
pos_emb = PositionalEncoding(max_steps, max_dims)
# Feeding zeros returns the positional encodings themselves
PE = pos_emb(np.zeros((1, max_steps, max_dims), np.float32))[0].numpy()

# Dimensions (i1, i2) and positions (p1, p2, p3) highlighted in the figure;
# crop_i is the number of dimensions shown in the bottom image
i1, i2, crop_i = 100, 101, 150
p1, p2, p3 = 22, 60, 35
fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True, figsize=(9, 5))

# Top panel: encoding values along the position axis for dimensions i1, i2
ax1.plot([p1, p1], [-1, 1], "k--", label="$p = {}$".format(p1))
ax1.plot([p2, p2], [-1, 1], "k--", label="$p = {}$".format(p2), alpha=0.5)
ax1.plot(p3, PE[p3, i1], "bx", label="$p = {}$".format(p3))
ax1.plot(PE[:,i1], "b-", label="$i = {}$".format(i1))
ax1.plot(PE[:,i2], "r-", label="$i = {}$".format(i2))
ax1.plot([p1, p2], [PE[p1, i1], PE[p2, i1]], "bo")
ax1.plot([p1, p2], [PE[p1, i2], PE[p2, i2]], "ro")
ax1.legend(loc="center right", fontsize=14, framealpha=0.95)
ax1.set_ylabel("$P_{(p,i)}$", rotation=0, fontsize=16)
ax1.grid(True, alpha=0.3)
ax1.hlines(0, 0, max_steps - 1, color="k", linewidth=1, alpha=0.3)
ax1.axis([0, max_steps - 1, -1, 1])

# Bottom panel: the first crop_i dimensions of the table as a grayscale image
ax2.imshow(PE.T[:crop_i], cmap="gray", interpolation="bilinear", aspect="auto")
ax2.hlines(i1, 0, max_steps - 1, color="b")
cheat = 2 # need to raise the red line a bit, or else it hides the blue one
ax2.hlines(i2+cheat, 0, max_steps - 1, color="r")
ax2.plot([p1, p1], [0, crop_i], "k--")
ax2.plot([p2, p2], [0, crop_i], "k--", alpha=0.5)
ax2.plot([p1, p2], [i2+cheat, i2+cheat], "ro")
ax2.plot([p1, p2], [i1, i1], "bo")
ax2.axis([0, max_steps - 1, 0, crop_i])
ax2.set_xlabel("$p$", fontsize=16)
ax2.set_ylabel("$i$", rotation=0, fontsize=16)

# Use the notebook's save_fig helper so the figure is written under
# IMAGES_PATH with the same format, dpi and layout as the other figures.
save_fig("positional_embedding_plot")
plt.show()
embed_size = 512; max_steps = 500; vocab_size = 10000
encoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
decoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
# One embedding layer and one positional-encoding layer are shared by the
# encoder and decoder inputs
embeddings = keras.layers.Embedding(vocab_size, embed_size)
encoder_embeddings = embeddings(encoder_inputs)
decoder_embeddings = embeddings(decoder_inputs)
positional_encoding = PositionalEncoding(max_steps, max_dims=embed_size)
encoder_in = positional_encoding(encoder_embeddings)
decoder_in = positional_encoding(decoder_embeddings)
Here is a (very) simplified Transformer (the actual architecture has skip connections, layer norm, dense nets, and most importantly it uses Multi-Head Attention instead of regular Attention):
# Encoder: six stacked self-attention layers
Z = encoder_in
for N in range(6):
    self_attention = keras.layers.Attention(use_scale=True)
    Z = self_attention([Z, Z])
encoder_outputs = Z

# Decoder: six blocks of causal self-attention followed by attention over
# the encoder outputs
Z = decoder_in
for N in range(6):
    causal_self_attention = keras.layers.Attention(use_scale=True, causal=True)
    Z = causal_self_attention([Z, Z])
    encoder_attention = keras.layers.Attention(use_scale=True)
    Z = encoder_attention([Z, encoder_outputs])

# Per-time-step softmax over the vocabulary
output_layer = keras.layers.Dense(vocab_size, activation="softmax")
outputs = keras.layers.TimeDistributed(output_layer)(Z)
Here's a basic implementation of the `MultiHeadAttention` layer. One will likely be added to `keras.layers` in the near future. Note that a `Conv1D` layer with `kernel_size=1` (and the default `padding="valid"` and `strides=1`) is equivalent to a `TimeDistributed(Dense(...))` layer.
K = keras.backend


class MultiHeadAttention(keras.layers.Layer):
    """Basic multi-head attention built on top of `keras.layers.Attention`.

    The layer is called on a list `[query, value]` or `[query, value, key]`
    (the key defaults to the value). Each input goes through a bias-free
    Conv1D layer with kernel_size=1, the result is split into `n_heads`
    heads that are folded into the batch axis, attention is applied, and
    the heads are merged back before a final bias-free Conv1D projection.
    """

    def __init__(self, n_heads, causal=False, use_scale=False, **kwargs):
        self.n_heads = n_heads
        self.causal = causal
        self.use_scale = use_scale
        super().__init__(**kwargs)

    def build(self, batch_input_shape):
        # The model size is the last dimension of the first input (the query)
        self.dims = batch_input_shape[0][-1]
        per_head = self.dims // self.n_heads
        # could be hyperparameters instead
        self.q_dims, self.v_dims, self.k_dims = per_head, per_head, per_head
        self.q_linear = keras.layers.Conv1D(
            self.n_heads * self.q_dims, kernel_size=1, use_bias=False)
        self.v_linear = keras.layers.Conv1D(
            self.n_heads * self.v_dims, kernel_size=1, use_bias=False)
        self.k_linear = keras.layers.Conv1D(
            self.n_heads * self.k_dims, kernel_size=1, use_bias=False)
        self.attention = keras.layers.Attention(
            causal=self.causal, use_scale=self.use_scale)
        self.out_linear = keras.layers.Conv1D(
            self.dims, kernel_size=1, use_bias=False)
        super().build(batch_input_shape)

    def _multi_head_linear(self, inputs, linear):
        """Project `inputs` with `linear`, then fold the heads into the batch axis."""
        # Target shape: the inputs' leading dimensions + [n_heads, -1]
        split_shape = K.concatenate([K.shape(inputs)[:-1], [self.n_heads, -1]])
        per_head = K.reshape(linear(inputs), split_shape)
        # Swap axes 1 and 2 so the head axis comes right after the batch axis
        heads_first = K.permute_dimensions(per_head, [0, 2, 1, 3])
        merged_shape = [split_shape[0] * self.n_heads, split_shape[1], -1]
        return K.reshape(heads_first, merged_shape)

    def call(self, inputs):
        q, v = inputs[0], inputs[1]
        k = inputs[2] if len(inputs) > 2 else v
        q_shape = K.shape(q)
        q_proj = self._multi_head_linear(q, self.q_linear)
        v_proj = self._multi_head_linear(v, self.v_linear)
        k_proj = self._multi_head_linear(k, self.k_linear)
        attended = self.attention([q_proj, v_proj, k_proj])
        attended_shape = K.shape(attended)
        # Undo the folding: split the batch axis back into [batch, n_heads]
        unfolded = K.reshape(
            attended,
            [q_shape[0], self.n_heads, attended_shape[1], attended_shape[2]])
        steps_first = K.permute_dimensions(unfolded, [0, 2, 1, 3])
        # Concatenate all heads along the last axis
        merged = K.reshape(steps_first, [q_shape[0], attended_shape[1], -1])
        return self.out_linear(merged)
# Random query (50 steps) and value (80 steps) batches of 2 instances with
# 512 dimensions each; the last line evaluates the output's shape
Q = np.random.rand(2, 50, 512)
V = np.random.rand(2, 80, 512)
multi_attn = MultiHeadAttention(8)
multi_attn([Q, V]).shape
See Appendix A.
Exercise: Embedded Reber grammars were used by Hochreiter and Schmidhuber in their paper about LSTMs. They are artificial grammars that produce strings such as "BPBTSXXVPSEPE." Check out Jenny Orr's nice introduction to this topic. Choose a particular embedded Reber grammar (such as the one represented on Jenny Orr's page), then train an RNN to identify whether a string respects that grammar or not. You will first need to write a function capable of generating a training batch containing about 50% strings that respect the grammar, and 50% that don't.
First we need to build a function that generates strings based on a grammar. The grammar will be represented as a list of possible transitions for each state. A transition specifies the string to output (or a grammar to generate it) and the next state.
# Each grammar is a list indexed by state; each state lists its possible
# (production, next_state) transitions. A next_state of None ends the string.
default_reber_grammar = [
    [("B", 1)],           # (state 0) =B=>(state 1)
    [("T", 2), ("P", 3)], # (state 1) =T=>(state 2) or =P=>(state 3)
    [("S", 2), ("X", 4)], # (state 2) =S=>(state 2) or =X=>(state 4)
    [("T", 3), ("V", 5)], # and so on...
    [("X", 3), ("S", 6)],
    [("P", 4), ("V", 6)],
    [("E", None)]]        # (state 6) =E=>(terminal state)
# In the embedded grammar, a production may itself be a grammar (states 2
# and 3 both embed the default Reber grammar).
embedded_reber_grammar = [
    [("B", 1)],
    [("T", 2), ("P", 3)],
    [(default_reber_grammar, 4)],
    [(default_reber_grammar, 5)],
    [("T", 6)],
    [("P", 6)],
    [("E", None)]]
def generate_string(grammar):
    """Generate a random string by walking `grammar` from state 0.

    `grammar` is a list indexed by state; each entry lists the possible
    `(production, next_state)` transitions, one of which is picked
    uniformly with `np.random.randint`. A production that is itself a
    list is treated as a nested grammar and expanded recursively. The
    walk stops when the chosen transition's next state is None.
    """
    pieces = []
    state = 0
    while True:
        transitions = grammar[state]
        choice = np.random.randint(len(transitions))
        production, state = transitions[choice]
        if isinstance(production, list):
            # Nested grammar: generate its string first
            production = generate_string(grammar=production)
        pieces.append(production)
        if state is None:
            break
    return "".join(pieces)
Let's generate a few strings based on the default Reber grammar:
np.random.seed(42)
# 25 samples from the default grammar, each followed by a space
samples = [generate_string(default_reber_grammar) for _ in range(25)]
print(" ".join(samples), end=" ")
Looks good. Now let's generate a few strings based on the embedded Reber grammar:
np.random.seed(42)
# 25 samples from the embedded grammar, each followed by a space
samples = [generate_string(embedded_reber_grammar) for _ in range(25)]
print(" ".join(samples), end=" ")
Okay, now we need a function to generate strings that do not respect the grammar. We could generate a random string, but the task would be a bit too easy, so instead we will generate a string that respects the grammar, and we will corrupt it by changing just one character:
POSSIBLE_CHARS = "BEPSTVX"


def generate_corrupted_string(grammar, chars=POSSIBLE_CHARS):
    """Generate a string from `grammar` with exactly one character replaced.

    A random position is picked and its character is swapped for a
    different character drawn at random from `chars`.
    """
    good_string = generate_string(grammar)
    position = np.random.randint(len(good_string))
    original_char = good_string[position]
    # Candidate replacements: every allowed character except the original
    candidates = sorted(set(chars) - set(original_char))
    bad_char = np.random.choice(candidates)
    prefix, suffix = good_string[:position], good_string[position + 1:]
    return prefix + bad_char + suffix
Let's look at a few corrupted strings:
np.random.seed(42)
# 25 corrupted samples, each followed by a space
samples = [generate_corrupted_string(embedded_reber_grammar) for _ in range(25)]
print(" ".join(samples), end=" ")
We cannot feed strings directly to an RNN, so we need to encode them somehow. One option would be to one-hot encode each character. Another option is to use embeddings. Let's go for the second option (but since there are just a handful of characters, one-hot encoding would probably be a good option as well). For embeddings to work, we need to convert each string into a sequence of character IDs. Let's write a function for that, using each character's index in the string of possible characters "BEPSTVX":
def string_to_ids(s, chars=POSSIBLE_CHARS):
    """Map each character of `s` to its index in `chars`.

    Raises ValueError (from `str.index`) if a character is not in `chars`.
    """
    return list(map(chars.index, s))


string_to_ids("BTTTXXVVETE")
We can now generate the dataset, with 50% good strings, and 50% bad strings:
def generate_dataset(size):
    """Build a dataset of `size` encoded strings, half valid, half corrupted.

    Returns `(X, y)`: `X` is a ragged tensor of character IDs, valid strings
    first; `y` is an array with one row per string, holding 1. for a valid
    string and 0. for a corrupted one.
    """
    n_good = size // 2
    n_bad = size - n_good
    good_strings = [string_to_ids(generate_string(embedded_reber_grammar))
                    for _ in range(n_good)]
    bad_strings = [string_to_ids(generate_corrupted_string(embedded_reber_grammar))
                   for _ in range(n_bad)]
    X = tf.ragged.constant(good_strings + bad_strings, ragged_rank=1)
    labels = [[1.]] * len(good_strings) + [[0.]] * len(bad_strings)
    y = np.array(labels)
    return X, y
# Seed NumPy once, then draw the training and validation sets in that order
np.random.seed(42)
X_train, y_train = generate_dataset(10000)
X_valid, y_valid = generate_dataset(2000)
Let's take a look at the first training sequence:
# Character IDs of the first training string
X_train[0]
What class does it belong to?
# Label of the first training string (1. = valid, 0. = corrupted)
y_train[0]
Perfect! We are ready to create the RNN to identify good strings. We build a simple sequence binary classifier:
np.random.seed(42)
tf.random.set_seed(42)

embedding_size = 5

# Ragged integer input -> character embedding -> GRU -> sigmoid output
model = keras.models.Sequential([
    keras.layers.InputLayer(input_shape=[None], dtype=tf.int32, ragged=True),
    keras.layers.Embedding(input_dim=len(POSSIBLE_CHARS), output_dim=embedding_size),
    keras.layers.GRU(30),
    keras.layers.Dense(1, activation="sigmoid")
])
# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
optimizer = keras.optimizers.SGD(learning_rate=0.02, momentum=0.95, nesterov=True)
model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=20, validation_data=(X_valid, y_valid))
Now let's test our RNN on two tricky strings: the first one is bad while the second one is good. They only differ by the second to last character. If the RNN gets this right, it shows that it managed to notice the pattern that the second letter should always be equal to the second to last letter. That requires a fairly long short-term memory (which is the reason why we used a GRU cell).
# The two strings differ only in their second-to-last character
test_strings = ["BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE",
                "BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE"]
encoded_tests = [string_to_ids(s) for s in test_strings]
X_test = tf.ragged.constant(encoded_tests, ragged_rank=1)

y_proba = model.predict(X_test)
print()
print("Estimated probability that these are Reber strings:")
for string, proba in zip(test_strings, y_proba):
    print("{}: {:.2f}%".format(string, 100 * proba[0]))
Ta-da! It worked fine. The RNN found the correct answers with very high confidence. :)
Exercise: Train an Encoder–Decoder model that can convert a date string from one format to another (e.g., from "April 22, 2019" to "2019-04-22").
Let's start by creating the dataset. We will use random days between 1000-01-01 and 9999-12-31:
from datetime import date
# cannot use strftime()'s %B format since it depends on the locale
MONTHS = ["January", "February", "March", "April", "May", "June",
          "July", "August", "September", "October", "November", "December"]


def random_dates(n_dates):
    """Generate `n_dates` random dates from 1000-01-01 to 9999-12-31 inclusive.

    Returns a pair of lists `(x, y)` of equal length: `x` holds the dates
    formatted as "<Month name> DD, YYYY" and `y` holds the same dates in
    ISO format ("YYYY-MM-DD").
    """
    min_date = date(1000, 1, 1).toordinal()
    max_date = date(9999, 12, 31).toordinal()
    # randint()'s upper bound is exclusive, so add 1 to make the last day
    # (9999-12-31) reachable.
    ordinals = np.random.randint(min_date, max_date + 1, size=n_dates)
    dates = [date.fromordinal(ordinal) for ordinal in ordinals]
    x = [MONTHS[dt.month - 1] + " " + dt.strftime("%d, %Y") for dt in dates]
    y = [dt.isoformat() for dt in dates]
    return x, y
Here are a few random dates, displayed in both the input format and the target format:
np.random.seed(42)

n_dates = 3
x_example, y_example = random_dates(n_dates)
# Two left-aligned columns, 25 characters wide each
row_format = "{:25s}{:25s}"
print(row_format.format("Input", "Target"))
print("-" * 50)
for input_str, target_str in zip(x_example, y_example):
    print(row_format.format(input_str, target_str))
Let's get the list of all possible characters in the inputs:
# Sorted, de-duplicated characters of the month names, digits, comma and space
INPUT_CHARS = "".join(sorted(set("".join(MONTHS) + "0123456789, ")))
INPUT_CHARS
And here's the list of possible characters in the outputs:
# Digits plus the dash used in the target date format
OUTPUT_CHARS = "0123456789-"
Let's write a function to convert a string to a list of character IDs, as we did in the previous exercise:
def date_str_to_ids(date_str, chars=INPUT_CHARS):
    """Map each character of `date_str` to its index in `chars`.

    Raises ValueError (from `str.index`) if a character is not in `chars`.
    """
    return list(map(chars.index, date_str))


date_str_to_ids(x_example[0], INPUT_CHARS)
date_str_to_ids(y_example[0], OUTPUT_CHARS)
def prepare_date_strs(date_strs, chars=INPUT_CHARS):
    """Encode date strings as a dense tensor of character IDs.

    Every character index is shifted up by 1 so that 0 can be used as the
    padding ID when shorter sequences are padded to the longest one.
    """
    id_lists = [date_str_to_ids(date_str, chars) for date_str in date_strs]
    ragged_ids = tf.ragged.constant(id_lists, ragged_rank=1)
    shifted = ragged_ids + 1  # using 0 as the padding token ID
    return shifted.to_tensor()
def create_dataset(n_dates):
    """Return `(inputs, targets)` ID tensors for `n_dates` random dates."""
    input_strs, target_strs = random_dates(n_dates)
    inputs = prepare_date_strs(input_strs, INPUT_CHARS)
    targets = prepare_date_strs(target_strs, OUTPUT_CHARS)
    return inputs, targets
# Seed NumPy once, then draw the training, validation and test sets in order
np.random.seed(42)
X_train, Y_train = create_dataset(10000)
X_valid, Y_valid = create_dataset(2000)
X_test, Y_test = create_dataset(2000)
# First target sequence, as character IDs
Y_train[0]
Let's first try the simplest possible model: we feed in the input sequence, which first goes through the encoder (an embedding layer followed by a single LSTM layer), which outputs a vector, then it goes through a decoder (a single LSTM layer, followed by a dense output layer), which outputs a sequence of vectors, each representing the estimated probabilities for all possible output characters.
Since the decoder expects a sequence as input, we repeat the vector (which is output by the encoder) as many times as the longest possible output sequence.
embedding_size = 32
# Length of the (dense) target sequences; the decoder emits that many steps
max_output_length = Y_train.shape[1]
np.random.seed(42)
tf.random.set_seed(42)
# Encoder: embedding + LSTM returning a single vector per input sequence.
# input_dim is len(INPUT_CHARS) + 1 because ID 0 is the padding token.
encoder = keras.models.Sequential([
    keras.layers.Embedding(input_dim=len(INPUT_CHARS) + 1,
                           output_dim=embedding_size,
                           input_shape=[None]),
    keras.layers.LSTM(128)
])
# Decoder: LSTM over the repeated vector + softmax over the output IDs
# (len(OUTPUT_CHARS) + 1 classes, again counting the padding ID)
decoder = keras.models.Sequential([
    keras.layers.LSTM(128, return_sequences=True),
    keras.layers.Dense(len(OUTPUT_CHARS) + 1, activation="softmax")
])
# RepeatVector turns the encoder's vector into max_output_length time steps
model = keras.models.Sequential([
    encoder,
    keras.layers.RepeatVector(max_output_length),
    decoder
])
optimizer = keras.optimizers.Nadam()
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))
Looks great, we reach 100% validation accuracy! Let's use the model to make some predictions. We will need to be able to convert a sequence of character IDs to a readable string:
def ids_to_date_strs(ids, chars=OUTPUT_CHARS):
    """Decode sequences of character IDs into strings.

    ID 0 is rendered as "?"; ID n (n >= 1) is rendered as `chars[n - 1]`.
    """
    lookup = "?" + chars
    decoded = []
    for sequence in ids:
        decoded.append("".join([lookup[index] for index in sequence]))
    return decoded
Now we can use the model to convert some dates:
X_new = prepare_date_strs(["September 17, 2009", "July 14, 1789"])
# predict_classes() is deprecated, so take the argmax over the class axis
Y_proba_new = model.predict(X_new)
ids = np.argmax(Y_proba_new, axis=-1)
for date_str in ids_to_date_strs(ids):
    print(date_str)
Perfect! :)
However, since the model was only trained on input strings of length 18 (which is the length of the longest date), it does not perform well if we try to use it to make predictions on shorter sequences:
# Both inputs here are shorter than the sequences used for training
X_new = prepare_date_strs(["May 02, 2020", "July 14, 1789"])
# predict_classes() is deprecated, so take the argmax over the class axis
Y_proba_new = model.predict(X_new)
ids = np.argmax(Y_proba_new, axis=-1)
for date_str in ids_to_date_strs(ids):
    print(date_str)
Oops! We need to ensure that we always pass sequences of the same length as during training, using padding if necessary. Let's write a little helper function for that:
max_input_length = X_train.shape[1]


def prepare_date_strs_padded(date_strs):
    """Encode date strings and right-pad them with zeros to max_input_length.

    Batches that are already at least max_input_length wide are returned
    unchanged.
    """
    X = prepare_date_strs(date_strs)
    missing = max_input_length - X.shape[1]
    if missing > 0:
        # Pad the time axis only, on the right
        X = tf.pad(X, [[0, 0], [0, missing]])
    return X
def convert_date_strs(date_strs):
    """Convert date strings with the trained model and decode its output."""
    X = prepare_date_strs_padded(date_strs)
    # predict_classes() is deprecated, so take the argmax over the class axis
    Y_proba = model.predict(X)
    ids = np.argmax(Y_proba, axis=-1)
    return ids_to_date_strs(ids)


convert_date_strs(["May 02, 2020", "July 14, 1789"])
Cool! Granted, there are certainly much easier ways to write a date conversion tool (e.g., using regular expressions or even basic string manipulation), but you have to admit that using neural networks is way cooler. ;-)
However, real-life sequence-to-sequence problems will usually be harder, so for the sake of completeness, let's build a more powerful model.
Instead of feeding the decoder a simple repetition of the encoder's output vector, we can feed it the target sequence, shifted by one time step to the right. This way, at each time step the decoder will know what the previous target character was. This should help us tackle more complex sequence-to-sequence problems.
Since the first output character of each target sequence has no previous character, we will need a new token to represent the start-of-sequence (sos).
During inference, we won't know the target, so what will we feed the decoder? We can just predict one character at a time, starting with an sos token, then feeding the decoder all the characters that were predicted so far (we will look at this in more detail later in this notebook).
But if the decoder's LSTM expects to get the previous target as input at each step, how shall we pass it the vector output by the encoder? Well, one option is to ignore the output vector, and instead use the encoder's LSTM state as the initial state of the decoder's LSTM (which requires that the encoder's LSTM must have the same number of units as the decoder's LSTM).
Now let's create the decoder's inputs (for training, validation and testing). The sos token will be represented using the last possible output character's ID + 1.
sos_id = len(OUTPUT_CHARS) + 1


def shifted_output_sequences(Y):
    """Shift target sequences one step right, starting each with sos_id.

    The last step of every sequence is dropped so the result has the same
    shape as `Y`.
    """
    start_column = tf.fill(dims=(len(Y), 1), value=sos_id)
    all_but_last = Y[:, :-1]
    return tf.concat([start_column, all_but_last], axis=1)


X_train_decoder = shifted_output_sequences(Y_train)
X_valid_decoder = shifted_output_sequences(Y_valid)
X_test_decoder = shifted_output_sequences(Y_test)
Let's take a look at the decoder's training inputs:
# Display the shifted decoder inputs: every row starts with sos_id.
X_train_decoder
Now let's build the model. It's not a simple sequential model anymore, so let's use the functional API:
encoder_embedding_size = 32
decoder_embedding_size = 32
lstm_units = 128

# Reseed NumPy and TensorFlow before any layer is created.
np.random.seed(42)
tf.random.set_seed(42)

# Encoder: variable-length sequences of input character IDs -> embeddings -> LSTM.
encoder_input = keras.layers.Input(shape=[None], dtype=tf.int32)
encoder_embedding = keras.layers.Embedding(
    # presumably the extra ID is 0, used for padding -- TODO confirm against
    # the input encoding helper
    input_dim=len(INPUT_CHARS) + 1,
    output_dim=encoder_embedding_size)(encoder_input)
# The LSTM's output is discarded; only its final states (h and c) are kept,
# because they are used below as the decoder's initial state.
_, encoder_state_h, encoder_state_c = keras.layers.LSTM(
    lstm_units, return_state=True)(encoder_embedding)
encoder_state = [encoder_state_h, encoder_state_c]

# Decoder: receives the target sequence shifted right (see X_train_decoder).
decoder_input = keras.layers.Input(shape=[None], dtype=tf.int32)
decoder_embedding = keras.layers.Embedding(
    # + 2 so that sos_id (= len(OUTPUT_CHARS) + 1) is a valid embedding index
    input_dim=len(OUTPUT_CHARS) + 2,
    output_dim=decoder_embedding_size)(decoder_input)
# Both LSTMs use lstm_units units, so the encoder states fit as initial state.
decoder_lstm_output = keras.layers.LSTM(lstm_units, return_sequences=True)(
    decoder_embedding, initial_state=encoder_state)
# The output layer has len(OUTPUT_CHARS) + 1 classes: sos_id is an input-only
# token and is never predicted.
decoder_output = keras.layers.Dense(len(OUTPUT_CHARS) + 1,
                                    activation="softmax")(decoder_lstm_output)

# NOTE: this rebinds the global `model` that the prediction helpers read.
model = keras.models.Model(inputs=[encoder_input, decoder_input],
                           outputs=[decoder_output])
optimizer = keras.optimizers.Nadam()
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
# Targets are the unshifted sequences; the decoder inputs are the shifted ones.
history = model.fit([X_train, X_train_decoder], Y_train, epochs=10,
                    validation_data=([X_valid, X_valid_decoder], Y_valid))
This model also reaches 100% validation accuracy, but it does so even faster.
Let's once again use the model to make some predictions. This time we need to predict characters one by one.
sos_id = len(OUTPUT_CHARS) + 1


def predict_date_strs(date_strs):
    """Decode date strings one character at a time with the global `model`.

    Decoding starts from a single `sos_id` column. At every step the
    predictions made so far are zero-padded to `max_output_length`, fed to the
    model as decoder inputs, and the argmax at the current position is
    appended. The leading sos column is dropped before converting to strings.
    """
    X = prepare_date_strs_padded(date_strs)
    Y_pred = tf.fill(dims=(len(X), 1), value=sos_id)
    for step in range(max_output_length):
        # Right-pad the partial predictions up to the full output length.
        n_missing = max_output_length - Y_pred.shape[1]
        X_decoder = tf.pad(Y_pred, [[0, 0], [0, n_missing]])
        # One full model run per generated character; keep only this step.
        probas = model.predict([X, X_decoder])
        step_probas = probas[:, step:step + 1]
        next_ids = tf.argmax(step_probas, axis=-1, output_type=tf.int32)
        Y_pred = tf.concat([Y_pred, next_ids], axis=1)
    return ids_to_date_strs(Y_pred[:, 1:])


predict_date_strs(["July 14, 1789", "May 01, 2020"])
Works fine! :)
Let's build exactly the same model, but using TF-Addons' seq2seq API. The implementation below is very similar to the TFA example higher in this notebook, except without the model input to specify the output sequence length, for simplicity (but you can easily add it back in if you need it for your projects, when the output sequences have very different lengths).
import tensorflow_addons as tfa
# Reseed NumPy and TensorFlow before any layer is created.
np.random.seed(42)
tf.random.set_seed(42)

encoder_embedding_size = 32
decoder_embedding_size = 32
units = 128

encoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
decoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
# NOTE(review): `sequence_lengths` is created but never passed to the decoder
# or listed in the model's inputs below.
sequence_lengths = keras.layers.Input(shape=[], dtype=np.int32)

encoder_embeddings = keras.layers.Embedding(
    len(INPUT_CHARS) + 1, encoder_embedding_size)(encoder_inputs)

# The embedding layer is kept in a variable (not just its output) so that the
# inference sampler built later can reuse it as its embedding_fn.
decoder_embedding_layer = keras.layers.Embedding(
    # + 2 so that sos_id (= len(OUTPUT_CHARS) + 1) is a valid embedding index
    len(OUTPUT_CHARS) + 2, decoder_embedding_size)
decoder_embeddings = decoder_embedding_layer(decoder_inputs)

# Only the encoder's final states are used: they initialize the decoder.
encoder = keras.layers.LSTM(units, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_embeddings)
encoder_state = [state_h, state_c]

sampler = tfa.seq2seq.sampler.TrainingSampler()

# decoder_cell and output_layer are also reused by the inference decoder.
decoder_cell = keras.layers.LSTMCell(units)
# No activation here: the softmax is applied separately on rnn_output below.
output_layer = keras.layers.Dense(len(OUTPUT_CHARS) + 1)

decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell,
                                                 sampler,
                                                 output_layer=output_layer)
final_outputs, final_state, final_sequence_lengths = decoder(
    decoder_embeddings,
    initial_state=encoder_state)
Y_proba = keras.layers.Activation("softmax")(final_outputs.rnn_output)

# NOTE: this rebinds the global `model` that predict_date_strs() reads.
model = keras.models.Model(inputs=[encoder_inputs, decoder_inputs],
                           outputs=[Y_proba])
optimizer = keras.optimizers.Nadam()
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
history = model.fit([X_train, X_train_decoder], Y_train, epochs=15,
                    validation_data=([X_valid, X_valid_decoder], Y_valid))
And once again, 100% validation accuracy! To use the model, we can just reuse the predict_date_strs()
function:
# predict_date_strs() reads the global `model`, so this call now decodes with
# the TF-Addons model trained above.
predict_date_strs(["July 14, 1789", "May 01, 2020"])
However, there's a much more efficient way to perform inference. Until now, during inference, we've run the model once for each new character. Instead, we can create a new decoder, based on the previously trained layers, but using a GreedyEmbeddingSampler
instead of a TrainingSampler
.
At each time step, the GreedyEmbeddingSampler
will compute the argmax of the decoder's outputs, and run the resulting token IDs through the decoder's embedding layer. Then it will feed the resulting embeddings to the decoder's LSTM cell at the next time step. This way, we only need to run the decoder once to get the full prediction.
# Build an inference-only decoder from the already-trained layers
# (decoder_embedding_layer, decoder_cell, output_layer).
inference_sampler = tfa.seq2seq.sampler.GreedyEmbeddingSampler(
    embedding_fn=decoder_embedding_layer)
inference_decoder = tfa.seq2seq.basic_decoder.BasicDecoder(
    decoder_cell, inference_sampler, output_layer=output_layer,
    # caps the number of decoding steps, in case end_token is never produced
    maximum_iterations=max_output_length)
# tf.shape(...)[:1] is a one-element vector holding the batch size; it is used
# as `dims` so that tf.fill() produces one sos_id per sequence.
batch_size = tf.shape(encoder_inputs)[:1]
start_tokens = tf.fill(dims=batch_size, value=sos_id)
# NOTE(review): the first positional argument is presumably ignored by the
# sampler because embedding_fn was given -- confirm against the TFA docs.
final_outputs, final_state, final_sequence_lengths = inference_decoder(
    start_tokens,
    initial_state=encoder_state,
    start_tokens=start_tokens,
    end_token=0)
# Only the encoder inputs are needed; the model outputs token IDs directly.
inference_model = keras.models.Model(inputs=[encoder_inputs],
                                     outputs=[final_outputs.sample_id])
A few notes:
- The `GreedyEmbeddingSampler` needs the `start_tokens` (a vector containing the start-of-sequence ID for each decoder sequence), and the `end_token` (the decoder will stop decoding a sequence once the model outputs this token).
- We must set `maximum_iterations` when creating the `BasicDecoder`, or else it may run into an infinite loop (if the model never outputs the end token for at least one of the sequences). This would force you to restart the Jupyter kernel.
- The model outputs `final_outputs.sample_id` instead of the softmax of `final_outputs.rnn_output`. This allows us to directly get the argmax of the model's outputs. If you prefer to have access to the logits, you can replace `final_outputs.sample_id` with `final_outputs.rnn_output`.

Now we can write a simple function that uses the model to perform the date format conversion:
def fast_predict_date_strs(date_strs):
    """Convert date strings with a single run of the global `inference_model`.

    `inference_model` outputs token IDs directly, so its predictions are
    passed straight to `ids_to_date_strs()`.
    """
    padded_inputs = prepare_date_strs_padded(date_strs)
    predicted_ids = inference_model.predict(padded_inputs)
    return ids_to_date_strs(predicted_ids)


fast_predict_date_strs(["July 14, 1789", "May 01, 2020"])
Let's check that it really is faster:
# Time the character-by-character loop against the single-pass inference model
# on the same two inputs.
%timeit predict_date_strs(["July 14, 1789", "May 01, 2020"])
%timeit fast_predict_date_strs(["July 14, 1789", "May 01, 2020"])
That's more than a 10x speedup! And it would be even more if we were handling longer sequences.
Warning: due to a TF bug, this version only works using TensorFlow 2.2 or above.
When we trained the previous model, at each time step t we gave the model the target token for time step t - 1. However, at inference time, the model did not get the previous target at each time step. Instead, it got the previous prediction. So there is a discrepancy between training and inference, which may lead to disappointing performance. To alleviate this, we can gradually replace the targets with the predictions, during training. For this, we just need to replace the TrainingSampler
with a ScheduledEmbeddingTrainingSampler
, and use a Keras callback to gradually increase the sampling_probability
(i.e., the probability that the decoder will use the prediction from the previous time step rather than the target for the previous time step).
import tensorflow_addons as tfa
# Reseed NumPy and TensorFlow before any layer is created.
np.random.seed(42)
tf.random.set_seed(42)

# Also read by update_sampling_probability() to schedule the sampling ramp.
n_epochs = 20
encoder_embedding_size = 32
decoder_embedding_size = 32
units = 128

encoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
decoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
# NOTE(review): `sequence_lengths` is created but never passed to the decoder
# or listed in the model's inputs below.
sequence_lengths = keras.layers.Input(shape=[], dtype=np.int32)

encoder_embeddings = keras.layers.Embedding(
    len(INPUT_CHARS) + 1, encoder_embedding_size)(encoder_inputs)

# Kept as a layer object because both samplers need it as their embedding_fn.
decoder_embedding_layer = keras.layers.Embedding(
    # + 2 so that sos_id (= len(OUTPUT_CHARS) + 1) is a valid embedding index
    len(OUTPUT_CHARS) + 2, decoder_embedding_size)
decoder_embeddings = decoder_embedding_layer(decoder_inputs)

# Only the encoder's final states are used: they initialize the decoder.
encoder = keras.layers.LSTM(units, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_embeddings)
encoder_state = [state_h, state_c]

sampler = tfa.seq2seq.sampler.ScheduledEmbeddingTrainingSampler(
    sampling_probability=0.,
    embedding_fn=decoder_embedding_layer)
# we must set the sampling_probability after creating the sampler
# (see https://github.com/tensorflow/addons/pull/1714)
# Using a tf.Variable lets the training callback update it with .assign().
sampler.sampling_probability = tf.Variable(0.)

decoder_cell = keras.layers.LSTMCell(units)
# No activation here: the softmax is applied separately on rnn_output below.
output_layer = keras.layers.Dense(len(OUTPUT_CHARS) + 1)

decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell,
                                                 sampler,
                                                 output_layer=output_layer)
final_outputs, final_state, final_sequence_lengths = decoder(
    decoder_embeddings,
    initial_state=encoder_state)
Y_proba = keras.layers.Activation("softmax")(final_outputs.rnn_output)

# NOTE: this rebinds the global `model`.
model = keras.models.Model(inputs=[encoder_inputs, decoder_inputs],
                           outputs=[Y_proba])
optimizer = keras.optimizers.Nadam()
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
def update_sampling_probability(epoch, logs):
    """Linearly ramp the sampler's sampling probability from 0 up to 1.

    Meant to be used as the ``on_epoch_begin`` hook of a Keras
    ``LambdaCallback``. The probability grows by ``1 / (n_epochs - 10)`` per
    epoch and is capped at 1.0, so the last 10 epochs run with the decoder
    always fed its own predictions.

    Args:
        epoch: Index of the epoch that is about to start.
        logs: Metrics dict passed by Keras; unused.
    """
    # Guard the ramp length: with n_epochs <= 10 the plain division would
    # either raise ZeroDivisionError or yield a negative probability.
    ramp_epochs = max(1, n_epochs - 10)
    proba = min(1.0, max(0.0, epoch / ramp_epochs))
    sampler.sampling_probability.assign(proba)
# Update the sampler's sampling probability at the start of every epoch.
sampling_probability_cb = keras.callbacks.LambdaCallback(
    on_epoch_begin=update_sampling_probability)
# epochs must be n_epochs: update_sampling_probability() reads the same global
# to schedule the ramp.
history = model.fit([X_train, X_train_decoder], Y_train, epochs=n_epochs,
                    validation_data=([X_valid, X_valid_decoder], Y_valid),
                    callbacks=[sampling_probability_cb])
Not quite 100% validation accuracy, but close enough!
For inference, we could do the exact same thing as earlier, using a GreedyEmbeddingSampler
. However, just for the sake of completeness, let's use a SampleEmbeddingSampler
instead. It's almost the same thing, except that instead of using the argmax of the model's output to find the token ID, it treats the outputs as logits and uses them to sample a token ID randomly. This can be useful when you want to generate text. The softmax_temperature
argument serves the
same purpose as when we generated Shakespeare-like text (the higher this argument, the more random the generated text will be).
# A tf.Variable so that creative_predict_date_strs() can change the
# temperature with .assign() without rebuilding the inference model.
softmax_temperature = tf.Variable(1.)

# Build an inference-only decoder from the already-trained layers
# (decoder_embedding_layer, decoder_cell, output_layer).
inference_sampler = tfa.seq2seq.sampler.SampleEmbeddingSampler(
    embedding_fn=decoder_embedding_layer,
    softmax_temperature=softmax_temperature)
inference_decoder = tfa.seq2seq.basic_decoder.BasicDecoder(
    decoder_cell, inference_sampler, output_layer=output_layer,
    # caps the number of decoding steps, in case end_token is never produced
    maximum_iterations=max_output_length)
# tf.shape(...)[:1] is a one-element vector holding the batch size; it is used
# as `dims` so that tf.fill() produces one sos_id per sequence.
batch_size = tf.shape(encoder_inputs)[:1]
start_tokens = tf.fill(dims=batch_size, value=sos_id)
# NOTE(review): the first positional argument is presumably ignored by the
# sampler because embedding_fn was given -- confirm against the TFA docs.
final_outputs, final_state, final_sequence_lengths = inference_decoder(
    start_tokens,
    initial_state=encoder_state,
    start_tokens=start_tokens,
    end_token=0)
# NOTE: this rebinds the global `inference_model` read by
# fast_predict_date_strs().
inference_model = keras.models.Model(inputs=[encoder_inputs],
                                     outputs=[final_outputs.sample_id])
def creative_predict_date_strs(date_strs, temperature=1.0):
    """Convert date strings by sampling token IDs at the given temperature.

    The global `softmax_temperature` variable is updated first, then the
    global `inference_model` is run once and its sampled IDs are converted
    back to strings.
    """
    # The sampler reads this variable, so set it before running the model.
    softmax_temperature.assign(temperature)
    padded_inputs = prepare_date_strs_padded(date_strs)
    sampled_ids = inference_model.predict(padded_inputs)
    return ids_to_date_strs(sampled_ids)


# Sampling is random: fix the seed so the displayed dates are repeatable.
tf.random.set_seed(42)

creative_predict_date_strs(["July 14, 1789", "May 01, 2020"])
Dates look good at room temperature. Now let's heat things up a bit:
# Same seed and inputs as the previous call; only the temperature changes.
tf.random.set_seed(42)

creative_predict_date_strs(["July 14, 1789", "May 01, 2020"],
                           temperature=5.)
Oops, the dates are overcooked, now. Let's call them "creative" dates.
The sequences in this problem are pretty short, but if we wanted to tackle longer sequences, we would probably have to use attention mechanisms. While it's possible to code our own implementation, it's simpler and more efficient to use TF-Addons's implementation instead. Let's do that now, this time using Keras' subclassing API.
Warning: due to a TensorFlow bug (see this issue for details), the get_initial_state()
method fails in eager mode, so for now we have to use the subclassing API, as Keras automatically calls tf.function()
on the call()
method (so it runs in graph mode).
In this implementation, we've reverted back to using the TrainingSampler
, for simplicity (but you can easily tweak it to use a ScheduledEmbeddingTrainingSampler
instead). We also use a GreedyEmbeddingSampler
during inference, so this class is pretty easy to use:
class DateTranslation(keras.models.Model):
    """Encoder-decoder date translator using TF-Addons Luong attention.

    Two decoders share the same attention-wrapped LSTM cell and output layer:
    `decoder` (with a `TrainingSampler`) is used when `training` is truthy,
    and `inference_decoder` (with a `GreedyEmbeddingSampler`) otherwise.
    """

    def __init__(self, units=128, encoder_embedding_size=32,
                 decoder_embedding_size=32, **kwargs):
        """Create the layers.

        Args:
            units: Number of units of the encoder LSTM, the decoder LSTM cell
                and the attention mechanism.
            encoder_embedding_size: Output dimension of the encoder embedding.
            decoder_embedding_size: Output dimension of the decoder embedding.
            **kwargs: Forwarded to `keras.models.Model`.
        """
        super().__init__(**kwargs)
        n_input_ids = len(INPUT_CHARS) + 1
        n_output_ids = len(OUTPUT_CHARS) + 1

        self.encoder_embedding = keras.layers.Embedding(
            input_dim=n_input_ids, output_dim=encoder_embedding_size)
        # return_sequences=True: the attention mechanism needs every encoder
        # output, not just the last one.
        self.encoder = keras.layers.LSTM(
            units, return_sequences=True, return_state=True)
        # One extra decoder input ID, so that sos_id can be embedded.
        self.decoder_embedding = keras.layers.Embedding(
            input_dim=n_output_ids + 1, output_dim=decoder_embedding_size)

        self.attention = tfa.seq2seq.LuongAttention(units)
        self.decoder_cell = tfa.seq2seq.AttentionWrapper(
            cell=keras.layers.LSTMCell(units),
            attention_mechanism=self.attention)

        # Shared by both decoders, so inference reuses the trained weights.
        output_layer = keras.layers.Dense(n_output_ids)

        training_sampler = tfa.seq2seq.sampler.TrainingSampler()
        self.decoder = tfa.seq2seq.BasicDecoder(
            cell=self.decoder_cell,
            sampler=training_sampler,
            output_layer=output_layer)

        greedy_sampler = tfa.seq2seq.sampler.GreedyEmbeddingSampler(
            embedding_fn=self.decoder_embedding)
        self.inference_decoder = tfa.seq2seq.BasicDecoder(
            cell=self.decoder_cell,
            sampler=greedy_sampler,
            output_layer=output_layer,
            maximum_iterations=max_output_length)

    def call(self, inputs, training=None):
        """Run the model and return the softmax of the decoder's outputs.

        Args:
            inputs: A pair `(encoder_input, decoder_input)`.
            training: When truthy, the training decoder is used; otherwise
                (including the default `None`) the inference decoder is used,
                starting from `sos_id` tokens.
        """
        encoder_input, decoder_input = inputs

        source_embeddings = self.encoder_embedding(encoder_input)
        encoder_outputs, state_h, state_c = self.encoder(
            source_embeddings, training=training)

        # Point the attention mechanism at this batch's encoder outputs.
        self.attention(encoder_outputs, setup_memory=True)

        target_embeddings = self.decoder_embedding(decoder_input)
        # Start from the wrapper's own initial state, but with the LSTM cell
        # state replaced by the encoder's final states.
        initial_state = self.decoder_cell.get_initial_state(
            target_embeddings).clone(cell_state=[state_h, state_c])

        if training:
            decoder_outputs, _, _ = self.decoder(
                target_embeddings,
                initial_state=initial_state,
                training=training)
        else:
            # One sos_id per sequence in the batch.
            start_tokens = tf.zeros_like(encoder_input[:, 0]) + sos_id
            decoder_outputs, _, _ = self.inference_decoder(
                target_embeddings,
                initial_state=initial_state,
                start_tokens=start_tokens,
                end_token=0)

        return tf.nn.softmax(decoder_outputs.rnn_output)
# Reseed NumPy and TensorFlow before the model's layers are created.
np.random.seed(42)
tf.random.set_seed(42)

# NOTE: this rebinds the global `model` to the attention-based model.
model = DateTranslation()
optimizer = keras.optimizers.Nadam()
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
history = model.fit([X_train, X_train_decoder], Y_train, epochs=25,
                    validation_data=([X_valid, X_valid_decoder], Y_valid))
Not quite 100% validation accuracy, but close. It took a bit longer to converge this time, but there were also more parameters and more computations per iteration. And we did not use a scheduled sampler.
To use the model, we can write yet another little function:
def fast_predict_date_strs_v2(date_strs):
    """Convert date strings with one run of the global `model`.

    The model takes an (encoder input, decoder input) pair, so an all-zero
    int32 tensor of width `max_output_length` is supplied as the decoder
    input. The returned probabilities are reduced to IDs with an argmax.
    """
    X = prepare_date_strs_padded(date_strs)
    decoder_placeholder = tf.zeros(shape=(len(X), max_output_length),
                                   dtype=tf.int32)
    probas = model.predict([X, decoder_placeholder])
    return ids_to_date_strs(tf.argmax(probas, axis=-1))


fast_predict_date_strs_v2(["July 14, 1789", "May 01, 2020"])
There are still a few interesting features from TF-Addons that you may want to look at:
- Using a `BeamSearchDecoder` rather than a `BasicDecoder` for inference. Instead of outputting the character with the highest probability, this decoder keeps track of several candidates, and keeps only the most likely sequences of candidates (see chapter 16 in the book for more details).
- Specifying `sequence_length` if the input or target sequences may have very different lengths.
- Using a `ScheduledOutputTrainingSampler`, which gives you more flexibility than the `ScheduledEmbeddingTrainingSampler` to decide how to feed the output at time t to the cell at time t+1. By default it feeds the outputs directly to the cell, without computing the argmax ID and passing it through an embedding layer. Alternatively, you can specify a `next_inputs_fn` function that will be used to convert the cell outputs to inputs at the next step.

Exercise: Go through TensorFlow's Neural Machine Translation with Attention tutorial.
Simply open the Colab and follow its instructions. Alternatively, if you want a simpler example of using TF-Addons's seq2seq implementation for Neural Machine Translation (NMT), look at the solution to the previous question. The last model implementation will give you a simpler example of using TF-Addons to build an NMT model using attention mechanisms.
Exercise: Use one of the recent language models (e.g., GPT) to generate more convincing Shakespearean text.
The simplest way to use recent language models is to use the excellent transformers library, open sourced by Hugging Face. It provides many modern neural net architectures (including BERT, GPT-2, RoBERTa, XLM, DistilBert, XLNet and more) for Natural Language Processing (NLP), including many pretrained models. It relies on either TensorFlow or PyTorch. Best of all: it's amazingly simple to use.
First, let's load a pretrained model. In this example, we will use OpenAI's GPT model, with an additional Language Model on top (just a linear layer with weights tied to the input embeddings). Let's import it and load the pretrained weights (this will download about 445MB of data to ~/.cache/torch/transformers
):
from transformers import TFOpenAIGPTLMHeadModel

# NOTE: this rebinds the global `model`, so the date-conversion helpers
# defined earlier (which read `model`) no longer use a date model after this.
model = TFOpenAIGPTLMHeadModel.from_pretrained("openai-gpt")

from transformers import OpenAIGPTTokenizer

# The tokenizer is loaded under the same "openai-gpt" name as the model.
tokenizer = OpenAIGPTTokenizer.from_pretrained("openai-gpt")
Now let's use the tokenizer to tokenize and encode the prompt text:
prompt_text = "This royal throne of kings, this sceptred isle"
# return_tensors="tf" asks for a TensorFlow tensor; the generation code below
# indexes it as encoded_prompt[0] to get the prompt's token IDs.
encoded_prompt = tokenizer.encode(prompt_text,
                                  add_special_tokens=False,
                                  return_tensors="tf")
# Display the encoded prompt.
encoded_prompt
Easy! Next, let's use the model to generate text after the prompt. We will generate 5 different sentences, each starting with the prompt text, followed by 40 additional tokens. For an explanation of what all the hyperparameters do, make sure to check out this great blog post by Patrick von Platen (from Hugging Face). You can play around with the hyperparameters to try to obtain better results.
num_sequences = 5
length = 40  # number of tokens to generate after the prompt

generated_sequences = model.generate(
    input_ids=encoded_prompt,
    do_sample=True,
    # max_length counts the prompt too, hence prompt length + `length`
    max_length=length + len(encoded_prompt[0]),
    temperature=1.0,
    # NOTE(review): top_k=0 presumably disables top-k filtering so that only
    # top_p applies -- confirm against the transformers generate() docs
    top_k=0,
    top_p=0.9,
    repetition_penalty=1.0,
    num_return_sequences=num_sequences,
)
# Display the generated token IDs.
generated_sequences
Now let's decode the generated sequences and print them:
# Decode each generated sequence back to text and print it, followed by an
# 80-dash separator line.
for sequence in generated_sequences:
    text = tokenizer.decode(sequence, clean_up_tokenization_spaces=True)
    print(text, "-" * 80, sep="\n")
You can try more recent (and larger) models, such as GPT-2, CTRL, Transformer-XL or XLNet, which are all available as pretrained models in the transformers library, including variants with Language Models on top. The preprocessing steps vary slightly between models, so make sure to check out this generation example from the transformers documentation (this example uses PyTorch, but it will work with very few tweaks, such as adding `TF` at the beginning of the model class name, removing the `.to()` method calls, and using `return_tensors="tf"` instead of `"pt"`).
Hope you enjoyed this chapter! :)