Chapter 13 – Loading and Preprocessing Data with TensorFlow
This notebook contains all the sample code and solutions to the exercises in chapter 13.
First, let's import a few common modules, ensure Matplotlib plots figures inline, and prepare a function to save the figures. We also check that Python 3.5 or later is installed (although Python 2.x may work, it is deprecated, so we strongly recommend you use Python 3 instead), as well as Scikit-Learn ≥0.20 and TensorFlow ≥2.0.
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)
# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"
try:
# %tensorflow_version only exists in Colab.
%tensorflow_version 2.x
!pip install -q -U tfx==0.21.2
print("You can safely ignore the package incompatibility errors.")
except Exception:
pass
# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"
# Common imports
import numpy as np
import os
# to make this notebook's output stable across runs
np.random.seed(42)
# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "data"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)
def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
print("Saving figure", fig_id)
if tight_layout:
plt.tight_layout()
plt.savefig(path, format=fig_extension, dpi=resolution)
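The save_fig() function is not actually called in this excerpt; a typical use right after plotting would look like this (the figure name is just an example):
plt.plot([0, 1, 2], [0, 1, 4])  # any Matplotlib figure
save_fig("example_plot")        # saves ./images/data/example_plot.png
plt.show()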
X = tf.range(10)
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset
Equivalently:
dataset = tf.data.Dataset.range(10)
for item in dataset:
print(item)
dataset = dataset.repeat(3).batch(7)
for item in dataset:
print(item)
dataset = dataset.map(lambda x: x * 2)
for item in dataset:
print(item)
#dataset = dataset.apply(tf.data.experimental.unbatch()) # Now deprecated
dataset = dataset.unbatch()
dataset = dataset.filter(lambda x: x < 10) # keep only items < 10
for item in dataset.take(3):
print(item)
tf.random.set_seed(42)
dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size=3, seed=42).batch(7)
for item in dataset:
print(item)
Let's start by loading and preparing the California housing dataset. We first load it, then split it into a training set, a validation set and a test set, and finally we scale it:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
X_train_full, y_train_full, random_state=42)
scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_
For a very large dataset that does not fit in memory, you will typically want to split it into many files first, then have TensorFlow read these files in parallel. To demonstrate this, let's start by splitting the housing dataset and save it to 20 CSV files:
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
housing_dir = os.path.join("datasets", "housing")
os.makedirs(housing_dir, exist_ok=True)
path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")
filepaths = []
m = len(data)
for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
part_csv = path_format.format(name_prefix, file_idx)
filepaths.append(part_csv)
with open(part_csv, "wt", encoding="utf-8") as f:
if header is not None:
f.write(header)
f.write("\n")
for row_idx in row_indices:
f.write(",".join([repr(col) for col in data[row_idx]]))
f.write("\n")
return filepaths
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)
train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)
Okay, now let's take a peek at the first few lines of one of these CSV files:
import pandas as pd
pd.read_csv(train_filepaths[0]).head()
Or in text mode:
with open(train_filepaths[0]) as f:
for i in range(5):
print(f.readline(), end="")
train_filepaths
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
for filepath in filepath_dataset:
print(filepath)
n_readers = 5
dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers)
for line in dataset.take(5):
print(line.numpy())
Notice that in the following example field 4 is interpreted as a string, since its default value is a string:
record_defaults=[0, np.nan, tf.constant(np.nan, dtype=tf.float64), "Hello", tf.constant([])]
parsed_fields = tf.io.decode_csv('1,2,3,4,5', record_defaults)
parsed_fields
Notice that all missing fields are replaced with their default value, when provided:
parsed_fields = tf.io.decode_csv(',,,,5', record_defaults)
parsed_fields
The 5th field is compulsory (since we provided tf.constant([]) as the "default value"), so we get an exception if we do not provide it:
try:
parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
print(ex)
The number of fields should match exactly the number of fields in record_defaults:
try:
parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
print(ex)
n_inputs = 8 # X_train.shape[-1]
@tf.function
def preprocess(line):
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
fields = tf.io.decode_csv(line, record_defaults=defs)
x = tf.stack(fields[:-1])
y = tf.stack(fields[-1:])
return (x - X_mean) / X_std, y
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    # read from n_readers files at a time, skipping each file's header line
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads)
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(1)  # prefetch one batch while the current one is being used
tf.random.set_seed(42)
train_set = csv_reader_dataset(train_filepaths, batch_size=3)
for X_batch, y_batch in train_set.take(2):
print("X =", X_batch)
print("y =", y_batch)
print()
train_set = csv_reader_dataset(train_filepaths, repeat=None)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
keras.layers.Dense(1),
])
model.compile(loss="mse", optimizer=keras.optimizers.SGD(lr=1e-3))
batch_size = 32
model.fit(train_set, steps_per_epoch=len(X_train) // batch_size, epochs=10,
validation_data=valid_set)
model.evaluate(test_set, steps=len(X_test) // batch_size)
new_set = test_set.map(lambda X, y: X) # we could instead just pass test_set, Keras would ignore the labels
X_new = X_test
model.predict(new_set, steps=len(X_new) // batch_size)
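As the comment above suggests, we could instead pass test_set directly; Keras simply ignores the labels when predicting. A minimal equivalent call (same steps as above):
model.predict(test_set, steps=len(X_new) // batch_size)  # labels in test_set are ignored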
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
n_epochs = 5
batch_size = 32
n_steps_per_epoch = len(X_train) // batch_size
total_steps = n_epochs * n_steps_per_epoch
global_step = 0
for X_batch, y_batch in train_set.take(total_steps):
global_step += 1
print("\rGlobal step {}/{}".format(global_step, total_steps), end="")
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
@tf.function
def train(model, n_epochs, batch_size=32,
n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,
n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,
n_parse_threads=n_parse_threads, batch_size=batch_size)
for X_batch, y_batch in train_set:
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train(model, 5)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
@tf.function
def train(model, n_epochs, batch_size=32,
n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,
n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,
n_parse_threads=n_parse_threads, batch_size=batch_size)
n_steps_per_epoch = len(X_train) // batch_size
total_steps = n_epochs * n_steps_per_epoch
global_step = 0
for X_batch, y_batch in train_set.take(total_steps):
global_step += 1
if tf.equal(global_step % 100, 0):
tf.print("\rGlobal step", global_step, "/", total_steps)
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train(model, 5)
Here is a short description of each method in the Dataset class:
for m in dir(tf.data.Dataset):
if not (m.startswith("_") or m.endswith("_")):
func = getattr(tf.data.Dataset, m)
if hasattr(func, "__doc__"):
print("● {:21s}{}".format(m + "()", func.__doc__.split("\n")[0]))
The TFRecord binary format
A TFRecord file is just a list of binary records. You can create one using a tf.io.TFRecordWriter:
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
f.write(b"This is the first record")
f.write(b"And this is the second record")
And you can read it using a tf.data.TFRecordDataset:
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
print(item)
You can read multiple TFRecord files with just one TFRecordDataset. By default it will read them one at a time, but if you set num_parallel_reads=3, it will read 3 at a time in parallel and interleave their records:
filepaths = ["my_test_{}.tfrecord".format(i) for i in range(5)]
for i, filepath in enumerate(filepaths):
with tf.io.TFRecordWriter(filepath) as f:
for j in range(3):
f.write("File {} record {}".format(i, j).encode("utf-8"))
dataset = tf.data.TFRecordDataset(filepaths, num_parallel_reads=3)
for item in dataset:
print(item)
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
f.write(b"This is the first record")
f.write(b"And this is the second record")
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
compression_type="GZIP")
for item in dataset:
print(item)
For this section you need to install protobuf. In general you will not have to do so when using TensorFlow, as it comes with functions to create and parse protocol buffers of type tf.train.Example, which are generally sufficient. However, in this section we will learn about protocol buffers by creating our own simple protobuf definition, so we need the protobuf compiler (protoc): we will use it to compile the protobuf definition to a Python module that we can then use in our code.
First let's write a simple protobuf definition:
%%writefile person.proto
syntax = "proto3";
message Person {
string name = 1;
int32 id = 2;
repeated string email = 3;
}
And let's compile it (the --descriptor_set_out and --include_imports options are only required for the tf.io.decode_proto() example below):
!protoc person.proto --python_out=. --descriptor_set_out=person.desc --include_imports
!ls person*
from person_pb2 import Person
person = Person(name="Al", id=123, email=["[email protected]"]) # create a Person
print(person) # display the Person
person.name # read a field
person.name = "Alice" # modify a field
person.email[0] # repeated fields can be accessed like arrays
person.email.append("[email protected]") # add an email address
s = person.SerializeToString() # serialize to a byte string
s
person2 = Person() # create a new Person
person2.ParseFromString(s) # parse the byte string (27 bytes)
person == person2 # now they are equal
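Since a serialized protobuf is just a byte string, we could also store Person records in a TFRecord file, reusing the TFRecordWriter/TFRecordDataset pattern from the previous section. A minimal sketch (the my_persons.tfrecord filename is just an example):
with tf.io.TFRecordWriter("my_persons.tfrecord") as f:
    f.write(person.SerializeToString())

person3 = Person()
for serialized in tf.data.TFRecordDataset(["my_persons.tfrecord"]):
    person3.ParseFromString(serialized.numpy())  # parse each record back into a Person
person3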
In rare cases, you may want to parse a custom protobuf (like the one we just created) in TensorFlow. For this you can use the tf.io.decode_proto() function:
person_tf = tf.io.decode_proto(
bytes=s,
message_type="Person",
field_names=["name", "id", "email"],
output_types=[tf.string, tf.int32, tf.string],
descriptor_source="person.desc")
person_tf.values
For more details, see the tf.io.decode_proto() documentation.
Here is the definition of the tf.train.Example protobuf:
syntax = "proto3";
message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
};
message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };
Warning: in TensorFlow 2.0 and 2.1, there was a bug preventing from tensorflow.train import X, so we work around it by writing X = tf.train.X. See https://github.com/tensorflow/tensorflow/issues/33289 for more details.
#from tensorflow.train import BytesList, FloatList, Int64List
#from tensorflow.train import Feature, Features, Example
BytesList = tf.train.BytesList
FloatList = tf.train.FloatList
Int64List = tf.train.Int64List
Feature = tf.train.Feature
Features = tf.train.Features
Example = tf.train.Example
person_example = Example(
features=Features(
feature={
"name": Feature(bytes_list=BytesList(value=[b"Alice"])),
"id": Feature(int64_list=Int64List(value=[123])),
"emails": Feature(bytes_list=BytesList(value=[b"[email protected]", b"[email protected]"]))
}))
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
f.write(person_example.SerializeToString())
feature_description = {
"name": tf.io.FixedLenFeature([], tf.string, default_value=""),
"id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
"emails": tf.io.VarLenFeature(tf.string),
}
for serialized_example in tf.data.TFRecordDataset(["my_contacts.tfrecord"]):
parsed_example = tf.io.parse_single_example(serialized_example,
feature_description)
parsed_example
parsed_example
parsed_example["emails"].values[0]
tf.sparse.to_dense(parsed_example["emails"], default_value=b"")
parsed_example["emails"].values
from sklearn.datasets import load_sample_images
img = load_sample_images()["images"][0]
plt.imshow(img)
plt.axis("off")
plt.title("Original Image")
plt.show()
data = tf.io.encode_jpeg(img)
example_with_image = Example(features=Features(feature={
"image": Feature(bytes_list=BytesList(value=[data.numpy()]))}))
serialized_example = example_with_image.SerializeToString()
# then save to TFRecord
feature_description = { "image": tf.io.VarLenFeature(tf.string) }
example_with_image = tf.io.parse_single_example(serialized_example, feature_description)
decoded_img = tf.io.decode_jpeg(example_with_image["image"].values[0])
Or use tf.io.decode_image(), which supports BMP, GIF, JPEG and PNG formats:
decoded_img = tf.io.decode_image(example_with_image["image"].values[0])
plt.imshow(decoded_img)
plt.title("Decoded Image")
plt.axis("off")
plt.show()
Tensors can be serialized and parsed easily using tf.io.serialize_tensor() and tf.io.parse_tensor():
t = tf.constant([[0., 1.], [2., 3.], [4., 5.]])
s = tf.io.serialize_tensor(t)
s
tf.io.parse_tensor(s, out_type=tf.float32)
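This is convenient for storing arbitrary tensors inside an Example protobuf: serialize the tensor, store the resulting byte string in a BytesList feature, then call tf.io.parse_tensor() after parsing the Example (we use this trick in the Fashion MNIST exercise below). A minimal sketch using the tensor t serialized above:
tensor_example = Example(features=Features(feature={
    "my_tensor": Feature(bytes_list=BytesList(value=[s.numpy()]))}))
parsed = tf.io.parse_single_example(
    tensor_example.SerializeToString(),
    {"my_tensor": tf.io.FixedLenFeature([], tf.string)})
tf.io.parse_tensor(parsed["my_tensor"], out_type=tf.float32)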
serialized_sparse = tf.io.serialize_sparse(parsed_example["emails"])
serialized_sparse
BytesList(value=serialized_sparse.numpy())
dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).batch(10)
for serialized_examples in dataset:
parsed_examples = tf.io.parse_example(serialized_examples,
feature_description)
parsed_examples
SequenceExample
Here is the definition of the SequenceExample protobuf:
syntax = "proto3";
message FeatureList { repeated Feature feature = 1; };
message FeatureLists { map<string, FeatureList> feature_list = 1; };
message SequenceExample {
Features context = 1;
FeatureLists feature_lists = 2;
};
Warning: in TensorFlow 2.0 and 2.1, there was a bug preventing from tensorflow.train import X, so we work around it by writing X = tf.train.X. See https://github.com/tensorflow/tensorflow/issues/33289 for more details.
#from tensorflow.train import FeatureList, FeatureLists, SequenceExample
FeatureList = tf.train.FeatureList
FeatureLists = tf.train.FeatureLists
SequenceExample = tf.train.SequenceExample
context = Features(feature={
"author_id": Feature(int64_list=Int64List(value=[123])),
"title": Feature(bytes_list=BytesList(value=[b"A", b"desert", b"place", b"."])),
"pub_date": Feature(int64_list=Int64List(value=[1623, 12, 25]))
})
content = [["When", "shall", "we", "three", "meet", "again", "?"],
["In", "thunder", ",", "lightning", ",", "or", "in", "rain", "?"]]
comments = [["When", "the", "hurlyburly", "'s", "done", "."],
["When", "the", "battle", "'s", "lost", "and", "won", "."]]
def words_to_feature(words):
return Feature(bytes_list=BytesList(value=[word.encode("utf-8")
for word in words]))
content_features = [words_to_feature(sentence) for sentence in content]
comments_features = [words_to_feature(comment) for comment in comments]
sequence_example = SequenceExample(
context=context,
feature_lists=FeatureLists(feature_list={
"content": FeatureList(feature=content_features),
"comments": FeatureList(feature=comments_features)
}))
sequence_example
serialized_sequence_example = sequence_example.SerializeToString()
context_feature_descriptions = {
"author_id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
"title": tf.io.VarLenFeature(tf.string),
"pub_date": tf.io.FixedLenFeature([3], tf.int64, default_value=[0, 0, 0]),
}
sequence_feature_descriptions = {
"content": tf.io.VarLenFeature(tf.string),
"comments": tf.io.VarLenFeature(tf.string),
}
parsed_context, parsed_feature_lists = tf.io.parse_single_sequence_example(
serialized_sequence_example, context_feature_descriptions,
sequence_feature_descriptions)
parsed_context
parsed_context["title"].values
parsed_feature_lists
print(tf.RaggedTensor.from_sparse(parsed_feature_lists["content"]))
Let's use the variant of the California housing dataset that we used in Chapter 2, since it contains categorical features and missing values:
import os
import tarfile
import urllib.request
DOWNLOAD_ROOT = "https://raw.githubusercontent.com/ageron/handson-ml2/master/"
HOUSING_PATH = os.path.join("datasets", "housing")
HOUSING_URL = DOWNLOAD_ROOT + "datasets/housing/housing.tgz"
def fetch_housing_data(housing_url=HOUSING_URL, housing_path=HOUSING_PATH):
os.makedirs(housing_path, exist_ok=True)
tgz_path = os.path.join(housing_path, "housing.tgz")
urllib.request.urlretrieve(housing_url, tgz_path)
housing_tgz = tarfile.open(tgz_path)
housing_tgz.extractall(path=housing_path)
housing_tgz.close()
fetch_housing_data()
import pandas as pd
def load_housing_data(housing_path=HOUSING_PATH):
csv_path = os.path.join(housing_path, "housing.csv")
return pd.read_csv(csv_path)
housing = load_housing_data()
housing.head()
housing_median_age = tf.feature_column.numeric_column("housing_median_age")
age_mean, age_std = X_mean[1], X_std[1] # The median house age is column 1
housing_median_age = tf.feature_column.numeric_column(
"housing_median_age", normalizer_fn=lambda x: (x - age_mean) / age_std)
median_income = tf.feature_column.numeric_column("median_income")
bucketized_income = tf.feature_column.bucketized_column(
median_income, boundaries=[1.5, 3., 4.5, 6.])
bucketized_income
ocean_prox_vocab = ['<1H OCEAN', 'INLAND', 'ISLAND', 'NEAR BAY', 'NEAR OCEAN']
ocean_proximity = tf.feature_column.categorical_column_with_vocabulary_list(
"ocean_proximity", ocean_prox_vocab)
ocean_proximity
# Just an example, it's not used later on
city_hash = tf.feature_column.categorical_column_with_hash_bucket(
"city", hash_bucket_size=1000)
city_hash
bucketized_age = tf.feature_column.bucketized_column(
housing_median_age, boundaries=[-1., -0.5, 0., 0.5, 1.]) # age was scaled
age_and_ocean_proximity = tf.feature_column.crossed_column(
[bucketized_age, ocean_proximity], hash_bucket_size=100)
latitude = tf.feature_column.numeric_column("latitude")
longitude = tf.feature_column.numeric_column("longitude")
bucketized_latitude = tf.feature_column.bucketized_column(
latitude, boundaries=list(np.linspace(32., 42., 20 - 1)))
bucketized_longitude = tf.feature_column.bucketized_column(
longitude, boundaries=list(np.linspace(-125., -114., 20 - 1)))
location = tf.feature_column.crossed_column(
[bucketized_latitude, bucketized_longitude], hash_bucket_size=1000)
ocean_proximity_one_hot = tf.feature_column.indicator_column(ocean_proximity)
ocean_proximity_embed = tf.feature_column.embedding_column(ocean_proximity,
dimension=2)
median_house_value = tf.feature_column.numeric_column("median_house_value")
columns = [housing_median_age, median_house_value]
feature_descriptions = tf.feature_column.make_parse_example_spec(columns)
feature_descriptions
with tf.io.TFRecordWriter("my_data_with_features.tfrecords") as f:
for x, y in zip(X_train[:, 1:2], y_train):
example = Example(features=Features(feature={
"housing_median_age": Feature(float_list=FloatList(value=[x])),
"median_house_value": Feature(float_list=FloatList(value=[y]))
}))
f.write(example.SerializeToString())
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
def parse_examples(serialized_examples):
examples = tf.io.parse_example(serialized_examples, feature_descriptions)
targets = examples.pop("median_house_value") # separate the targets
return examples, targets
batch_size = 32
dataset = tf.data.TFRecordDataset(["my_data_with_features.tfrecords"])
dataset = dataset.repeat().shuffle(10000).batch(batch_size).map(parse_examples)
Warning: the DenseFeatures layer currently does not work with the Functional API (see TF issue #27416); hopefully this will be resolved in a future release.
columns_without_target = columns[:-1]
model = keras.models.Sequential([
keras.layers.DenseFeatures(feature_columns=columns_without_target),
keras.layers.Dense(1)
])
model.compile(loss="mse",
optimizer=keras.optimizers.SGD(lr=1e-3),
metrics=["accuracy"])
model.fit(dataset, steps_per_epoch=len(X_train) // batch_size, epochs=5)
some_columns = [ocean_proximity_embed, bucketized_income]
dense_features = keras.layers.DenseFeatures(some_columns)
dense_features({
"ocean_proximity": [["NEAR OCEAN"], ["INLAND"], ["INLAND"]],
"median_income": [[3.], [7.2], [1.]]
})
try:
import tensorflow_transform as tft
def preprocess(inputs): # inputs is a batch of input features
median_age = inputs["housing_median_age"]
ocean_proximity = inputs["ocean_proximity"]
standardized_age = tft.scale_to_z_score(median_age - tft.mean(median_age))
ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
return {
"standardized_median_age": standardized_age,
"ocean_proximity_id": ocean_proximity_id
}
except ImportError:
print("TF Transform is not installed. Try running: pip3 install -U tensorflow-transform")
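For reference, this preprocess() function would normally be run over the full training set with Apache Beam, using tf.Transform's AnalyzeAndTransformDataset. The sketch below is purely illustrative and is not executed here; the metadata helpers and their exact signatures may vary between tensorflow-transform versions:
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils

raw_data = [{"housing_median_age": 41.0, "ocean_proximity": "NEAR BAY"}]  # toy sample
raw_metadata = dataset_metadata.DatasetMetadata(schema_utils.schema_from_feature_spec({
    "housing_median_age": tf.io.FixedLenFeature([], tf.float32),
    "ocean_proximity": tf.io.FixedLenFeature([], tf.string),
}))
with beam.Pipeline() as pipeline, tft_beam.Context(temp_dir="tft_tmp"):
    raw_dataset = (pipeline | beam.Create(raw_data), raw_metadata)
    # analyze the data and apply the tf.Transform preprocess() function defined above
    transformed_dataset, transform_fn = (
        raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocess))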
import tensorflow_datasets as tfds
datasets = tfds.load(name="mnist")
mnist_train, mnist_test = datasets["train"], datasets["test"]
print(tfds.list_builders())
plt.figure(figsize=(6,3))
mnist_train = mnist_train.repeat(5).batch(32).prefetch(1)
for item in mnist_train:
images = item["image"]
labels = item["label"]
for index in range(5):
plt.subplot(1, 5, index + 1)
image = images[index, ..., 0]
label = labels[index].numpy()
plt.imshow(image, cmap="binary")
plt.title(label)
plt.axis("off")
break # just showing part of the first batch
datasets = tfds.load(name="mnist")
mnist_train, mnist_test = datasets["train"], datasets["test"]
mnist_train = mnist_train.repeat(5).batch(32)
mnist_train = mnist_train.map(lambda items: (items["image"], items["label"]))
mnist_train = mnist_train.prefetch(1)
for images, labels in mnist_train.take(1):
print(images.shape)
print(labels.numpy())
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
datasets = tfds.load(name="mnist", batch_size=32, as_supervised=True)
mnist_train = datasets["train"].repeat().prefetch(1)
model = keras.models.Sequential([
keras.layers.Flatten(input_shape=[28, 28, 1]),
keras.layers.Lambda(lambda images: tf.cast(images, tf.float32)),
keras.layers.Dense(10, activation="softmax")])
model.compile(loss="sparse_categorical_crossentropy",
optimizer=keras.optimizers.SGD(lr=1e-3),
metrics=["accuracy"])
model.fit(mnist_train, steps_per_epoch=60000 // 32, epochs=5)
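Since we passed batch_size=32 and as_supervised=True to tfds.load(), the test split is already batched into (images, labels) pairs, so we can evaluate the model on it directly (a quick sketch):
model.evaluate(datasets["test"], steps=10000 // 32)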
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
import tensorflow_hub as hub
hub_layer = hub.KerasLayer("https://tfhub.dev/google/tf2-preview/nnlm-en-dim50/1",
output_shape=[50], input_shape=[], dtype=tf.string)
model = keras.Sequential()
model.add(hub_layer)
model.add(keras.layers.Dense(16, activation='relu'))
model.add(keras.layers.Dense(1, activation='sigmoid'))
model.summary()
sentences = tf.constant(["It was a great movie", "The actors were amazing"])
embeddings = hub_layer(sentences)
embeddings
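The hub-based model above is only summarized, not trained in this notebook. To train it, you would compile it with a binary cross-entropy loss and fit it on a dataset of (sentence, label) batches, for example the IMDB reviews used in the exercises below. A minimal sketch (the dataset name is hypothetical):
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
#model.fit(sentiment_train_set, epochs=5)  # sentiment_train_set: batches of (string, label) pairs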
See Appendix A
_Exercise: Load the Fashion MNIST dataset (introduced in Chapter 10); split it into a training set, a validation set, and a test set; shuffle the training set; and save each dataset to multiple TFRecord files. Each record should be a serialized Example protobuf with two features: the serialized image (use tf.io.serialize_tensor() to serialize each image), and the label. Note: for large images, you could use tf.io.encode_jpeg() instead. This would save a lot of space, but it would lose a bit of image quality._
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
train_set = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(X_train))
valid_set = tf.data.Dataset.from_tensor_slices((X_valid, y_valid))
test_set = tf.data.Dataset.from_tensor_slices((X_test, y_test))
def create_example(image, label):
image_data = tf.io.serialize_tensor(image)
#image_data = tf.io.encode_jpeg(image[..., np.newaxis])
return Example(
features=Features(
feature={
"image": Feature(bytes_list=BytesList(value=[image_data.numpy()])),
"label": Feature(int64_list=Int64List(value=[label])),
}))
for image, label in valid_set.take(1):
print(create_example(image, label))
The following function saves a given dataset to a set of TFRecord files. The examples are written to the files in a round-robin fashion. To do this, we enumerate all the examples using the dataset.enumerate() method, and we compute index % n_shards to decide which file to write to. We use the standard contextlib.ExitStack class to make sure that all writers are properly closed whether or not an I/O error occurs while writing.
from contextlib import ExitStack
def write_tfrecords(name, dataset, n_shards=10):
paths = ["{}.tfrecord-{:05d}-of-{:05d}".format(name, index, n_shards)
for index in range(n_shards)]
with ExitStack() as stack:
writers = [stack.enter_context(tf.io.TFRecordWriter(path))
for path in paths]
for index, (image, label) in dataset.enumerate():
shard = index % n_shards
example = create_example(image, label)
writers[shard].write(example.SerializeToString())
return paths
train_filepaths = write_tfrecords("my_fashion_mnist.train", train_set)
valid_filepaths = write_tfrecords("my_fashion_mnist.valid", valid_set)
test_filepaths = write_tfrecords("my_fashion_mnist.test", test_set)
Exercise: Then use tf.data to create an efficient dataset for each set. Finally, train a Keras model on these datasets, including a preprocessing layer to standardize each input feature. Try to make the input pipeline as efficient as possible, using TensorBoard to visualize profiling data.
def preprocess(tfrecord):
feature_descriptions = {
"image": tf.io.FixedLenFeature([], tf.string, default_value=""),
"label": tf.io.FixedLenFeature([], tf.int64, default_value=-1)
}
example = tf.io.parse_single_example(tfrecord, feature_descriptions)
image = tf.io.parse_tensor(example["image"], out_type=tf.uint8)
#image = tf.io.decode_jpeg(example["image"])
image = tf.reshape(image, shape=[28, 28])
return image, example["label"]
def mnist_dataset(filepaths, n_read_threads=5, shuffle_buffer_size=None,
n_parse_threads=5, batch_size=32, cache=True):
dataset = tf.data.TFRecordDataset(filepaths,
num_parallel_reads=n_read_threads)
if cache:
dataset = dataset.cache()
if shuffle_buffer_size:
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
dataset = dataset.batch(batch_size)
return dataset.prefetch(1)
train_set = mnist_dataset(train_filepaths, shuffle_buffer_size=60000)
valid_set = mnist_dataset(valid_filepaths)
test_set = mnist_dataset(test_filepaths)
for X, y in train_set.take(1):
for i in range(5):
plt.subplot(1, 5, i + 1)
plt.imshow(X[i].numpy(), cmap="binary")
plt.axis("off")
plt.title(str(y[i].numpy()))
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)
class Standardization(keras.layers.Layer):
    def adapt(self, data_sample):
        # compute the mean and standard deviation of each feature on a data sample
        self.means_ = np.mean(data_sample, axis=0, keepdims=True)
        self.stds_ = np.std(data_sample, axis=0, keepdims=True)
    def call(self, inputs):
        # standardize the inputs (epsilon avoids division by zero)
        return (inputs - self.means_) / (self.stds_ + keras.backend.epsilon())
standardization = Standardization(input_shape=[28, 28])
# or perhaps soon:
#standardization = keras.layers.Normalization()
sample_image_batches = train_set.take(100).map(lambda image, label: image)
sample_images = np.concatenate(list(sample_image_batches.as_numpy_iterator()),
axis=0).astype(np.float32)
standardization.adapt(sample_images)
model = keras.models.Sequential([
standardization,
keras.layers.Flatten(),
keras.layers.Dense(100, activation="relu"),
keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
optimizer="nadam", metrics=["accuracy"])
from datetime import datetime
logs = os.path.join(os.curdir, "my_logs",
"run_" + datetime.now().strftime("%Y%m%d_%H%M%S"))
tensorboard_cb = tf.keras.callbacks.TensorBoard(
log_dir=logs, histogram_freq=1, profile_batch=10)
model.fit(train_set, epochs=5, validation_data=valid_set,
callbacks=[tensorboard_cb])
Warning: The profiling tab in TensorBoard works if you use TensorFlow 2.2+. You also need to make sure tensorboard_plugin_profile is installed (and restart Jupyter if necessary).
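If it is missing, installing it from PyPI should be enough (assuming the package is still published as tensorboard-plugin-profile):
!pip install -q -U tensorboard-plugin-profile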
%load_ext tensorboard
%tensorboard --logdir=./my_logs --port=6006
Exercise: In this exercise you will download a dataset, split it, create a tf.data.Dataset to load it and preprocess it efficiently, then build and train a binary classification model containing an Embedding layer.
Exercise: Download the Large Movie Review Dataset, which contains 50,000 movie reviews from the Internet Movie Database. The data is organized in two directories, train and test, each containing a pos subdirectory with 12,500 positive reviews and a neg subdirectory with 12,500 negative reviews. Each review is stored in a separate text file. There are other files and folders (including preprocessed bag-of-words), but we will ignore them in this exercise.
from pathlib import Path
DOWNLOAD_ROOT = "http://ai.stanford.edu/~amaas/data/sentiment/"
FILENAME = "aclImdb_v1.tar.gz"
filepath = keras.utils.get_file(FILENAME, DOWNLOAD_ROOT + FILENAME, extract=True)
path = Path(filepath).parent / "aclImdb"
path
for name, subdirs, files in os.walk(path):
indent = len(Path(name).parts) - len(path.parts)
print(" " * indent + Path(name).parts[-1] + os.sep)
for index, filename in enumerate(sorted(files)):
if index == 3:
print(" " * (indent + 1) + "...")
break
print(" " * (indent + 1) + filename)
def review_paths(dirpath):
return [str(path) for path in dirpath.glob("*.txt")]
train_pos = review_paths(path / "train" / "pos")
train_neg = review_paths(path / "train" / "neg")
test_valid_pos = review_paths(path / "test" / "pos")
test_valid_neg = review_paths(path / "test" / "neg")
len(train_pos), len(train_neg), len(test_valid_pos), len(test_valid_neg)
Exercise: Split the test set into a validation set (15,000) and a test set (10,000).
np.random.shuffle(test_valid_pos)
test_pos = test_valid_pos[:5000]
test_neg = test_valid_neg[:5000]
valid_pos = test_valid_pos[5000:]
valid_neg = test_valid_neg[5000:]
Exercise: Use tf.data to create an efficient dataset for each set.
Since the dataset fits in memory, we can just load all the data using pure Python code and use tf.data.Dataset.from_tensor_slices():
def imdb_dataset(filepaths_positive, filepaths_negative):
reviews = []
labels = []
for filepaths, label in ((filepaths_negative, 0), (filepaths_positive, 1)):
for filepath in filepaths:
with open(filepath) as review_file:
reviews.append(review_file.read())
labels.append(label)
return tf.data.Dataset.from_tensor_slices(
(tf.constant(reviews), tf.constant(labels)))
for X, y in imdb_dataset(train_pos, train_neg).take(3):
print(X)
print(y)
print()
%timeit -r1 for X, y in imdb_dataset(train_pos, train_neg).repeat(10): pass
It takes about 17 seconds to load the dataset and go through it 10 times.
But let's pretend the dataset does not fit in memory, just to make things more interesting. Luckily, each review fits on just one line (they use <br /> to indicate line breaks), so we can read the reviews using a TextLineDataset. If they didn't, we would have to preprocess the input files (e.g., converting them to TFRecords). For very large datasets, it would make sense to use a tool like Apache Beam for that.
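For the record, converting the reviews to TFRecords would just reuse the Example/TFRecordWriter pattern from earlier in this chapter. Here is a rough sketch, not used in the rest of this exercise (file names and sharding are arbitrary):
def reviews_to_tfrecord(filepaths, label, out_path):
    with tf.io.TFRecordWriter(out_path) as f:
        for filepath in filepaths:
            with open(filepath, encoding="utf-8") as review_file:
                review = review_file.read().encode("utf-8")
            example = Example(features=Features(feature={
                "review": Feature(bytes_list=BytesList(value=[review])),
                "label": Feature(int64_list=Int64List(value=[label])),
            }))
            f.write(example.SerializeToString())

#reviews_to_tfrecord(train_pos, 1, "imdb_train_pos.tfrecord")  # hypothetical usage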
def imdb_dataset(filepaths_positive, filepaths_negative, n_read_threads=5):
dataset_neg = tf.data.TextLineDataset(filepaths_negative,
num_parallel_reads=n_read_threads)
dataset_neg = dataset_neg.map(lambda review: (review, 0))
dataset_pos = tf.data.TextLineDataset(filepaths_positive,
num_parallel_reads=n_read_threads)
dataset_pos = dataset_pos.map(lambda review: (review, 1))
return tf.data.Dataset.concatenate(dataset_pos, dataset_neg)
%timeit -r1 for X, y in imdb_dataset(train_pos, train_neg).repeat(10): pass
Now it takes about 33 seconds to go through the dataset 10 times. That's much slower, essentially because the dataset is not cached in RAM, so it must be reloaded at each epoch. If you add .cache() just before .repeat(10), you will see that this implementation will be about as fast as the previous one.
%timeit -r1 for X, y in imdb_dataset(train_pos, train_neg).cache().repeat(10): pass
batch_size = 32
train_set = imdb_dataset(train_pos, train_neg).shuffle(25000).batch(batch_size).prefetch(1)
valid_set = imdb_dataset(valid_pos, valid_neg).batch(batch_size).prefetch(1)
test_set = imdb_dataset(test_pos, test_neg).batch(batch_size).prefetch(1)
_Exercise: Create a binary classification model, using a TextVectorization layer to preprocess each review. If the TextVectorization layer is not yet available (or if you like a challenge), try to create your own custom preprocessing layer: you can use the functions in the tf.strings package, for example lower() to make everything lowercase, regex_replace() to replace punctuation with spaces, and split() to split words on spaces. You should use a lookup table to output word indices, which must be prepared in the adapt() method._
Let's first write a function to preprocess the reviews, cropping them to 300 characters, converting them to lowercase, then replacing <br /> and all non-letter characters with spaces, splitting the reviews into words, and finally padding or cropping each review so it ends up with exactly n_words tokens:
def preprocess(X_batch, n_words=50):
    shape = tf.shape(X_batch) * tf.constant([1, 0]) + tf.constant([0, n_words])  # [batch_size, n_words]
    Z = tf.strings.substr(X_batch, 0, 300)  # crop each review to 300 characters
    Z = tf.strings.lower(Z)
    Z = tf.strings.regex_replace(Z, b"<br\\s*/?>", b" ")  # replace <br /> tags with spaces
    Z = tf.strings.regex_replace(Z, b"[^a-z]", b" ")  # replace non-letters with spaces
    Z = tf.strings.split(Z)  # split on whitespace => ragged tensor of words
    return Z.to_tensor(shape=shape, default_value=b"<pad>")  # pad/crop to exactly n_words tokens
X_example = tf.constant(["It's a great, great movie! I loved it.", "It was terrible, run away!!!"])
preprocess(X_example)
Now let's write a second utility function that will take a data sample with the same format as the output of the preprocess() function, and will output the list of the top max_size most frequent words, ensuring that the padding token is first:
from collections import Counter
def get_vocabulary(data_sample, max_size=1000):
preprocessed_reviews = preprocess(data_sample).numpy()
counter = Counter()
for words in preprocessed_reviews:
for word in words:
if word != b"<pad>":
counter[word] += 1
return [b"<pad>"] + [word for word, count in counter.most_common(max_size)]
get_vocabulary(X_example)
Now we are ready to create the TextVectorization layer. Its constructor just saves the hyperparameters (max_vocabulary_size and n_oov_buckets). The adapt() method computes the vocabulary using the get_vocabulary() function, then it builds a StaticVocabularyTable (see Chapter 16 for more details). The call() method preprocesses the reviews to get a padded list of words for each review, then it uses the StaticVocabularyTable to look up the index of each word in the vocabulary:
class TextVectorization(keras.layers.Layer):
def __init__(self, max_vocabulary_size=1000, n_oov_buckets=100, dtype=tf.string, **kwargs):
super().__init__(dtype=dtype, **kwargs)
self.max_vocabulary_size = max_vocabulary_size
self.n_oov_buckets = n_oov_buckets
def adapt(self, data_sample):
self.vocab = get_vocabulary(data_sample, self.max_vocabulary_size)
words = tf.constant(self.vocab)
word_ids = tf.range(len(self.vocab), dtype=tf.int64)
vocab_init = tf.lookup.KeyValueTensorInitializer(words, word_ids)
self.table = tf.lookup.StaticVocabularyTable(vocab_init, self.n_oov_buckets)
def call(self, inputs):
preprocessed_inputs = preprocess(inputs)
return self.table.lookup(preprocessed_inputs)
Let's try it on the small X_example we defined earlier:
text_vectorization = TextVectorization()
text_vectorization.adapt(X_example)
text_vectorization(X_example)
Looks good! As you can see, each review was cleaned up and tokenized, then each word was encoded as its index in the vocabulary (all the 0s correspond to the <pad> tokens).
Now let's create another TextVectorization layer and let's adapt it to the full IMDB training set (if the training set did not fit in RAM, we could just use a smaller sample of the training set by calling train_set.take(500)):
max_vocabulary_size = 1000
n_oov_buckets = 100
sample_review_batches = train_set.map(lambda review, label: review)
sample_reviews = np.concatenate(list(sample_review_batches.as_numpy_iterator()),
axis=0)
text_vectorization = TextVectorization(max_vocabulary_size, n_oov_buckets,
input_shape=[])
text_vectorization.adapt(sample_reviews)
Let's run it on the same X_example, just to make sure the word IDs are larger now, since the vocabulary is bigger:
text_vectorization(X_example)
Good! Now let's take a look at the first 10 words in the vocabulary:
text_vectorization.vocab[:10]
These are the most common words in the reviews.
Now to build our model we will need to encode all these word IDs somehow. One approach is to create bags of words: for each review, and for each word in the vocabulary, we count the number of occurrences of that word in the review. For example:
simple_example = tf.constant([[1, 3, 1, 0, 0], [2, 2, 0, 0, 0]])
tf.reduce_sum(tf.one_hot(simple_example, 4), axis=1)
The first review has 2 times the word 0, 2 times the word 1, 0 times the word 2, and 1 time the word 3, so its bag-of-words representation is [2, 2, 0, 1]. Similarly, the second review has 3 times the word 0, 0 times the word 1, and so on. Let's wrap this logic in a small custom layer, and let's test it. We'll drop the counts for the word 0, since this corresponds to the <pad> token, which we don't care about.
class BagOfWords(keras.layers.Layer):
def __init__(self, n_tokens, dtype=tf.int32, **kwargs):
super().__init__(dtype=dtype, **kwargs)
self.n_tokens = n_tokens
def call(self, inputs):
one_hot = tf.one_hot(inputs, self.n_tokens)
return tf.reduce_sum(one_hot, axis=1)[:, 1:]
Let's test it:
bag_of_words = BagOfWords(n_tokens=4)
bag_of_words(simple_example)
It works fine! Now let's create another BagOfWords layer with the right vocabulary size for our training set:
n_tokens = max_vocabulary_size + n_oov_buckets + 1 # add 1 for <pad>
bag_of_words = BagOfWords(n_tokens)
We're ready to train the model!
model = keras.models.Sequential([
text_vectorization,
bag_of_words,
keras.layers.Dense(100, activation="relu"),
keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(loss="binary_crossentropy", optimizer="nadam",
metrics=["accuracy"])
model.fit(train_set, epochs=5, validation_data=valid_set)
We get about 73.7% accuracy on the validation set after just the first epoch, but after that the model makes no significant progress. We will do better in Chapter 16. For now the point is just to perform efficient preprocessing using tf.data and Keras preprocessing layers.
Exercise: Add an Embedding layer and compute the mean embedding for each review, multiplied by the square root of the number of words (see Chapter 16). This rescaled mean embedding can then be passed to the rest of your model.
To compute the mean embedding for each review, and multiply it by the square root of the number of words in that review, we will need a little function:
def compute_mean_embedding(inputs):
    not_pad = tf.math.count_nonzero(inputs, axis=-1)  # 0 for <pad> (all-zero) embeddings
    n_words = tf.math.count_nonzero(not_pad, axis=-1, keepdims=True)  # words per review
    sqrt_n_words = tf.math.sqrt(tf.cast(n_words, tf.float32))
    return tf.reduce_mean(inputs, axis=1) * sqrt_n_words  # rescaled mean embedding
another_example = tf.constant([[[1., 2., 3.], [4., 5., 0.], [0., 0., 0.]],
[[6., 0., 0.], [0., 0., 0.], [0., 0., 0.]]])
compute_mean_embedding(another_example)
Let's check that this is correct. The first review contains 2 words (the last token is a zero vector, which represents the <pad> token). The second review contains 1 word. So we need to compute the mean embedding for each review, and multiply the first one by the square root of 2, and the second one by the square root of 1:
tf.reduce_mean(another_example, axis=1) * tf.sqrt([[2.], [1.]])
Perfect. Now we're ready to train our final model. It's the same as before, except we replaced the BagOfWords layer with an Embedding layer followed by a Lambda layer that calls the compute_mean_embedding() function:
embedding_size = 20
model = keras.models.Sequential([
text_vectorization,
keras.layers.Embedding(input_dim=n_tokens,
output_dim=embedding_size,
mask_zero=True), # <pad> tokens => zero vectors
keras.layers.Lambda(compute_mean_embedding),
keras.layers.Dense(100, activation="relu"),
keras.layers.Dense(1, activation="sigmoid"),
])
Exercise: Train the model and see what accuracy you get. Try to optimize your pipelines to make training as fast as possible.
model.compile(loss="binary_crossentropy", optimizer="nadam", metrics=["accuracy"])
model.fit(train_set, epochs=5, validation_data=valid_set)
The model does not perform better with embeddings (but we will do better in Chapter 16). The pipeline looks fast enough (we optimized it earlier).
_Exercise: Use TFDS to load the same dataset more easily: tfds.load("imdb_reviews")._
import tensorflow_datasets as tfds
datasets = tfds.load(name="imdb_reviews")
train_set, test_set = datasets["train"], datasets["test"]
for example in train_set.take(1):
print(example["text"])
print(example["label"])
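With as_supervised=True, TFDS yields (text, label) pairs directly, which makes it easy to build an input pipeline like the ones above; a minimal sketch:
datasets = tfds.load(name="imdb_reviews", as_supervised=True)
train_set = datasets["train"].shuffle(25000).batch(32).prefetch(1)
for reviews, labels in train_set.take(1):
    print(reviews[0])
    print(labels[0])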