#!/usr/bin/env python # coding: utf-8 # **12장 – 텐서플로를 사용한 사용자 정의 모델과 훈련** # _이 노트북은 12장에 있는 모든 샘플 코드와 연습문제 해답을 가지고 있습니다._ # # #
# 구글 코랩에서 실행하기 #
# # 설정 # 먼저 몇 개의 모듈을 임포트합니다. 맷플롯립 그래프를 인라인으로 출력하도록 만들고 그림을 저장하는 함수를 준비합니다. 또한 파이썬 버전이 3.5 이상인지 확인합니다(파이썬 2.x에서도 동작하지만 곧 지원이 중단되므로 파이썬 3을 사용하는 것이 좋습니다). 사이킷런 버전이 0.20 이상인지와 텐서플로 버전이 2.0 이상인지 확인합니다. # In[1]: # 파이썬 ≥3.5 필수 import sys assert sys.version_info >= (3, 5) # 사이킷런 ≥0.20 필수 import sklearn assert sklearn.__version__ >= "0.20" try: # %tensorflow_version은 코랩 명령입니다. get_ipython().run_line_magic('tensorflow_version', '2.x') except Exception: pass # 이 노트북은 텐서플로 ≥2.4이 필요합니다 # 2.x 버전은 대부분 동일한 결과를 만들지만 몇 가지 버그가 있습니다. import tensorflow as tf from tensorflow import keras assert tf.__version__ >= "2.4" # 공통 모듈 임포트 import numpy as np import os # 노트북 실행 결과를 동일하게 유지하기 위해 np.random.seed(42) tf.random.set_seed(42) # 깔끔한 그래프 출력을 위해 get_ipython().run_line_magic('matplotlib', 'inline') import matplotlib as mpl import matplotlib.pyplot as plt mpl.rc('axes', labelsize=14) mpl.rc('xtick', labelsize=12) mpl.rc('ytick', labelsize=12) # 그림을 저장할 위치 PROJECT_ROOT_DIR = "." CHAPTER_ID = "deep" IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID) os.makedirs(IMAGES_PATH, exist_ok=True) def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300): path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension) print("그림 저장:", fig_id) if tight_layout: plt.tight_layout() plt.savefig(path, format=fig_extension, dpi=resolution) # ## 텐서와 연산 # ### 텐서 # In[2]: tf.constant([[1., 2., 3.], [4., 5., 6.]]) # 행렬 # In[3]: tf.constant(42) # 스칼라 # In[4]: t = tf.constant([[1., 2., 3.], [4., 5., 6.]]) t # In[5]: t.shape # In[6]: t.dtype # ### 인덱싱 # In[7]: t[:, 1:] # In[8]: t[..., 1, tf.newaxis] # ### 연산 # In[9]: t + 10 # In[10]: tf.square(t) # In[11]: t @ tf.transpose(t) # ### `keras.backend` 사용하기 # In[12]: from tensorflow import keras K = keras.backend K.square(K.transpose(t)) + 10 # ### 넘파이 변환 # In[13]: a = np.array([2., 4., 5.]) tf.constant(a) # In[14]: t.numpy() # In[15]: np.array(t) # In[16]: tf.square(a) # In[17]: np.square(t) # ### 타입 변환 # In[18]: try: tf.constant(2.0) + tf.constant(40) except tf.errors.InvalidArgumentError as ex: print(ex) # In[19]: try: tf.constant(2.0) + tf.constant(40., dtype=tf.float64) except tf.errors.InvalidArgumentError as ex: print(ex) # In[20]: t2 = tf.constant(40., dtype=tf.float64) tf.constant(2.0) + tf.cast(t2, tf.float32) # ### 문자열 # In[21]: tf.constant(b"hello world") # In[22]: tf.constant("café") # In[23]: u = tf.constant([ord(c) for c in "café"]) u # In[24]: b = tf.strings.unicode_encode(u, "UTF-8") tf.strings.length(b, unit="UTF8_CHAR") # In[25]: tf.strings.unicode_decode(b, "UTF-8") # ### 문자열 배열 # In[26]: p = tf.constant(["Café", "Coffee", "caffè", "咖啡"]) # In[27]: tf.strings.length(p, unit="UTF8_CHAR") # In[28]: r = tf.strings.unicode_decode(p, "UTF8") r # In[29]: print(r) # ### 래그드 텐서 # In[30]: print(r[1]) # In[31]: print(r[1:3]) # In[32]: r2 = tf.ragged.constant([[65, 66], [], [67]]) print(tf.concat([r, r2], axis=0)) # In[33]: r3 = tf.ragged.constant([[68, 69, 70], [71], [], [72, 73]]) print(tf.concat([r, r3], axis=1)) # In[34]: tf.strings.unicode_encode(r3, "UTF-8") # In[35]: r.to_tensor() # ### 희소 텐서 # In[36]: s = tf.SparseTensor(indices=[[0, 1], [1, 0], [2, 3]], values=[1., 2., 3.], dense_shape=[3, 4]) # In[37]: print(s) # In[38]: tf.sparse.to_dense(s) # In[39]: s2 = s * 2.0 # In[40]: try: s3 = s + 1. except TypeError as ex: print(ex) # In[41]: s4 = tf.constant([[10., 20.], [30., 40.], [50., 60.], [70., 80.]]) tf.sparse.sparse_dense_matmul(s, s4) # In[42]: s5 = tf.SparseTensor(indices=[[0, 2], [0, 1]], values=[1., 2.], dense_shape=[3, 4]) print(s5) # In[43]: try: tf.sparse.to_dense(s5) except tf.errors.InvalidArgumentError as ex: print(ex) # In[44]: s6 = tf.sparse.reorder(s5) tf.sparse.to_dense(s6) # ### 집합 # In[45]: set1 = tf.constant([[2, 3, 5, 7], [7, 9, 0, 0]]) set2 = tf.constant([[4, 5, 6], [9, 10, 0]]) tf.sparse.to_dense(tf.sets.union(set1, set2)) # In[46]: tf.sparse.to_dense(tf.sets.difference(set1, set2)) # In[47]: tf.sparse.to_dense(tf.sets.intersection(set1, set2)) # ### 변수 # In[48]: v = tf.Variable([[1., 2., 3.], [4., 5., 6.]]) # In[49]: v.assign(2 * v) # In[50]: v[0, 1].assign(42) # In[51]: v[:, 2].assign([0., 1.]) # In[52]: try: v[1] = [7., 8., 9.] except TypeError as ex: print(ex) # In[53]: v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.]) # In[54]: sparse_delta = tf.IndexedSlices(values=[[1., 2., 3.], [4., 5., 6.]], indices=[1, 0]) v.scatter_update(sparse_delta) # ### 텐서 배열 # In[55]: array = tf.TensorArray(dtype=tf.float32, size=3) array = array.write(0, tf.constant([1., 2.])) array = array.write(1, tf.constant([3., 10.])) array = array.write(2, tf.constant([5., 7.])) # In[56]: array.read(1) # In[57]: array.stack() # In[58]: mean, variance = tf.nn.moments(array.stack(), axes=0) mean # In[59]: variance # ## 사용자 정의 손실 함수 # 캘리포니아 주택 데이터셋을 로드하여 준비해 보겠습니다. 먼저 이 데이터셋을 로드한 다음 훈련 세트, 검증 세트, 테스트 세트로 나눕니다. 마지막으로 스케일을 변경합니다: # In[60]: from sklearn.datasets import fetch_california_housing from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler housing = fetch_california_housing() X_train_full, X_test, y_train_full, y_test = train_test_split( housing.data, housing.target.reshape(-1, 1), random_state=42) X_train, X_valid, y_train, y_valid = train_test_split( X_train_full, y_train_full, random_state=42) scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_valid_scaled = scaler.transform(X_valid) X_test_scaled = scaler.transform(X_test) # In[61]: def huber_fn(y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < 1 squared_loss = tf.square(error) / 2 linear_loss = tf.abs(error) - 0.5 return tf.where(is_small_error, squared_loss, linear_loss) # In[62]: plt.figure(figsize=(8, 3.5)) z = np.linspace(-4, 4, 200) plt.plot(z, huber_fn(0, z), "b-", linewidth=2, label="huber($z$)") plt.plot(z, z**2 / 2, "b:", linewidth=1, label=r"$\frac{1}{2}z^2$") plt.plot([-1, -1], [0, huber_fn(0., -1.)], "r--") plt.plot([1, 1], [0, huber_fn(0., 1.)], "r--") plt.gca().axhline(y=0, color='k') plt.gca().axvline(x=0, color='k') plt.axis([-4, 4, 0, 4]) plt.grid(True) plt.xlabel("$z$") plt.legend(fontsize=14) plt.title("Huber loss", fontsize=14) plt.show() # In[63]: input_shape = X_train.shape[1:] model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[64]: model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"]) # In[65]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # ## 사용자 정의 요소를 가진 모델을 저장하고 로드하기 # In[66]: model.save("my_model_with_a_custom_loss.h5") # In[67]: model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn": huber_fn}) # In[68]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[69]: def create_huber(threshold=1.0): def huber_fn(y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < threshold squared_loss = tf.square(error) / 2 linear_loss = threshold * tf.abs(error) - threshold**2 / 2 return tf.where(is_small_error, squared_loss, linear_loss) return huber_fn # In[70]: model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"]) # In[71]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[72]: model.save("my_model_with_a_custom_loss_threshold_2.h5") # In[73]: model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5", custom_objects={"huber_fn": create_huber(2.0)}) # In[74]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[75]: class HuberLoss(keras.losses.Loss): def __init__(self, threshold=1.0, **kwargs): self.threshold = threshold super().__init__(**kwargs) def call(self, y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < self.threshold squared_loss = tf.square(error) / 2 linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2 return tf.where(is_small_error, squared_loss, linear_loss) def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} # In[76]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[77]: model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"]) # In[78]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[79]: model.save("my_model_with_a_custom_loss_class.h5") # In[80]: model = keras.models.load_model("my_model_with_a_custom_loss_class.h5", custom_objects={"HuberLoss": HuberLoss}) # In[81]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[82]: model.loss.threshold # ## 그외 사용자 정의 함수 # In[83]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[84]: def my_softplus(z): # tf.nn.softplus(z) 값을 반환합니다 return tf.math.log(tf.exp(z) + 1.0) def my_glorot_initializer(shape, dtype=tf.float32): stddev = tf.sqrt(2. / (shape[0] + shape[1])) return tf.random.normal(shape, stddev=stddev, dtype=dtype) def my_l1_regularizer(weights): return tf.reduce_sum(tf.abs(0.01 * weights)) def my_positive_weights(weights): # tf.nn.relu(weights) 값을 반환합니다 return tf.where(weights < 0., tf.zeros_like(weights), weights) # In[85]: layer = keras.layers.Dense(1, activation=my_softplus, kernel_initializer=my_glorot_initializer, kernel_regularizer=my_l1_regularizer, kernel_constraint=my_positive_weights) # In[86]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[87]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1, activation=my_softplus, kernel_regularizer=my_l1_regularizer, kernel_constraint=my_positive_weights, kernel_initializer=my_glorot_initializer), ]) # In[88]: model.compile(loss="mse", optimizer="nadam", metrics=["mae"]) # In[89]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[90]: model.save("my_model_with_many_custom_parts.h5") # In[91]: model = keras.models.load_model( "my_model_with_many_custom_parts.h5", custom_objects={ "my_l1_regularizer": my_l1_regularizer, "my_positive_weights": my_positive_weights, "my_glorot_initializer": my_glorot_initializer, "my_softplus": my_softplus, }) # In[92]: class MyL1Regularizer(keras.regularizers.Regularizer): def __init__(self, factor): self.factor = factor def __call__(self, weights): return tf.reduce_sum(tf.abs(self.factor * weights)) def get_config(self): return {"factor": self.factor} # In[93]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[94]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1, activation=my_softplus, kernel_regularizer=MyL1Regularizer(0.01), kernel_constraint=my_positive_weights, kernel_initializer=my_glorot_initializer), ]) # In[95]: model.compile(loss="mse", optimizer="nadam", metrics=["mae"]) # In[96]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) # In[97]: model.save("my_model_with_many_custom_parts.h5") # In[98]: model = keras.models.load_model( "my_model_with_many_custom_parts.h5", custom_objects={ "MyL1Regularizer": MyL1Regularizer, "my_positive_weights": my_positive_weights, "my_glorot_initializer": my_glorot_initializer, "my_softplus": my_softplus, }) # ## 사용자 정의 지표 # In[99]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[100]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[101]: model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)]) # In[102]: model.fit(X_train_scaled, y_train, epochs=2) # **노트**: 손실과 지표에 같은 함수를 사용하면 다른 결과가 나올 수 있습니다. 이는 일반적으로 부동 소수점 정밀도 오차 때문입니다. 수학 식이 동일하더라도 연산은 동일한 순서대로 실행되지 않습니다. 이로 인해 작은 차이가 발생합니다. 또한 샘플 가중치를 사용하면 정밀도보다 더 큰 오차가 생깁니다: # # * 에포크에서 손실은 지금까지 본 모든 배치 손실의 평균입니다. 각 배치 손실은 가중치가 적용된 샘플 손실의 합을 _배치 크기_ 로 나눈 것입니다(샘플 가중치의 합으로 나눈 것이 아닙니다. 따라서 배치 손실은 손실의 가중 평균이 아닙니다). # * 에포크에서 지표는 가중치가 적용된 샘플 손실의 합을 지금까지 본 모든 샘플 가중치의 합으로 나눈 것입니다. 다른 말로하면 모든 샘플 손실의 가중 평균입니다. 따라서 위와 같지 않습니다. # # 수학적으로 말하면 손실 = 지표 * 샘플 가중치의 평균(더하기 약간의 부동 소수점 정밀도 오차)입니다. # In[103]: model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[create_huber(2.0)]) # In[104]: sample_weight = np.random.rand(len(y_train)) history = model.fit(X_train_scaled, y_train, epochs=2, sample_weight=sample_weight) # In[105]: history.history["loss"][0], history.history["huber_fn"][0] * sample_weight.mean() # ### 스트리밍 지표 # In[106]: precision = keras.metrics.Precision() precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]) # In[107]: precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0]) # In[108]: precision.result() # In[109]: precision.variables # In[110]: precision.reset_states() # 스트리밍 지표 만들기: # In[111]: class HuberMetric(keras.metrics.Metric): def __init__(self, threshold=1.0, **kwargs): super().__init__(**kwargs) # 기본 매개변수 처리 (예를 들면, dtype) self.threshold = threshold self.huber_fn = create_huber(threshold) self.total = self.add_weight("total", initializer="zeros") self.count = self.add_weight("count", initializer="zeros") def update_state(self, y_true, y_pred, sample_weight=None): metric = self.huber_fn(y_true, y_pred) self.total.assign_add(tf.reduce_sum(metric)) self.count.assign_add(tf.cast(tf.size(y_true), tf.float32)) def result(self): return self.total / self.count def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} # In[112]: m = HuberMetric(2.) # total = 2 * |10 - 2| - 2²/2 = 14 # count = 1 # result = 14 / 1 = 14 m(tf.constant([[2.]]), tf.constant([[10.]])) # In[113]: # total = total + (|1 - 0|² / 2) + (2 * |9.25 - 5| - 2² / 2) = 14 + 7 = 21 # count = count + 2 = 3 # result = total / count = 21 / 3 = 7 m(tf.constant([[0.], [5.]]), tf.constant([[1.], [9.25]])) m.result() # In[114]: m.variables # In[115]: m.reset_states() m.variables # `HuberMetric` 클래스가 잘 동작하는지 확인해 보죠: # In[116]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[117]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[118]: model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[HuberMetric(2.0)]) # In[119]: model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2) # In[120]: model.save("my_model_with_a_custom_metric.h5") # In[121]: model = keras.models.load_model("my_model_with_a_custom_metric.h5", custom_objects={"huber_fn": create_huber(2.0), "HuberMetric": HuberMetric}) # In[122]: model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2) # **경고**: 텐서플로 2.2에서 tf.keras가 `model.metrics`의 0번째 위치에 지표를 추가합니다([텐서플로 이슈 #38150](https://github.com/tensorflow/tensorflow/issues/38150) 참조). 따라서 `HuberMetric`에 접근하려면 `model.metrics[0]` 대신 `model.metrics[-1]`를 사용해야 합니다. # In[123]: model.metrics[-1].threshold # 잘 동작하는군요! 다음처럼 더 간단하게 클래스를 만들 수 있습니다: # In[124]: class HuberMetric(keras.metrics.Mean): def __init__(self, threshold=1.0, name='HuberMetric', dtype=None): self.threshold = threshold self.huber_fn = create_huber(threshold) super().__init__(name=name, dtype=dtype) def update_state(self, y_true, y_pred, sample_weight=None): metric = self.huber_fn(y_true, y_pred) super(HuberMetric, self).update_state(metric, sample_weight) def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} # 이 클래스는 크기를 잘 처리하고 샘플 가중치도 지원합니다. # In[125]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[126]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape), keras.layers.Dense(1), ]) # In[127]: model.compile(loss=keras.losses.Huber(2.0), optimizer="nadam", weighted_metrics=[HuberMetric(2.0)]) # In[128]: sample_weight = np.random.rand(len(y_train)) history = model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2, sample_weight=sample_weight) # In[129]: history.history["loss"][0], history.history["HuberMetric"][0] * sample_weight.mean() # In[130]: model.save("my_model_with_a_custom_metric_v2.h5") # In[131]: model = keras.models.load_model("my_model_with_a_custom_metric_v2.h5", custom_objects={"HuberMetric": HuberMetric}) # In[132]: model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2) # In[133]: model.metrics[-1].threshold # ## 사용자 정의 층 # In[134]: exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x)) # In[135]: exponential_layer([-1., 0., 1.]) # 회귀 모델이 예측할 값이 양수이고 스케일이 매우 다른 경우 (예를 들어, 0.001, 10., 10000) 출력층에 지수 함수를 추가하면 유용할 수 있습니다: # In[136]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[137]: model = keras.models.Sequential([ keras.layers.Dense(30, activation="relu", input_shape=input_shape), keras.layers.Dense(1), exponential_layer ]) model.compile(loss="mse", optimizer="sgd") model.fit(X_train_scaled, y_train, epochs=5, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # In[138]: class MyDense(keras.layers.Layer): def __init__(self, units, activation=None, **kwargs): super().__init__(**kwargs) self.units = units self.activation = keras.activations.get(activation) def build(self, batch_input_shape): self.kernel = self.add_weight( name="kernel", shape=[batch_input_shape[-1], self.units], initializer="glorot_normal") self.bias = self.add_weight( name="bias", shape=[self.units], initializer="zeros") super().build(batch_input_shape) # must be at the end def call(self, X): return self.activation(X @ self.kernel + self.bias) def compute_output_shape(self, batch_input_shape): return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units]) def get_config(self): base_config = super().get_config() return {**base_config, "units": self.units, "activation": keras.activations.serialize(self.activation)} # In[139]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[140]: model = keras.models.Sequential([ MyDense(30, activation="relu", input_shape=input_shape), MyDense(1) ]) # In[141]: model.compile(loss="mse", optimizer="nadam") model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # In[142]: model.save("my_model_with_a_custom_layer.h5") # In[143]: model = keras.models.load_model("my_model_with_a_custom_layer.h5", custom_objects={"MyDense": MyDense}) # In[144]: class MyMultiLayer(keras.layers.Layer): def call(self, X): X1, X2 = X print("X1.shape: ", X1.shape ," X2.shape: ", X2.shape) # 사용자 정의 층 디버깅 return X1 + X2, X1 * X2 def compute_output_shape(self, batch_input_shape): batch_input_shape1, batch_input_shape2 = batch_input_shape return [batch_input_shape1, batch_input_shape2] # 사용자 정의 층은 다음처럼 함수형 API를 사용해 호출할 수 있습니다: # In[145]: inputs1 = keras.layers.Input(shape=[2]) inputs2 = keras.layers.Input(shape=[2]) outputs1, outputs2 = MyMultiLayer()((inputs1, inputs2)) # `call()` 메서드는 심볼릭 입력을 받습니다. 이 입력의 크기는 부분적으로만 지정되어 있습니다(이 시점에서는 배치 크기를 모릅니다. 그래서 첫 번째 차원이 None입니다): # # 사용자 층에 실제 데이터를 전달할 수도 있습니다. 이를 테스트하기 위해 각 데이터셋의 입력을 각각 네 개의 특성을 가진 두 부분으로 나누겠습니다: # In[146]: def split_data(data): columns_count = data.shape[-1] half = columns_count // 2 return data[:, :half], data[:, half:] X_train_scaled_A, X_train_scaled_B = split_data(X_train_scaled) X_valid_scaled_A, X_valid_scaled_B = split_data(X_valid_scaled) X_test_scaled_A, X_test_scaled_B = split_data(X_test_scaled) # 분할된 데이터 크기 출력 X_train_scaled_A.shape, X_train_scaled_B.shape # 크기가 완전하게 지정된 것을 볼 수 있습니다: # In[147]: outputs1, outputs2 = MyMultiLayer()((X_train_scaled_A, X_train_scaled_B)) # 함수형 API를 사용해 완전한 모델을 만들어 보겠습니다(이 모델은 간단한 예제이므로 놀라운 성능을 기대하지 마세요): # In[148]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) input_A = keras.layers.Input(shape=X_train_scaled_A.shape[-1]) input_B = keras.layers.Input(shape=X_train_scaled_B.shape[-1]) hidden_A, hidden_B = MyMultiLayer()((input_A, input_B)) hidden_A = keras.layers.Dense(30, activation='selu')(hidden_A) hidden_B = keras.layers.Dense(30, activation='selu')(hidden_B) concat = keras.layers.Concatenate()((hidden_A, hidden_B)) output = keras.layers.Dense(1)(concat) model = keras.models.Model(inputs=[input_A, input_B], outputs=[output]) # In[149]: model.compile(loss='mse', optimizer='nadam') # In[150]: model.fit((X_train_scaled_A, X_train_scaled_B), y_train, epochs=2, validation_data=((X_valid_scaled_A, X_valid_scaled_B), y_valid)) # 훈련과 테스트에서 다르게 동작하는 층을 만들어 보죠: # In[151]: class AddGaussianNoise(keras.layers.Layer): def __init__(self, stddev, **kwargs): super().__init__(**kwargs) self.stddev = stddev def call(self, X, training=None): if training: noise = tf.random.normal(tf.shape(X), stddev=self.stddev) return X + noise else: return X def compute_output_shape(self, batch_input_shape): return batch_input_shape # 다음은 사용자 정의 층을 사용하는 간단한 모델입니다: # In[152]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) model = keras.models.Sequential([ AddGaussianNoise(stddev=1.0), keras.layers.Dense(30, activation="selu"), keras.layers.Dense(1) ]) # In[153]: model.compile(loss="mse", optimizer="nadam") model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # ## 사용자 정의 모델 # In[154]: X_new_scaled = X_test_scaled # In[155]: class ResidualBlock(keras.layers.Layer): def __init__(self, n_layers, n_neurons, **kwargs): super().__init__(**kwargs) self.hidden = [keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal") for _ in range(n_layers)] def call(self, inputs): Z = inputs for layer in self.hidden: Z = layer(Z) return inputs + Z # In[156]: class ResidualRegressor(keras.models.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal") self.block1 = ResidualBlock(2, 30) self.block2 = ResidualBlock(2, 30) self.out = keras.layers.Dense(output_dim) def call(self, inputs): Z = self.hidden1(inputs) for _ in range(1 + 3): Z = self.block1(Z) Z = self.block2(Z) return self.out(Z) # In[157]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[158]: model = ResidualRegressor(1) model.compile(loss="mse", optimizer="nadam") history = model.fit(X_train_scaled, y_train, epochs=5) score = model.evaluate(X_test_scaled, y_test) y_pred = model.predict(X_new_scaled) # In[159]: model.save("my_custom_model.ckpt") # In[160]: model = keras.models.load_model("my_custom_model.ckpt") # In[161]: history = model.fit(X_train_scaled, y_train, epochs=5) # 대신 시퀀셜 API를 사용하는 모델을 정의할 수 있습니다: # In[162]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[163]: block1 = ResidualBlock(2, 30) model = keras.models.Sequential([ keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal"), block1, block1, block1, block1, ResidualBlock(2, 30), keras.layers.Dense(1) ]) # In[164]: model.compile(loss="mse", optimizer="nadam") history = model.fit(X_train_scaled, y_train, epochs=5) score = model.evaluate(X_test_scaled, y_test) y_pred = model.predict(X_new_scaled) # ## 모델 구성 요소에 기반한 손실과 지표 # **노트**: 다음 코드는 책의 코드와 두 가지 다른 점이 있습니다: # 1. 생성자에서 `keras.metrics.Mean()` 측정 지표를 만들고 `call()` 메서드에서 사용하여 평균 재구성 손실을 추적합니다. 훈련에서만 사용해야 하기 때문에 `call()` 메서드에 `training` 매개변수를 추가합니다. `training`이 `True`이면 `reconstruction_mean`를 업데이트하고 `self.add_metric()`를 호출합니다. # 2. TF 2.2에 있는 이슈([#46858](https://github.com/tensorflow/tensorflow/issues/46858)) 때문에 `build()` 메서드 안에서 `super().build()`를 호출하면 안됩니다. # In[165]: class ReconstructingRegressor(keras.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal") for _ in range(5)] self.out = keras.layers.Dense(output_dim) self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error") def build(self, batch_input_shape): n_inputs = batch_input_shape[-1] self.reconstruct = keras.layers.Dense(n_inputs) #super().build(batch_input_shape) def call(self, inputs, training=None): Z = inputs for layer in self.hidden: Z = layer(Z) reconstruction = self.reconstruct(Z) self.recon_loss = 0.05 * tf.reduce_mean(tf.square(reconstruction - inputs)) if training: result = self.reconstruction_mean(recon_loss) self.add_metric(result) return self.out(Z) def train_step(self, data): x, y = data with tf.GradientTape() as tape: y_pred = self(x) loss = self.compiled_loss(y, y_pred, regularization_losses=[self.recon_loss]) gradients = tape.gradient(loss, self.trainable_variables) self.optimizer.apply_gradients(zip(gradients, self.trainable_variables)) return {m.name: m.result() for m in self.metrics} # In[166]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[167]: model = ReconstructingRegressor(1) model.compile(loss="mse", optimizer="nadam") history = model.fit(X_train_scaled, y_train, epochs=2) y_pred = model.predict(X_test_scaled) # ## 자동 미분을 사용하여 그레이디언트 계산하기 # In[168]: def f(w1, w2): return 3 * w1 ** 2 + 2 * w1 * w2 # In[169]: w1, w2 = 5, 3 eps = 1e-6 (f(w1 + eps, w2) - f(w1, w2)) / eps # In[170]: (f(w1, w2 + eps) - f(w1, w2)) / eps # In[171]: w1, w2 = tf.Variable(5.), tf.Variable(3.) with tf.GradientTape() as tape: z = f(w1, w2) gradients = tape.gradient(z, [w1, w2]) # In[172]: gradients # In[173]: with tf.GradientTape() as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) try: dz_dw2 = tape.gradient(z, w2) except RuntimeError as ex: print(ex) # In[174]: with tf.GradientTape(persistent=True) as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) dz_dw2 = tape.gradient(z, w2) # works now! del tape # In[175]: dz_dw1, dz_dw2 # In[176]: c1, c2 = tf.constant(5.), tf.constant(3.) with tf.GradientTape() as tape: z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # In[177]: gradients # In[178]: with tf.GradientTape() as tape: tape.watch(c1) tape.watch(c2) z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # In[179]: gradients # In[180]: with tf.GradientTape() as tape: z1 = f(w1, w2 + 2.) z2 = f(w1, w2 + 5.) z3 = f(w1, w2 + 7.) tape.gradient([z1, z2, z3], [w1, w2]) # In[181]: with tf.GradientTape(persistent=True) as tape: z1 = f(w1, w2 + 2.) z2 = f(w1, w2 + 5.) z3 = f(w1, w2 + 7.) tf.reduce_sum(tf.stack([tape.gradient(z, [w1, w2]) for z in (z1, z2, z3)]), axis=0) del tape # In[182]: with tf.GradientTape(persistent=True) as hessian_tape: with tf.GradientTape() as jacobian_tape: z = f(w1, w2) jacobians = jacobian_tape.gradient(z, [w1, w2]) hessians = [hessian_tape.gradient(jacobian, [w1, w2]) for jacobian in jacobians] del hessian_tape # In[183]: jacobians # In[184]: hessians # In[185]: def f(w1, w2): return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2) with tf.GradientTape() as tape: z = f(w1, w2) tape.gradient(z, [w1, w2]) # In[186]: x = tf.Variable(100.) with tf.GradientTape() as tape: z = my_softplus(x) tape.gradient(z, [x]) # In[187]: tf.math.log(tf.exp(tf.constant(30., dtype=tf.float32)) + 1.) # In[188]: x = tf.Variable([100.]) with tf.GradientTape() as tape: z = my_softplus(x) tape.gradient(z, [x]) # In[189]: @tf.custom_gradient def my_better_softplus(z): exp = tf.exp(z) def my_softplus_gradients(grad): return grad / (1 + 1 / exp) return tf.math.log(exp + 1), my_softplus_gradients # In[190]: def my_better_softplus(z): return tf.where(z > 30., z, tf.math.log(tf.exp(z) + 1.)) # In[191]: x = tf.Variable([1000.]) with tf.GradientTape() as tape: z = my_better_softplus(x) z, tape.gradient(z, [x]) # # 사용자 정의 훈련 반복 # In[192]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[193]: l2_reg = keras.regularizers.l2(0.05) model = keras.models.Sequential([ keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg), keras.layers.Dense(1, kernel_regularizer=l2_reg) ]) # In[194]: def random_batch(X, y, batch_size=32): idx = np.random.randint(len(X), size=batch_size) return X[idx], y[idx] # In[195]: def print_status_bar(iteration, total, loss, metrics=None): metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result()) for m in [loss] + (metrics or [])]) end = "" if iteration < total else "\n" print("\r{}/{} - ".format(iteration, total) + metrics, end=end) # In[196]: import time mean_loss = keras.metrics.Mean(name="loss") mean_square = keras.metrics.Mean(name="mean_square") for i in range(1, 50 + 1): loss = 1 / i mean_loss(loss) mean_square(i ** 2) print_status_bar(i, 50, mean_loss, [mean_square]) time.sleep(0.05) # A fancier version with a progress bar: # In[197]: def progress_bar(iteration, total, size=30): running = iteration < total c = ">" if running else "=" p = (size - 1) * iteration // total fmt = "{{:-{}d}}/{{}} [{{}}]".format(len(str(total))) params = [iteration, total, "=" * p + c + "." * (size - p - 1)] return fmt.format(*params) # In[198]: progress_bar(3500, 10000, size=6) # In[199]: def print_status_bar(iteration, total, loss, metrics=None, size=30): metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result()) for m in [loss] + (metrics or [])]) end = "" if iteration < total else "\n" print("\r{} - {}".format(progress_bar(iteration, total), metrics), end=end) # In[200]: mean_loss = keras.metrics.Mean(name="loss") mean_square = keras.metrics.Mean(name="mean_square") for i in range(1, 50 + 1): loss = 1 / i mean_loss(loss) mean_square(i ** 2) print_status_bar(i, 50, mean_loss, [mean_square]) time.sleep(0.05) # In[201]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[202]: n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size optimizer = keras.optimizers.Nadam(learning_rate=0.01) loss_fn = keras.losses.mean_squared_error mean_loss = keras.metrics.Mean() metrics = [keras.metrics.MeanAbsoluteError()] # In[203]: for epoch in range(1, n_epochs + 1): print("Epoch {}/{}".format(epoch, n_epochs)) for step in range(1, n_steps + 1): X_batch, y_batch = random_batch(X_train_scaled, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) mean_loss(loss) for metric in metrics: metric(y_batch, y_pred) print_status_bar(step * batch_size, len(y_train), mean_loss, metrics) print_status_bar(len(y_train), len(y_train), mean_loss, metrics) for metric in [mean_loss] + metrics: metric.reset_states() # In[204]: try: from tqdm.notebook import trange from collections import OrderedDict with trange(1, n_epochs + 1, desc="All epochs") as epochs: for epoch in epochs: with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps: for step in steps: X_batch, y_batch = random_batch(X_train_scaled, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) status = OrderedDict() mean_loss(loss) status["loss"] = mean_loss.result().numpy() for metric in metrics: metric(y_batch, y_pred) status[metric.name] = metric.result().numpy() steps.set_postfix(status) for metric in [mean_loss] + metrics: metric.reset_states() except ImportError as ex: print("To run this cell, please install tqdm, ipywidgets and restart Jupyter") # ## 텐서플로 함수 # In[205]: def cube(x): return x ** 3 # In[206]: cube(2) # In[207]: cube(tf.constant(2.0)) # In[208]: tf_cube = tf.function(cube) tf_cube # In[209]: tf_cube(2) # In[210]: tf_cube(tf.constant(2.0)) # ### TF 함수와 콘크리트 함수 # In[211]: concrete_function = tf_cube.get_concrete_function(tf.constant(2.0)) concrete_function.graph # In[212]: concrete_function(tf.constant(2.0)) # In[213]: concrete_function is tf_cube.get_concrete_function(tf.constant(2.0)) # ### 함수 정의와 그래프 # In[214]: concrete_function.graph # In[215]: ops = concrete_function.graph.get_operations() ops # In[216]: pow_op = ops[2] list(pow_op.inputs) # In[217]: pow_op.outputs # In[218]: concrete_function.graph.get_operation_by_name('x') # In[219]: concrete_function.graph.get_tensor_by_name('Identity:0') # In[220]: concrete_function.function_def.signature # ### TF 함수가 계산 그래프를 추출하기 위해 파이썬 함수를 트레이싱하는 방법 # In[221]: @tf.function def tf_cube(x): print("print:", x) return x ** 3 # In[222]: result = tf_cube(tf.constant(2.0)) # In[223]: result # In[224]: result = tf_cube(2) result = tf_cube(3) result = tf_cube(tf.constant([[1., 2.]])) # New shape: trace! result = tf_cube(tf.constant([[3., 4.], [5., 6.]])) # New shape: trace! result = tf_cube(tf.constant([[7., 8.], [9., 10.], [11., 12.]])) # New shape: trace! # 특정 입력 시그니처를 지정하는 것도 가능합니다: # In[225]: @tf.function(input_signature=[tf.TensorSpec([None, 28, 28], tf.float32)]) def shrink(images): print("트레이싱", images) return images[:, ::2, ::2] # 행과 열의 절반을 버립니다 # In[226]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[227]: img_batch_1 = tf.random.uniform(shape=[100, 28, 28]) img_batch_2 = tf.random.uniform(shape=[50, 28, 28]) preprocessed_images = shrink(img_batch_1) # 함수 트레이싱 preprocessed_images = shrink(img_batch_2) # 동일한 콘크리트 함수 재사용 # In[228]: img_batch_3 = tf.random.uniform(shape=[2, 2, 2]) try: preprocessed_images = shrink(img_batch_3) # 다른 타입이나 크기 거부 except ValueError as ex: print(ex) # ### 오토그래프를 사용해 제어 흐름 나타내기 # `range()`를 사용한 정적인 `for` 반복: # In[229]: @tf.function def add_10(x): for i in range(10): x += 1 return x # In[230]: add_10(tf.constant(5)) # In[231]: add_10.get_concrete_function(tf.constant(5)).graph.get_operations() # `tf.while_loop()`를 사용한 동적인 반복: # In[232]: @tf.function def add_10(x): condition = lambda i, x: tf.less(i, 10) body = lambda i, x: (tf.add(i, 1), tf.add(x, 1)) final_i, final_x = tf.while_loop(condition, body, [tf.constant(0), x]) return final_x # In[233]: add_10(tf.constant(5)) # In[234]: add_10.get_concrete_function(tf.constant(5)).graph.get_operations() # (오토그래프에 의한) `tf.range()`를 사용한 동적인 `for` 반복: # In[235]: @tf.function def add_10(x): for i in tf.range(10): x = x + 1 return x # In[236]: add_10.get_concrete_function(tf.constant(0)).graph.get_operations() # ### TF 함수에서 변수와 다른 자원 다루기 # In[237]: counter = tf.Variable(0) @tf.function def increment(counter, c=1): return counter.assign_add(c) # In[238]: increment(counter) increment(counter) # In[239]: function_def = increment.get_concrete_function(counter).function_def function_def.signature.input_arg[0] # In[240]: counter = tf.Variable(0) @tf.function def increment(c=1): return counter.assign_add(c) # In[241]: increment() increment() # In[242]: function_def = increment.get_concrete_function().function_def function_def.signature.input_arg[0] # In[243]: class Counter: def __init__(self): self.counter = tf.Variable(0) @tf.function def increment(self, c=1): return self.counter.assign_add(c) # In[244]: c = Counter() c.increment() c.increment() # In[245]: @tf.function def add_10(x): for i in tf.range(10): x += 1 return x print(tf.autograph.to_code(add_10.python_function)) # In[246]: def display_tf_code(func): from IPython.display import display, Markdown if hasattr(func, "python_function"): func = func.python_function code = tf.autograph.to_code(func) display(Markdown('```python\n{}\n```'.format(code))) # In[247]: display_tf_code(add_10) # ## tf.keras와 TF 함수를 함께 사용하거나 사용하지 않기 # 기본적으로 tf.keras는 자동으로 사용자 정의 코드를 TF 함수로 변환하기 때문에 `tf.function()`을 사용할 필요가 없습니다: # In[248]: # 사용자 손실 함수 def my_mse(y_true, y_pred): print("my_mse() 손실 트레이싱") return tf.reduce_mean(tf.square(y_pred - y_true)) # In[249]: # 사용자 지표 함수 def my_mae(y_true, y_pred): print("my_mae() 지표 트레이싱") return tf.reduce_mean(tf.abs(y_pred - y_true)) # In[250]: # 사용자 정의 층 class MyDense(keras.layers.Layer): def __init__(self, units, activation=None, **kwargs): super().__init__(**kwargs) self.units = units self.activation = keras.activations.get(activation) def build(self, input_shape): self.kernel = self.add_weight(name='kernel', shape=(input_shape[1], self.units), initializer='uniform', trainable=True) self.biases = self.add_weight(name='bias', shape=(self.units,), initializer='zeros', trainable=True) super().build(input_shape) def call(self, X): print("MyDense.call() 트레이싱") return self.activation(X @ self.kernel + self.biases) # In[251]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[252]: # 사용자 정의 모델 class MyModel(keras.models.Model): def __init__(self, **kwargs): super().__init__(**kwargs) self.hidden1 = MyDense(30, activation="relu") self.hidden2 = MyDense(30, activation="relu") self.output_ = MyDense(1) def call(self, input): print("MyModel.call() 트레이싱") hidden1 = self.hidden1(input) hidden2 = self.hidden2(hidden1) concat = keras.layers.concatenate([input, hidden2]) output = self.output_(concat) return output model = MyModel() # In[253]: model.compile(loss=my_mse, optimizer="nadam", metrics=[my_mae]) # In[254]: model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid)) model.evaluate(X_test_scaled, y_test) # `dynamic=True`로 모델을 만들어 이 기능을 끌 수 있습니다(또는 모델의 생성자에서 `super().__init__(dynamic=True, **kwargs)`를 호출합니다): # In[255]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[256]: model = MyModel(dynamic=True) # In[257]: model.compile(loss=my_mse, optimizer="nadam", metrics=[my_mae]) # 사용자 정의 코드는 반복마다 호출됩니다. 너무 많이 출력되는 것을 피하기 위해 작은 데이터셋으로 훈련, 검증, 평가해 보겠습니다: # In[258]: model.fit(X_train_scaled[:64], y_train[:64], epochs=1, validation_data=(X_valid_scaled[:64], y_valid[:64]), verbose=0) model.evaluate(X_test_scaled[:64], y_test[:64], verbose=0) # 또는 모델을 컴파일할 때 `run_eagerly=True`를 지정합니다: # In[259]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[260]: model = MyModel() # In[261]: model.compile(loss=my_mse, optimizer="nadam", metrics=[my_mae], run_eagerly=True) # In[262]: model.fit(X_train_scaled[:64], y_train[:64], epochs=1, validation_data=(X_valid_scaled[:64], y_valid[:64]), verbose=0) model.evaluate(X_test_scaled[:64], y_test[:64], verbose=0) # ## 사용자 정의 옵티마이저 # 사용자 정의 옵티마이저를 정의하는 것은 일반적이지 않습니다. 하지만 어쩔 수 없이 만들어야 하는 상황이라면 다음 예를 참고하세요: # In[263]: class MyMomentumOptimizer(keras.optimizers.Optimizer): def __init__(self, learning_rate=0.001, momentum=0.9, name="MyMomentumOptimizer", **kwargs): """super().__init__()를 호출하고 _set_hyper()를 사용해 하이퍼파라미터를 저장합니다""" super().__init__(name, **kwargs) self._set_hyper("learning_rate", kwargs.get("lr", learning_rate)) # lr=learning_rate을 처리 self._set_hyper("decay", self._initial_decay) # self._set_hyper("momentum", momentum) def _create_slots(self, var_list): """모델 파라미터마다 연관된 옵티마이저 변수를 만듭니다. 텐서플로는 이런 옵티마이저 변수를 '슬롯'이라고 부릅니다. 모멘텀 옵티마이저에서는 모델 파라미터마다 하나의 모멘텀 슬롯이 필요합니다. """ for var in var_list: self.add_slot(var, "momentum") @tf.function def _resource_apply_dense(self, grad, var): """슬롯을 업데이트하고 모델 파라미터에 대한 옵티마이저 스텝을 수행합니다. """ var_dtype = var.dtype.base_dtype lr_t = self._decayed_lr(var_dtype) # 학습률 감쇠 처리 momentum_var = self.get_slot(var, "momentum") momentum_hyper = self._get_hyper("momentum", var_dtype) momentum_var.assign(momentum_var * momentum_hyper - (1. - momentum_hyper)* grad) var.assign_add(momentum_var * lr_t) def _resource_apply_sparse(self, grad, var): raise NotImplementedError def get_config(self): base_config = super().get_config() return { **base_config, "learning_rate": self._serialize_hyperparameter("learning_rate"), "decay": self._serialize_hyperparameter("decay"), "momentum": self._serialize_hyperparameter("momentum"), } # In[264]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[265]: model = keras.models.Sequential([keras.layers.Dense(1, input_shape=[8])]) model.compile(loss="mse", optimizer=MyMomentumOptimizer()) model.fit(X_train_scaled, y_train, epochs=5) # # 연습문제 # ## 1. to 11. # 부록 A 참조. # ## 12. _층 정규화_ 를 수행하는 사용자 정의 층을 구현하세요. # # _15장에서 순환 신경망을 사용할 때 이런 종류의 층을 사용합니다._ # ### a. # _문제: `build()` 메서드에서 두 개의 훈련 가능한 가중치 *α*와 *β*를 정의합니다. 두 가중치 모두 크기가 `input_shape[-1:]`이고 데이터 타입은 `tf.float32`입니다. *α*는 1로 초기화되고 *β*는 0으로 초기화되어야 합니다._ # 솔루션: 아래 참조. # ### b. # _문제: `call()` 메서드는 샘플의 특성마다 평균 μ와 표준편차 σ를 계산해야 합니다. 이를 위해 전체 샘플의 평균 μ와 분산 σ2을 반환하는 `tf.nn.moments(inputs, axes=-1, keepdims=True)`을 사용할 수 있습니다(분산의 제곱근으로 표준편차를 계산합니다). 그다음 *α*⊗(*X* - μ)/(σ + ε) + *β*를 계산하여 반환합니다. 여기에서 ⊗는 원소별 # 곱셈(`*`)을 나타냅니다. ε은 안전을 위한 항입니다(0으로 나누어지는 것을 막기 위한 작은 상수. 예를 들면 0.001)._ # In[266]: class LayerNormalization(keras.layers.Layer): def __init__(self, eps=0.001, **kwargs): super().__init__(**kwargs) self.eps = eps def build(self, batch_input_shape): self.alpha = self.add_weight( name="alpha", shape=batch_input_shape[-1:], initializer="ones") self.beta = self.add_weight( name="beta", shape=batch_input_shape[-1:], initializer="zeros") super().build(batch_input_shape) # 반드시 끝에 와야 합니다 def call(self, X): mean, variance = tf.nn.moments(X, axes=-1, keepdims=True) return self.alpha * (X - mean) / (tf.sqrt(variance + self.eps)) + self.beta def compute_output_shape(self, batch_input_shape): return batch_input_shape def get_config(self): base_config = super().get_config() return {**base_config, "eps": self.eps} # _ε_ 하이퍼파라미터(`eps`)는 필수가 아닙니다. 또한 `tf.sqrt(variance) + self.eps` 보다 `tf.sqrt(variance + self.eps)`를 계산하는 것이 좋습니다. sqrt(z)의 도함수는 z=0에서 정의되지 않기 때문에 분산 벡터의 한 원소가 0에 가까우면 훈련이 이리저리 널뜁니다. 제곱근 안에 _ε_를 넣으면 이런 현상을 방지할 수 있습니다. # ### c. # _문제: 사용자 정의 층이 `keras.layers.LayerNormalization` 층과 동일한(또는 거의 동일한) 출력을 만드는지 확인하세요._ # 각 클래스의 객체를 만들고 데이터(예를 들면, 훈련 세트)를 적용해 보죠. 차이는 무시할 수 있는 수준입니다. # In[267]: X = X_train.astype(np.float32) custom_layer_norm = LayerNormalization() keras_layer_norm = keras.layers.LayerNormalization() tf.reduce_mean(keras.losses.mean_absolute_error( keras_layer_norm(X), custom_layer_norm(X))) # 네 충분히 가깝네요. 조금 더 확실하게 알파와 베타를 완전히 랜덤하게 지정하고 다시 비교해 보죠: # In[268]: random_alpha = np.random.rand(X.shape[-1]) random_beta = np.random.rand(X.shape[-1]) custom_layer_norm.set_weights([random_alpha, random_beta]) keras_layer_norm.set_weights([random_alpha, random_beta]) tf.reduce_mean(keras.losses.mean_absolute_error( keras_layer_norm(X), custom_layer_norm(X))) # 여전히 무시할 수 있는 수준입니다! 사용자 정의 층이 잘 동작합니다. # ## 13. 사용자 정의 훈련 반복을 사용해 패션 MNIST 데이터셋으로 모델을 훈련해보세요. # # _패션 MNIST 데이터셋은 10장에서 소개했습니다._ # ### a. # _문제: 에포크, 반복, 평균 훈련 손실, (반복마다 업데이트되는) 에포크의 평균 정확도는 물론 에포크 끝에서 검증 손실과 정확도를 출력하세요._ # In[269]: (X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data() X_train_full = X_train_full.astype(np.float32) / 255. X_valid, X_train = X_train_full[:5000], X_train_full[5000:] y_valid, y_train = y_train_full[:5000], y_train_full[5000:] X_test = X_test.astype(np.float32) / 255. # In[270]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[271]: model = keras.models.Sequential([ keras.layers.Flatten(input_shape=[28, 28]), keras.layers.Dense(100, activation="relu"), keras.layers.Dense(10, activation="softmax"), ]) # In[272]: n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size optimizer = keras.optimizers.Nadam(learning_rate=0.01) loss_fn = keras.losses.sparse_categorical_crossentropy mean_loss = keras.metrics.Mean() metrics = [keras.metrics.SparseCategoricalAccuracy()] # In[273]: with trange(1, n_epochs + 1, desc="All epochs") as epochs: for epoch in epochs: with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps: for step in steps: X_batch, y_batch = random_batch(X_train, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) status = OrderedDict() mean_loss(loss) status["loss"] = mean_loss.result().numpy() for metric in metrics: metric(y_batch, y_pred) status[metric.name] = metric.result().numpy() steps.set_postfix(status) y_pred = model(X_valid) status["val_loss"] = np.mean(loss_fn(y_valid, y_pred)) status["val_accuracy"] = np.mean(keras.metrics.sparse_categorical_accuracy( tf.constant(y_valid, dtype=np.float32), y_pred)) steps.set_postfix(status) for metric in [mean_loss] + metrics: metric.reset_states() # ### b. # _문제: 상위 층과 하위 층에 학습률이 다른 옵티마이저를 따로 사용해보세요._ # In[274]: keras.backend.clear_session() np.random.seed(42) tf.random.set_seed(42) # In[275]: lower_layers = keras.models.Sequential([ keras.layers.Flatten(input_shape=[28, 28]), keras.layers.Dense(100, activation="relu"), ]) upper_layers = keras.models.Sequential([ keras.layers.Dense(10, activation="softmax"), ]) model = keras.models.Sequential([ lower_layers, upper_layers ]) # In[276]: lower_optimizer = keras.optimizers.SGD(learning_rate=1e-4) upper_optimizer = keras.optimizers.Nadam(learning_rate=1e-3) # In[277]: n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size loss_fn = keras.losses.sparse_categorical_crossentropy mean_loss = keras.metrics.Mean() metrics = [keras.metrics.SparseCategoricalAccuracy()] # In[278]: with trange(1, n_epochs + 1, desc="All epochs") as epochs: for epoch in epochs: with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps: for step in steps: X_batch, y_batch = random_batch(X_train, y_train) with tf.GradientTape(persistent=True) as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) for layers, optimizer in ((lower_layers, lower_optimizer), (upper_layers, upper_optimizer)): gradients = tape.gradient(loss, layers.trainable_variables) optimizer.apply_gradients(zip(gradients, layers.trainable_variables)) del tape for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) status = OrderedDict() mean_loss(loss) status["loss"] = mean_loss.result().numpy() for metric in metrics: metric(y_batch, y_pred) status[metric.name] = metric.result().numpy() steps.set_postfix(status) y_pred = model(X_valid) status["val_loss"] = np.mean(loss_fn(y_valid, y_pred)) status["val_accuracy"] = np.mean(keras.metrics.sparse_categorical_accuracy( tf.constant(y_valid, dtype=np.float32), y_pred)) steps.set_postfix(status) for metric in [mean_loss] + metrics: metric.reset_states()