#!/usr/bin/env python
# coding: utf-8

# **Chapter 12 – Custom Models and Training with TensorFlow**

# _This notebook contains all the sample code in chapter 12._
# # Setup # First, let's import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures. We also check that Python 3.5 or later is installed (although Python 2.x may work, it is deprecated so we strongly recommend you use Python 3 instead), as well as Scikit-Learn ≥0.20 and TensorFlow ≥2.0. # In[1]: # Python ≥3.5 is required import sys assert sys.version_info >= (3, 5) # Scikit-Learn ≥0.20 is required import sklearn assert sklearn.__version__ >= "0.20" try: # %tensorflow_version only exists in Colab. get_ipython().run_line_magic('tensorflow_version', '2.x') except Exception: pass # TensorFlow ≥2.4 is required in this notebook # Earlier 2.x versions will mostly work the same, but with a few bugs import tensorflow as tf from tensorflow import keras assert tf.__version__ >= "2.4" # Common imports import numpy as np import os # to make this notebook's output stable across runs np.random.seed(42) tf.random.set_seed(42) # To plot pretty figures get_ipython().run_line_magic('matplotlib', 'inline') import matplotlib as mpl import matplotlib.pyplot as plt mpl.rc('axes', labelsize=14) mpl.rc('xtick', labelsize=12) mpl.rc('ytick', labelsize=12) # Where to save the figures PROJECT_ROOT_DIR = "." CHAPTER_ID = "deep" IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID) os.makedirs(IMAGES_PATH, exist_ok=True) def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300): path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension) print("Saving figure", fig_id) if tight_layout: plt.tight_layout() plt.savefig(path, format=fig_extension, dpi=resolution) # ## Tensors and operations # ### Tensors # In[2]: tf.constant([[1., 2., 3.], [4., 5., 6.]]) # matrix # In[3]: tf.constant(42) # scalar # In[4]: t = tf.constant([[1., 2., 3.], [4., 5., 6.]]) t # In[5]: t.shape # In[6]: t.dtype # ### Indexing # In[7]: t[:, 1:] # In[8]: t[..., 1, tf.newaxis] # ### Ops # In[9]: t + 10 # In[10]: tf.square(t) # In[11]: t @ tf.transpose(t) # ### Using `keras.backend` # In[12]: from tensorflow import keras K = keras.backend K.square(K.transpose(t)) + 10 # ### From/To NumPy # In[13]: a = np.array([2., 4., 5.]) tf.constant(a) # In[14]: t.numpy() # In[15]: np.array(t) # In[16]: tf.square(a) # In[17]: np.square(t) # ### Conflicting Types # In[18]: try: tf.constant(2.0) + tf.constant(40) except tf.errors.InvalidArgumentError as ex: print(ex) # In[19]: try: tf.constant(2.0) + tf.constant(40., dtype=tf.float64) except tf.errors.InvalidArgumentError as ex: print(ex) # In[20]: t2 = tf.constant(40., dtype=tf.float64) tf.constant(2.0) + tf.cast(t2, tf.float32) # ### Strings # In[21]: tf.constant(b"hello world") # In[22]: tf.constant("café") # In[23]: u = tf.constant([ord(c) for c in "café"]) u # In[24]: b = tf.strings.unicode_encode(u, "UTF-8") tf.strings.length(b, unit="UTF8_CHAR") # In[25]: tf.strings.unicode_decode(b, "UTF-8") # ### String arrays # In[26]: p = tf.constant(["Café", "Coffee", "caffè", "咖啡"]) # In[27]: tf.strings.length(p, unit="UTF8_CHAR") # In[28]: r = tf.strings.unicode_decode(p, "UTF8") r # In[29]: print(r) # ### Ragged tensors # In[30]: print(r[1]) # In[31]: print(r[1:3]) # In[32]: r2 = tf.ragged.constant([[65, 66], [], [67]]) print(tf.concat([r, r2], axis=0)) # In[33]: r3 = tf.ragged.constant([[68, 69, 70], [71], [], [72, 73]]) print(tf.concat([r, r3], axis=1)) # In[34]: tf.strings.unicode_encode(r3, "UTF-8") # In[35]: r.to_tensor() # ### Sparse tensors # In[36]: s = tf.SparseTensor(indices=[[0, 1], [1, 0], [2, 3]], values=[1., 2., 3.], 
dense_shape=[3, 4]) # In[37]: print(s) # In[38]: tf.sparse.to_dense(s) # In[39]: s2 = s * 2.0 # In[40]: try: s3 = s + 1. except TypeError as ex: print(ex) # In[41]: s4 = tf.constant([[10., 20.], [30., 40.], [50., 60.], [70., 80.]]) tf.sparse.sparse_dense_matmul(s, s4) # In[42]: s5 = tf.SparseTensor(indices=[[0, 2], [0, 1]], values=[1., 2.], dense_shape=[3, 4]) print(s5) # In[43]: try: tf.sparse.to_dense(s5) except tf.errors.InvalidArgumentError as ex: print(ex) # In[44]: s6 = tf.sparse.reorder(s5) tf.sparse.to_dense(s6) # ### Sets # In[45]: set1 = tf.constant([[2, 3, 5, 7], [7, 9, 0, 0]]) set2 = tf.constant([[4, 5, 6], [9, 10, 0]]) tf.sparse.to_dense(tf.sets.union(set1, set2)) # In[46]: tf.sparse.to_dense(tf.sets.difference(set1, set2)) # In[47]: tf.sparse.to_dense(tf.sets.intersection(set1, set2)) # ### Variables # In[48]: v = tf.Variable([[1., 2., 3.], [4., 5., 6.]]) # In[49]: v.assign(2 * v) # In[50]: v[0, 1].assign(42) # In[51]: v[:, 2].assign([0., 1.]) # In[52]: try: v[1] = [7., 8., 9.] except TypeError as ex: print(ex) # In[53]: v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.]) # In[54]: sparse_delta = tf.IndexedSlices(values=[[1., 2., 3.], [4., 5., 6.]], indices=[1, 0]) v.scatter_update(sparse_delta) # ### Tensor Arrays # In[55]: array = tf.TensorArray(dtype=tf.float32, size=3) array = array.write(0, tf.constant([1., 2.])) array = array.write(1, tf.constant([3., 10.])) array = array.write(2, tf.constant([5., 7.])) # In[56]: array.read(1) # In[57]: array.stack() # In[58]: mean, variance = tf.nn.moments(array.stack(), axes=0) mean # In[59]: variance # ## Custom loss function # Let's start by loading and preparing the California housing dataset. We first load it, then split it into a training set, a validation set and a test set, and finally we scale it: # In[60]: from sklearn.datasets import fetch_california_housing from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler housing = fetch_california_housing() X_train_full, X_test, y_train_full, y_test = train_test_split( housing.data, housing.target.reshape(-1, 1), random_state=42) X_train, X_valid, y_train, y_valid = train_test_split( X_train_full, y_train_full, random_state=42) scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_valid_scaled = scaler.transform(X_valid) X_test_scaled = scaler.transform(X_test) # In[61]: def huber_fn(y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < 1 squared_loss = tf.square(error) / 2 linear_loss = tf.abs(error) - 0.5 return tf.where(is_small_error, squared_loss, linear_loss) # In[62]: plt.figure(figsize=(8, 3.5)) z = np.linspace(-4, 4, 200) plt.plot(z, huber_fn(0, z), "b-", linewidth=2, label="huber($z$)") plt.plot(z, z**2 / 2, "b:", linewidth=1, label=r"$\frac{1}{2}z^2$") plt.plot([-1, -1], [0, huber_fn(0., -1.)], "r--") plt.plot([1, 1], [0, huber_fn(0., 1.)], "r--") plt.gca().axhline(y=0, color='k') plt.gca().axvline(x=0, color='k') plt.axis([-4, 4, 0, 4]) plt.grid(True) plt.xlabel("$z$") plt.legend(fontsize=14) plt.title("Huber loss", fontsize=14) plt.show() # In[63]: input_shape = X_train.shape[1:] model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[64]: model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"]) # In[65]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # ## Saving/Loading Models with Custom Objects # In[66]: 
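# A quick check (a sketch, not in the original notebook): after compiling with `loss=huber_fn`,
# `model.loss` is the `huber_fn` Python function itself. Keras saves only the function's name
# in the HDF5 file, which is why we must pass `custom_objects={"huber_fn": huber_fn}` when
# reloading the model just below.
model.loss  # the custom huber_fn function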
model.save("my_model_with_a_custom_loss.h5") # In[67]: model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn": huber_fn}) # In[68]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[69]: def create_huber(threshold=1.0): def huber_fn(y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < threshold squared_loss = tf.square(error) / 2 linear_loss = threshold * tf.abs(error) - threshold**2 / 2 return tf.where(is_small_error, squared_loss, linear_loss) return huber_fn # In[70]: model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"]) # In[71]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[72]: model.save("my_model_with_a_custom_loss_threshold_2.h5") # In[73]: model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5", custom_objects={"huber_fn": create_huber(2.0)}) # In[74]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[75]: class HuberLoss(keras.losses.Loss): def __init__(self, threshold=1.0, **kwargs): self.threshold = threshold super().__init__(**kwargs) def call(self, y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < self.threshold squared_loss = tf.square(error) / 2 linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2 return tf.where(is_small_error, squared_loss, linear_loss) def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} # In[76]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[77]: model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"]) # In[78]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[79]: model.save("my_model_with_a_custom_loss_class.h5") # In[80]: model = keras.models.load_model("my_model_with_a_custom_loss_class.h5", custom_objects={"HuberLoss": HuberLoss}) # In[81]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[82]: model.loss.threshold # ## Other Custom Functions # In[83]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[84]: def my_softplus(z): # return value is just tf.nn.softplus(z) return tf.math.log(tf.exp(z) + 1.0) def my_glorot_initializer(shape, dtype=tf.float32): stddev = tf.sqrt(2. 
/ (shape[0] + shape[1])) return tf.random.normal(shape, stddev=stddev, dtype=dtype) def my_l1_regularizer(weights): return tf.reduce_sum(tf.abs(0.01 * weights)) def my_positive_weights(weights): # return value is just tf.nn.relu(weights) return tf.where(weights < 0., tf.zeros_like(weights), weights) # In[85]: layer = keras.layers.Dense(1, activation=my_softplus, kernel_initializer=my_glorot_initializer, kernel_regularizer=my_l1_regularizer, kernel_constraint=my_positive_weights) # In[86]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[87]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1, activation=my_softplus, kernel_regularizer=my_l1_regularizer, kernel_constraint=my_positive_weights, kernel_initializer=my_glorot_initializer), ]) # In[88]: model.compile(loss="mse", optimizer="nadam", metrics=["mae"]) # In[89]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[90]: model.save("my_model_with_many_custom_parts.h5") # In[91]: model = keras.models.load_model( "my_model_with_many_custom_parts.h5", custom_objects={ "my_l1_regularizer": my_l1_regularizer, "my_positive_weights": my_positive_weights, "my_glorot_initializer": my_glorot_initializer, "my_softplus": my_softplus, }) # In[92]: class MyL1Regularizer(keras.regularizers.Regularizer): def __init__(self, factor): self.factor = factor def __call__(self, weights): return tf.reduce_sum(tf.abs(self.factor * weights)) def get_config(self): return {"factor": self.factor} # In[93]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[94]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1, activation=my_softplus, kernel_regularizer=MyL1Regularizer(0.01), kernel_constraint=my_positive_weights, kernel_initializer=my_glorot_initializer), ]) # In[95]: model.compile(loss="mse", optimizer="nadam", metrics=["mae"]) # In[96]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[97]: model.save("my_model_with_many_custom_parts.h5") # In[98]: model = keras.models.load_model( "my_model_with_many_custom_parts.h5", custom_objects={ "MyL1Regularizer": MyL1Regularizer, "my_positive_weights": my_positive_weights, "my_glorot_initializer": my_glorot_initializer, "my_softplus": my_softplus, }) # ## Custom Metrics # In[99]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[100]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[101]: model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)]) # In[102]: model.fit(X_train_scaled, y_train, epochs=2) # **Note**: if you use the same function as the loss and a metric, you may be surprised to see different results. This is generally just due to floating point precision errors: even though the mathematical equations are equivalent, the operations are not run in the same order, which can lead to small differences. Moreover, when using sample weights, there's more than just precision errors: # * the loss since the start of the epoch is the mean of all batch losses seen so far. 
Each batch loss is the sum of the weighted instance losses divided by the _batch size_ (not the sum of weights, so the batch loss is _not_ the weighted mean of the losses). # * the metric since the start of the epoch is equal to the sum of weighted instance losses divided by sum of all weights seen so far. In other words, it is the weighted mean of all the instance losses. Not the same thing. # # If you do the math, you will find that loss = metric * mean of sample weights (plus some floating point precision error). # In[103]: model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[create_huber(2.0)]) # In[104]: sample_weight = np.random.rand(len(y_train)) history = model.fit(X_train_scaled, y_train, epochs=2, sample_weight=sample_weight) # In[105]: history.history["loss"][0], history.history["huber_fn"][0] * sample_weight.mean() # ### Streaming metrics # In[106]: precision = keras.metrics.Precision() precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]) # In[107]: precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0]) # In[108]: precision.result() # In[109]: precision.variables # In[110]: precision.reset_states() # Creating a streaming metric: # In[111]: class HuberMetric(keras.metrics.Metric): def __init__(self, threshold=1.0, **kwargs): super().__init__(**kwargs) # handles base args (e.g., dtype) self.threshold = threshold self.huber_fn = create_huber(threshold) self.total = self.add_weight("total", initializer="zeros") self.count = self.add_weight("count", initializer="zeros") def update_state(self, y_true, y_pred, sample_weight=None): metric = self.huber_fn(y_true, y_pred) self.total.assign_add(tf.reduce_sum(metric)) self.count.assign_add(tf.cast(tf.size(y_true), tf.float32)) def result(self): return self.total / self.count def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} # In[112]: m = HuberMetric(2.) # total = 2 * |10 - 2| - 2²/2 = 14 # count = 1 # result = 14 / 1 = 14 m(tf.constant([[2.]]), tf.constant([[10.]])) # In[113]: # total = total + (|1 - 0|² / 2) + (2 * |9.25 - 5| - 2² / 2) = 14 + 7 = 21 # count = count + 2 = 3 # result = total / count = 21 / 3 = 7 m(tf.constant([[0.], [5.]]), tf.constant([[1.], [9.25]])) m.result() # In[114]: m.variables # In[115]: m.reset_states() m.variables # Let's check that the `HuberMetric` class works well: # In[116]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[117]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[118]: model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[HuberMetric(2.0)]) # In[119]: model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2) # In[120]: model.save("my_model_with_a_custom_metric.h5") # In[121]: model = keras.models.load_model("my_model_with_a_custom_metric.h5", custom_objects={"huber_fn": create_huber(2.0), "HuberMetric": HuberMetric}) # In[122]: model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2) # **Warning**: In TF 2.2, tf.keras adds an extra first metric in `model.metrics` at position 0 (see [TF issue #38150](https://github.com/tensorflow/tensorflow/issues/38150)). This forces us to use `model.metrics[-1]` rather than `model.metrics[0]` to access the `HuberMetric`. # In[123]: model.metrics[-1].threshold # Looks like it works fine! 
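# As an extra sanity check (a sketch, not in the original notebook), we can feed a
# streaming `HuberMetric` one sample at a time and compare its result with the Huber loss
# computed directly over the whole batch; both should match up to float precision:

# In[ ]:

m = HuberMetric(2.)
y_true = tf.constant([[0.], [5.], [10.]])
y_pred = tf.constant([[1.], [9.25], [2.]])
for i in range(3):  # stream the samples one at a time
    m(y_true[i:i + 1], y_pred[i:i + 1])
direct = tf.reduce_mean(create_huber(2.)(y_true, y_pred))  # same loss, all at once
m.result(), direct  # both equal 7.0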
More simply, we could have created the class like this: # In[124]: class HuberMetric(keras.metrics.Mean): def __init__(self, threshold=1.0, name='HuberMetric', dtype=None): self.threshold = threshold self.huber_fn = create_huber(threshold) super().__init__(name=name, dtype=dtype) def update_state(self, y_true, y_pred, sample_weight=None): metric = self.huber_fn(y_true, y_pred) super(HuberMetric, self).update_state(metric, sample_weight) def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} # This class handles shapes better, and it also supports sample weights. # In[125]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[126]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[127]: model.compile(loss=keras.losses.Huber(2.0), optimizer="nadam", weighted_metrics=[HuberMetric(2.0)]) # In[128]: sample_weight = np.random.rand(len(y_train)) history = model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2, sample_weight=sample_weight) # In[129]: history.history["loss"][0], history.history["HuberMetric"][0] * sample_weight.mean() # In[130]: model.save("my_model_with_a_custom_metric_v2.h5") # In[131]: model = keras.models.load_model("my_model_with_a_custom_metric_v2.h5", custom_objects={"HuberMetric": HuberMetric}) # In[132]: model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2) # In[133]: model.metrics[-1].threshold # ## Custom Layers # In[134]: exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x)) # In[135]: exponential_layer([-1., 0., 1.]) # Adding an exponential layer at the output of a regression model can be useful if the values to predict are positive and with very different scales (e.g., 0.001, 10., 10000): # In[136]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[137]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="relu", input_shape=input_shape), keras.layers.Dense(1), exponential_layer ]) model.compile(loss="mse", optimizer="sgd") model.fit(X_train_scaled, y_train, epochs=5, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # In[138]: class MyDense(keras.layers.Layer): def __init__(self, units, activation=None, **kwargs): super().__init__(**kwargs) self.units = units self.activation = keras.activations.get(activation) def build(self, batch_input_shape): self.kernel = self.add_weight( name="kernel", shape=[batch_input_shape[-1], self.units], initializer="glorot_normal") self.bias = self.add_weight( name="bias", shape=[self.units], initializer="zeros") super().build(batch_input_shape) # must be at the end def call(self, X): return self.activation(X @ self.kernel + self.bias) def compute_output_shape(self, batch_input_shape): return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units]) def get_config(self): base_config = super().get_config() return {**base_config, "units": self.units, "activation": keras.activations.serialize(self.activation)} # In[139]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[140]: model = keras.models.Sequential([ MyDense(30, activation="relu", input_shape=input_shape), MyDense(1) ]) # In[141]: model.compile(loss="mse", optimizer="nadam") model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # In[142]: 
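# Quick check (a sketch, not in the original notebook): the custom `MyDense` layers created
# their kernel and bias variables in `build()`, so each layer exposes two trainable weights:
[(w.name, w.shape) for layer in model.layers for w in layer.weights]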
model.save("my_model_with_a_custom_layer.h5") # In[143]: model = keras.models.load_model("my_model_with_a_custom_layer.h5", custom_objects={"MyDense": MyDense}) # In[144]: class MyMultiLayer(keras.layers.Layer): def call(self, X): X1, X2 = X print("X1.shape: ", X1.shape ," X2.shape: ", X2.shape) # Debugging of custom layer return X1 + X2, X1 * X2 def compute_output_shape(self, batch_input_shape): batch_input_shape1, batch_input_shape2 = batch_input_shape return [batch_input_shape1, batch_input_shape2] # Our custom layer can be called using the functional API like this: # In[145]: inputs1 = keras.layers.Input(shape=[2]) inputs2 = keras.layers.Input(shape=[2]) outputs1, outputs2 = MyMultiLayer()((inputs1, inputs2)) # Note that the `call()` method receives symbolic inputs, whose shape is only partially specified (at this stage, we don't know the batch size, which is why the first dimension is `None`): # # We can also pass actual data to the custom layer. To test this, let's split each dataset's inputs into two parts, with four features each: # In[146]: def split_data(data): columns_count = data.shape[-1] half = columns_count // 2 return data[:, :half], data[:, half:] X_train_scaled_A, X_train_scaled_B = split_data(X_train_scaled) X_valid_scaled_A, X_valid_scaled_B = split_data(X_valid_scaled) X_test_scaled_A, X_test_scaled_B = split_data(X_test_scaled) # Printing the splitted data shapes X_train_scaled_A.shape, X_train_scaled_B.shape # Now notice that the shapes are fully specified: # In[147]: outputs1, outputs2 = MyMultiLayer()((X_train_scaled_A, X_train_scaled_B)) # Let's build a more complete model using the functional API (this is just a toy example, don't expect awesome performance): # In[148]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) input_A = keras.layers.Input(shape=X_train_scaled_A.shape[-1]) input_B = keras.layers.Input(shape=X_train_scaled_B.shape[-1]) hidden_A, hidden_B = MyMultiLayer()((input_A, input_B)) hidden_A = keras.layers.Dense(30, activation='selu')(hidden_A) hidden_B = keras.layers.Dense(30, activation='selu')(hidden_B) concat = keras.layers.Concatenate()((hidden_A, hidden_B)) output = keras.layers.Dense(1)(concat) model = keras.models.Model(inputs=[input_A, input_B], outputs=[output]) # In[149]: model.compile(loss='mse', optimizer='nadam') # In[150]: model.fit((X_train_scaled_A, X_train_scaled_B), y_train, epochs=2, validation_data=((X_valid_scaled_A, X_valid_scaled_B), y_valid)) # Now let's create a layer with a different behavior during training and testing: # In[151]: class AddGaussianNoise(keras.layers.Layer): def __init__(self, stddev, **kwargs): super().__init__(**kwargs) self.stddev = stddev def call(self, X, training=None): if training: noise = tf.random.normal(tf.shape(X), stddev=self.stddev) return X + noise else: return X def compute_output_shape(self, batch_input_shape): return batch_input_shape # Here's a simple model that uses this custom layer: # In[152]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) model = keras.models.Sequential([ AddGaussianNoise(stddev=1.0), keras.layers.Dense(30, activation="selu"), keras.layers.Dense(1) ]) # In[153]: model.compile(loss="mse", optimizer="nadam") model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # ## Custom Models # In[154]: X_new_scaled = X_test_scaled # In[155]: class ResidualBlock(keras.layers.Layer): def __init__(self, n_layers, n_neurons, **kwargs): super().__init__(**kwargs) 
self.hidden = [keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal") for _ in range(n_layers)] def call(self, inputs): Z = inputs for layer in self.hidden: Z = layer(Z) return inputs + Z # In[156]: class ResidualRegressor(keras.models.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal") self.block1 = ResidualBlock(2, 30) self.block2 = ResidualBlock(2, 30) self.out = keras.layers.Dense(output_dim) def call(self, inputs): Z = self.hidden1(inputs) for _ in range(1 + 3): Z = self.block1(Z) Z = self.block2(Z) return self.out(Z) # In[157]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[158]: model = ResidualRegressor(1) model.compile(loss="mse", optimizer="nadam") history = model.fit(X_train_scaled, y_train, epochs=5) score = model.evaluate(X_test_scaled, y_test) y_pred = model.predict(X_new_scaled) # In[159]: model.save("my_custom_model.ckpt") # In[160]: model = keras.models.load_model("my_custom_model.ckpt") # In[161]: history = model.fit(X_train_scaled, y_train, epochs=5) # We could have defined the model using the sequential API instead: # In[162]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[163]: block1 = ResidualBlock(2, 30) model = keras.models.Sequential([ keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal"), block1, block1, block1, block1, ResidualBlock(2, 30), keras.layers.Dense(1) ]) # In[164]: model.compile(loss="mse", optimizer="nadam") history = model.fit(X_train_scaled, y_train, epochs=5) score = model.evaluate(X_test_scaled, y_test) y_pred = model.predict(X_new_scaled) # ## Losses and Metrics Based on Model Internals # **Note**: the following code has two differences with the code in the book: # 1. It creates a `keras.metrics.Mean()` metric in the constructor and uses it in the `call()` method to track the mean reconstruction loss. Since we only want to do this during training, we add a `training` argument to the `call()` method, and if `training` is `True`, then we update `reconstruction_mean` and we call `self.add_metric()` to ensure it's displayed properly. # 2. Due to an issue introduced in TF 2.2 ([#46858](https://github.com/tensorflow/tensorflow/issues/46858)), we must not call `super().build()` inside the `build()` method. 
# In[165]: class ReconstructingRegressor(keras.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal") for _ in range(5)] self.out = keras.layers.Dense(output_dim) self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error") def build(self, batch_input_shape): n_inputs = batch_input_shape[-1] self.reconstruct = keras.layers.Dense(n_inputs) #super().build(batch_input_shape) def call(self, inputs, training=None): Z = inputs for layer in self.hidden: Z = layer(Z) reconstruction = self.reconstruct(Z) recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs)) self.add_loss(0.05 * recon_loss) if training: result = self.reconstruction_mean(recon_loss) self.add_metric(result) return self.out(Z) # In[166]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[167]: model = ReconstructingRegressor(1) model.compile(loss="mse", optimizer="nadam") history = model.fit(X_train_scaled, y_train, epochs=2) y_pred = model.predict(X_test_scaled) # ## Computing Gradients with Autodiff # In[168]: def f(w1, w2): return 3 * w1 ** 2 + 2 * w1 * w2 # In[169]: w1, w2 = 5, 3 eps = 1e-6 (f(w1 + eps, w2) - f(w1, w2)) / eps # In[170]: (f(w1, w2 + eps) - f(w1, w2)) / eps # In[171]: w1, w2 = tf.Variable(5.), tf.Variable(3.) with tf.GradientTape() as tape: z = f(w1, w2) gradients = tape.gradient(z, [w1, w2]) # In[172]: gradients # In[173]: with tf.GradientTape() as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) try: dz_dw2 = tape.gradient(z, w2) except RuntimeError as ex: print(ex) # In[174]: with tf.GradientTape(persistent=True) as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) dz_dw2 = tape.gradient(z, w2) # works now! del tape # In[175]: dz_dw1, dz_dw2 # In[176]: c1, c2 = tf.constant(5.), tf.constant(3.) with tf.GradientTape() as tape: z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # In[177]: gradients # In[178]: with tf.GradientTape() as tape: tape.watch(c1) tape.watch(c2) z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # In[179]: gradients # In[180]: with tf.GradientTape() as tape: z1 = f(w1, w2 + 2.) z2 = f(w1, w2 + 5.) z3 = f(w1, w2 + 7.) tape.gradient([z1, z2, z3], [w1, w2]) # In[181]: with tf.GradientTape(persistent=True) as tape: z1 = f(w1, w2 + 2.) z2 = f(w1, w2 + 5.) z3 = f(w1, w2 + 7.) tf.reduce_sum(tf.stack([tape.gradient(z, [w1, w2]) for z in (z1, z2, z3)]), axis=0) del tape # In[182]: with tf.GradientTape(persistent=True) as hessian_tape: with tf.GradientTape() as jacobian_tape: z = f(w1, w2) jacobians = jacobian_tape.gradient(z, [w1, w2]) hessians = [hessian_tape.gradient(jacobian, [w1, w2]) for jacobian in jacobians] del hessian_tape # In[183]: jacobians # In[184]: hessians # In[185]: def f(w1, w2): return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2) with tf.GradientTape() as tape: z = f(w1, w2) tape.gradient(z, [w1, w2]) # In[186]: x = tf.Variable(100.) with tf.GradientTape() as tape: z = my_softplus(x) tape.gradient(z, [x]) # In[187]: tf.math.log(tf.exp(tf.constant(30., dtype=tf.float32)) + 1.) 
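# What is going wrong here (a quick illustration, not in the original notebook): for large
# inputs, `tf.exp()` overflows to infinity in float32, so `my_softplus()` returns inf and
# autodiff through it produces nan gradients. The following cells then work around this with
# a custom gradient and a numerically safer formulation.

# In[ ]:

tf.exp(tf.constant(100.)), my_softplus(tf.constant(100.))  # both overflow to inf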
# In[188]: x = tf.Variable([100.]) with tf.GradientTape() as tape: z = my_softplus(x) tape.gradient(z, [x]) # In[189]: @tf.custom_gradient def my_better_softplus(z): exp = tf.exp(z) def my_softplus_gradients(grad): return grad / (1 + 1 / exp) return tf.math.log(exp + 1), my_softplus_gradients # In[190]: def my_better_softplus(z): return tf.where(z > 30., z, tf.math.log(tf.exp(z) + 1.)) # In[191]: x = tf.Variable([1000.]) with tf.GradientTape() as tape: z = my_better_softplus(x) z, tape.gradient(z, [x]) # # Custom Training Loops # In[192]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[193]: l2_reg = keras.regularizers.l2(0.05) model = keras.models.Sequential([ keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg), keras.layers.Dense(1, kernel_regularizer=l2_reg) ]) # In[194]: def random_batch(X, y, batch_size=32): idx = np.random.randint(len(X), size=batch_size) return X[idx], y[idx] # In[195]: def print_status_bar(iteration, total, loss, metrics=None): metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result()) for m in [loss] + (metrics or [])]) end = "" if iteration < total else "\n" print("\r{}/{} - ".format(iteration, total) + metrics, end=end) # In[196]: import time mean_loss = keras.metrics.Mean(name="loss") mean_square = keras.metrics.Mean(name="mean_square") for i in range(1, 50 + 1): loss = 1 / i mean_loss(loss) mean_square(i ** 2) print_status_bar(i, 50, mean_loss, [mean_square]) time.sleep(0.05) # A fancier version with a progress bar: # In[197]: def progress_bar(iteration, total, size=30): running = iteration < total c = ">" if running else "=" p = (size - 1) * iteration // total fmt = "{{:-{}d}}/{{}} [{{}}]".format(len(str(total))) params = [iteration, total, "=" * p + c + "." 
* (size - p - 1)] return fmt.format(*params) # In[198]: progress_bar(3500, 10000, size=6) # In[199]: def print_status_bar(iteration, total, loss, metrics=None, size=30): metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result()) for m in [loss] + (metrics or [])]) end = "" if iteration < total else "\n" print("\r{} - {}".format(progress_bar(iteration, total), metrics), end=end) # In[200]: mean_loss = keras.metrics.Mean(name="loss") mean_square = keras.metrics.Mean(name="mean_square") for i in range(1, 50 + 1): loss = 1 / i mean_loss(loss) mean_square(i ** 2) print_status_bar(i, 50, mean_loss, [mean_square]) time.sleep(0.05) # In[201]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[202]: n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size optimizer = keras.optimizers.Nadam(learning_rate=0.01) loss_fn = keras.losses.mean_squared_error mean_loss = keras.metrics.Mean() metrics = [keras.metrics.MeanAbsoluteError()] # In[203]: for epoch in range(1, n_epochs + 1): print("Epoch {}/{}".format(epoch, n_epochs)) for step in range(1, n_steps + 1): X_batch, y_batch = random_batch(X_train_scaled, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) mean_loss(loss) for metric in metrics: metric(y_batch, y_pred) print_status_bar(step * batch_size, len(y_train), mean_loss, metrics) print_status_bar(len(y_train), len(y_train), mean_loss, metrics) for metric in [mean_loss] + metrics: metric.reset_states() # In[204]: try: from tqdm.notebook import trange from collections import OrderedDict with trange(1, n_epochs + 1, desc="All epochs") as epochs: for epoch in epochs: with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps: for step in steps: X_batch, y_batch = random_batch(X_train_scaled, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) status = OrderedDict() mean_loss(loss) status["loss"] = mean_loss.result().numpy() for metric in metrics: metric(y_batch, y_pred) status[metric.name] = metric.result().numpy() steps.set_postfix(status) for metric in [mean_loss] + metrics: metric.reset_states() except ImportError as ex: print("To run this cell, please install tqdm, ipywidgets and restart Jupyter") # ## TensorFlow Functions # In[205]: def cube(x): return x ** 3 # In[206]: cube(2) # In[207]: cube(tf.constant(2.0)) # In[208]: tf_cube = tf.function(cube) tf_cube # In[209]: tf_cube(2) # In[210]: tf_cube(tf.constant(2.0)) # ### TF Functions and Concrete Functions # In[211]: concrete_function = tf_cube.get_concrete_function(tf.constant(2.0)) concrete_function.graph # In[212]: concrete_function(tf.constant(2.0)) # In[213]: concrete_function is tf_cube.get_concrete_function(tf.constant(2.0)) # ### Exploring Function Definitions and Graphs # In[214]: concrete_function.graph # In[215]: ops = concrete_function.graph.get_operations() ops # In[216]: pow_op 
= ops[2] list(pow_op.inputs) # In[217]: pow_op.outputs # In[218]: concrete_function.graph.get_operation_by_name('x') # In[219]: concrete_function.graph.get_tensor_by_name('Identity:0') # In[220]: concrete_function.function_def.signature # ### How TF Functions Trace Python Functions to Extract Their Computation Graphs # In[221]: @tf.function def tf_cube(x): print("print:", x) return x ** 3 # In[222]: result = tf_cube(tf.constant(2.0)) # In[223]: result # In[224]: result = tf_cube(2) result = tf_cube(3) result = tf_cube(tf.constant([[1., 2.]])) # New shape: trace! result = tf_cube(tf.constant([[3., 4.], [5., 6.]])) # New shape: trace! result = tf_cube(tf.constant([[7., 8.], [9., 10.], [11., 12.]])) # New shape: trace! # It is also possible to specify a particular input signature: # In[225]: @tf.function(input_signature=[tf.TensorSpec([None, 28, 28], tf.float32)]) def shrink(images): print("Tracing", images) return images[:, ::2, ::2] # drop half the rows and columns # In[226]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[227]: img_batch_1 = tf.random.uniform(shape=[100, 28, 28]) img_batch_2 = tf.random.uniform(shape=[50, 28, 28]) preprocessed_images = shrink(img_batch_1) # Traces the function. preprocessed_images = shrink(img_batch_2) # Reuses the same concrete function. # In[228]: img_batch_3 = tf.random.uniform(shape=[2, 2, 2]) try: preprocessed_images = shrink(img_batch_3) # rejects unexpected types or shapes except ValueError as ex: print(ex) # ### Using Autograph To Capture Control Flow # A "static" `for` loop using `range()`: # In[229]: @tf.function def add_10(x): for i in range(10): x += 1 return x # In[230]: add_10(tf.constant(5)) # In[231]: add_10.get_concrete_function(tf.constant(5)).graph.get_operations() # A "dynamic" loop using `tf.while_loop()`: # In[232]: @tf.function def add_10(x): condition = lambda i, x: tf.less(i, 10) body = lambda i, x: (tf.add(i, 1), tf.add(x, 1)) final_i, final_x = tf.while_loop(condition, body, [tf.constant(0), x]) return final_x # In[233]: add_10(tf.constant(5)) # In[234]: add_10.get_concrete_function(tf.constant(5)).graph.get_operations() # A "dynamic" `for` loop using `tf.range()` (captured by autograph): # In[235]: @tf.function def add_10(x): for i in tf.range(10): x = x + 1 return x # In[236]: add_10.get_concrete_function(tf.constant(0)).graph.get_operations() # ### Handling Variables and Other Resources in TF Functions # In[237]: counter = tf.Variable(0) @tf.function def increment(counter, c=1): return counter.assign_add(c) # In[238]: increment(counter) increment(counter) # In[239]: function_def = increment.get_concrete_function(counter).function_def function_def.signature.input_arg[0] # In[240]: counter = tf.Variable(0) @tf.function def increment(c=1): return counter.assign_add(c) # In[241]: increment() increment() # In[242]: function_def = increment.get_concrete_function().function_def function_def.signature.input_arg[0] # In[243]: class Counter: def __init__(self): self.counter = tf.Variable(0) @tf.function def increment(self, c=1): return self.counter.assign_add(c) # In[244]: c = Counter() c.increment() c.increment() # In[245]: @tf.function def add_10(x): for i in tf.range(10): x += 1 return x print(tf.autograph.to_code(add_10.python_function)) # In[246]: def display_tf_code(func): from IPython.display import display, Markdown if hasattr(func, "python_function"): func = func.python_function code = tf.autograph.to_code(func) display(Markdown('```python\n{}\n```'.format(code))) # In[247]: display_tf_code(add_10) 
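# Before moving on to the next section, here is a small sketch (not in the original
# notebook) showing one practical benefit of TF Functions: once traced, the graph version
# of a function can run faster than its eager Python counterpart, since it avoids Python
# overhead at each step. The exact numbers depend on your hardware, so treat them as
# indicative only.

# In[ ]:

import timeit

def eager_add_10(x):
    for _ in tf.range(10):
        x += 1
    return x

tf_add_10 = tf.function(eager_add_10)
x = tf.constant(5)
tf_add_10(x)  # call it once first, so tracing is not included in the timing
print("eager:", timeit.timeit(lambda: eager_add_10(x), number=100))
print("graph:", timeit.timeit(lambda: tf_add_10(x), number=100))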
# ## Using TF Functions with tf.keras (or Not) # By default, tf.keras will automatically convert your custom code into TF Functions, no need to use # `tf.function()`: # In[248]: # Custom loss function def my_mse(y_true, y_pred): print("Tracing loss my_mse()") return tf.reduce_mean(tf.square(y_pred - y_true)) # In[249]: # Custom metric function def my_mae(y_true, y_pred): print("Tracing metric my_mae()") return tf.reduce_mean(tf.abs(y_pred - y_true)) # In[250]: # Custom layer class MyDense(keras.layers.Layer): def __init__(self, units, activation=None, **kwargs): super().__init__(**kwargs) self.units = units self.activation = keras.activations.get(activation) def build(self, input_shape): self.kernel = self.add_weight(name='kernel', shape=(input_shape[1], self.units), initializer='uniform', trainable=True) self.biases = self.add_weight(name='bias', shape=(self.units,), initializer='zeros', trainable=True) super().build(input_shape) def call(self, X): print("Tracing MyDense.call()") return self.activation(X @ self.kernel + self.biases) # In[251]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[252]: # Custom model class MyModel(keras.models.Model): def __init__(self, **kwargs): super().__init__(**kwargs) self.hidden1 = MyDense(30, activation="relu") self.hidden2 = MyDense(30, activation="relu") self.output_ = MyDense(1) def call(self, input): print("Tracing MyModel.call()") hidden1 = self.hidden1(input) hidden2 = self.hidden2(hidden1) concat = keras.layers.concatenate([input, hidden2]) output = self.output_(concat) return output model = MyModel() # In[253]: model.compile(loss=my_mse, optimizer="nadam", metrics=[my_mae]) # In[254]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # You can turn this off by creating the model with `dynamic=True` (or calling `super().__init__(dynamic=True, **kwargs)` in the model's constructor): # In[255]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[256]: model = MyModel(dynamic=True) # In[257]: model.compile(loss=my_mse, optimizer="nadam", metrics=[my_mae]) # Not the custom code will be called at each iteration. 
Let's fit, validate and evaluate with tiny datasets to avoid getting too much output: # In[258]: model.fit(X_train_scaled[:64], y_train[:64], epochs=1, validation_data=(X_valid_scaled[:64], y_valid[:64]), verbose=0) model.evaluate(X_test_scaled[:64], y_test[:64], verbose=0) # Alternatively, you can compile a model with `run_eagerly=True`: # In[259]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[260]: model = MyModel() # In[261]: model.compile(loss=my_mse, optimizer="nadam", metrics=[my_mae], run_eagerly=True) # In[262]: model.fit(X_train_scaled[:64], y_train[:64], epochs=1, validation_data=(X_valid_scaled[:64], y_valid[:64]), verbose=0) model.evaluate(X_test_scaled[:64], y_test[:64], verbose=0) # ## Custom Optimizers # Defining custom optimizers is not very common, but in case you are one of the happy few who gets to write one, here is an example: # In[263]: class MyMomentumOptimizer(keras.optimizers.Optimizer): def __init__(self, learning_rate=0.001, momentum=0.9, name="MyMomentumOptimizer", **kwargs): """Call super().__init__() and use _set_hyper() to store hyperparameters""" super().__init__(name, **kwargs) self._set_hyper("learning_rate", kwargs.get("lr", learning_rate)) # handle lr=learning_rate self._set_hyper("decay", self._initial_decay) # self._set_hyper("momentum", momentum) def _create_slots(self, var_list): """For each model variable, create the optimizer variable associated with it. TensorFlow calls these optimizer variables "slots". For momentum optimization, we need one momentum slot per model variable. """ for var in var_list: self.add_slot(var, "momentum") @tf.function def _resource_apply_dense(self, grad, var): """Update the slots and perform one optimization step for one model variable """ var_dtype = var.dtype.base_dtype lr_t = self._decayed_lr(var_dtype) # handle learning rate decay momentum_var = self.get_slot(var, "momentum") momentum_hyper = self._get_hyper("momentum", var_dtype) momentum_var.assign(momentum_var * momentum_hyper - (1. - momentum_hyper)* grad) var.assign_add(momentum_var * lr_t) def _resource_apply_sparse(self, grad, var): raise NotImplementedError def get_config(self): base_config = super().get_config() return { **base_config, "learning_rate": self._serialize_hyperparameter("learning_rate"), "decay": self._serialize_hyperparameter("decay"), "momentum": self._serialize_hyperparameter("momentum"), } # In[264]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[265]: model = keras.models.Sequential([keras.layers.Dense(1, input_shape=[8])]) model.compile(loss="mse", optimizer=MyMomentumOptimizer()) model.fit(X_train_scaled, y_train, epochs=5) # # Exercises # ## 1. to 11. # See Appendix A. # ## 12. Implement a custom layer that performs _Layer Normalization_ # _We will use this type of layer in Chapter 15 when using Recurrent Neural Networks._ # ### a. # _Exercise: The `build()` method should define two trainable weights *α* and *β*, both of shape `input_shape[-1:]` and data type `tf.float32`. *α* should be initialized with 1s, and *β* with 0s._ # Solution: see below. # ### b. # _Exercise: The `call()` method should compute the mean_ μ _and standard deviation_ σ _of each instance's features. For this, you can use `tf.nn.moments(inputs, axes=-1, keepdims=True)`, which returns the mean μ and the variance σ2 of all instances (compute the square root of the variance to get the standard deviation). 
Then the function should compute and return *α*⊗(*X* - μ)/(σ + ε) + *β*, where ⊗ represents itemwise multiplication (`*`) and ε is a smoothing term (small constant to avoid division by zero, e.g., 0.001)._ # In[266]: class LayerNormalization(keras.layers.Layer): def __init__(self, eps=0.001, **kwargs): super().__init__(**kwargs) self.eps = eps def build(self, batch_input_shape): self.alpha = self.add_weight( name="alpha", shape=batch_input_shape[-1:], initializer="ones") self.beta = self.add_weight( name="beta", shape=batch_input_shape[-1:], initializer="zeros") super().build(batch_input_shape) # must be at the end def call(self, X): mean, variance = tf.nn.moments(X, axes=-1, keepdims=True) return self.alpha * (X - mean) / (tf.sqrt(variance + self.eps)) + self.beta def compute_output_shape(self, batch_input_shape): return batch_input_shape def get_config(self): base_config = super().get_config() return {**base_config, "eps": self.eps} # Note that making _ε_ a hyperparameter (`eps`) was not compulsory. Also note that it's preferable to compute `tf.sqrt(variance + self.eps)` rather than `tf.sqrt(variance) + self.eps`. Indeed, the derivative of sqrt(z) is undefined when z=0, so training will bomb whenever the variance vector has at least one component equal to 0. Adding _ε_ within the square root guarantees that this will never happen. # ### c. # _Exercise: Ensure that your custom layer produces the same (or very nearly the same) output as the `keras.layers.LayerNormalization` layer._ # Let's create one instance of each class, apply them to some data (e.g., the training set), and ensure that the difference is negligeable. # In[267]: X = X_train.astype(np.float32) custom_layer_norm = LayerNormalization() keras_layer_norm = keras.layers.LayerNormalization() tf.reduce_mean(keras.losses.mean_absolute_error( keras_layer_norm(X), custom_layer_norm(X))) # Yep, that's close enough. To be extra sure, let's make alpha and beta completely random and compare again: # In[268]: random_alpha = np.random.rand(X.shape[-1]) random_beta = np.random.rand(X.shape[-1]) custom_layer_norm.set_weights([random_alpha, random_beta]) keras_layer_norm.set_weights([random_alpha, random_beta]) tf.reduce_mean(keras.losses.mean_absolute_error( keras_layer_norm(X), custom_layer_norm(X))) # Still a negligeable difference! Our custom layer works fine. # ## 13. Train a model using a custom training loop to tackle the Fashion MNIST dataset # _The Fashion MNIST dataset was introduced in Chapter 10._ # ### a. # _Exercise: Display the epoch, iteration, mean training loss, and mean accuracy over each epoch (updated at each iteration), as well as the validation loss and accuracy at the end of each epoch._ # In[269]: (X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data() X_train_full = X_train_full.astype(np.float32) / 255. X_valid, X_train = X_train_full[:5000], X_train_full[5000:] y_valid, y_train = y_train_full[:5000], y_train_full[5000:] X_test = X_test.astype(np.float32) / 255. 
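# The solution below samples batches with the `random_batch()` function defined earlier in
# this notebook. As an aside (a sketch, not part of the original solution), the same
# shuffling and batching could also be done with the `tf.data` API, for example:

# In[ ]:

train_set = (tf.data.Dataset.from_tensor_slices((X_train, y_train))
             .shuffle(buffer_size=10000, seed=42)
             .batch(32)
             .prefetch(1))
for X_batch, y_batch in train_set.take(1):
    print(X_batch.shape, y_batch.shape)  # e.g., (32, 28, 28) (32,)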
# In[270]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[271]: model = keras.models.Sequential([ keras.layers.Flatten(input_shape=[28, 28]), keras.layers.Dense(100, activation="relu"), keras.layers.Dense(10, activation="softmax"), ]) # In[272]: n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size optimizer = keras.optimizers.Nadam(learning_rate=0.01) loss_fn = keras.losses.sparse_categorical_crossentropy mean_loss = keras.metrics.Mean() metrics = [keras.metrics.SparseCategoricalAccuracy()] # In[273]: with trange(1, n_epochs + 1, desc="All epochs") as epochs: for epoch in epochs: with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps: for step in steps: X_batch, y_batch = random_batch(X_train, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) status = OrderedDict() mean_loss(loss) status["loss"] = mean_loss.result().numpy() for metric in metrics: metric(y_batch, y_pred) status[metric.name] = metric.result().numpy() steps.set_postfix(status) y_pred = model(X_valid) status["val_loss"] = np.mean(loss_fn(y_valid, y_pred)) status["val_accuracy"] = np.mean(keras.metrics.sparse_categorical_accuracy( tf.constant(y_valid, dtype=np.float32), y_pred)) steps.set_postfix(status) for metric in [mean_loss] + metrics: metric.reset_states() # ### b. # _Exercise: Try using a different optimizer with a different learning rate for the upper layers and the lower layers._ # In[274]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[275]: lower_layers = keras.models.Sequential([ keras.layers.Flatten(input_shape=[28, 28]), keras.layers.Dense(100, activation="relu"), ]) upper_layers = keras.models.Sequential([ keras.layers.Dense(10, activation="softmax"), ]) model = keras.models.Sequential([ lower_layers, upper_layers ]) # In[276]: lower_optimizer = keras.optimizers.SGD(learning_rate=1e-4) upper_optimizer = keras.optimizers.Nadam(learning_rate=1e-3) # In[277]: n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size loss_fn = keras.losses.sparse_categorical_crossentropy mean_loss = keras.metrics.Mean() metrics = [keras.metrics.SparseCategoricalAccuracy()] # In[278]: with trange(1, n_epochs + 1, desc="All epochs") as epochs: for epoch in epochs: with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps: for step in steps: X_batch, y_batch = random_batch(X_train, y_train) with tf.GradientTape(persistent=True) as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) for layers, optimizer in ((lower_layers, lower_optimizer), (upper_layers, upper_optimizer)): gradients = tape.gradient(loss, layers.trainable_variables) optimizer.apply_gradients(zip(gradients, layers.trainable_variables)) del tape for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) status = OrderedDict() mean_loss(loss) status["loss"] = mean_loss.result().numpy() for metric in metrics: metric(y_batch, y_pred) status[metric.name] = metric.result().numpy() steps.set_postfix(status) y_pred = model(X_valid) status["val_loss"] = 
np.mean(loss_fn(y_valid, y_pred)) status["val_accuracy"] = np.mean(keras.metrics.sparse_categorical_accuracy( tf.constant(y_valid, dtype=np.float32), y_pred)) steps.set_postfix(status) for metric in [mean_loss] + metrics: metric.reset_states() # In[ ]:
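# As a final check (not part of the original notebook), we could measure the test-set
# accuracy of the model trained with the two optimizers, reusing the same accuracy
# computation as for the validation set above:

y_pred = model(X_test)
test_accuracy = np.mean(keras.metrics.sparse_categorical_accuracy(
    tf.constant(y_test, dtype=np.float32), y_pred))
test_accuracy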