#!/usr/bin/env python
# coding: utf-8

# In[1]:

get_ipython().run_line_magic('load_ext', 'watermark')
get_ipython().run_line_magic('watermark', '-v -p numpy,scipy,matplotlib,tensorflow')


# **Chapter 10 – Introduction to Artificial Neural Networks**
#
# _This notebook contains all the sample code and exercise solutions for chapter 10._

# # Setup

# This notebook supports both Python 2 and 3. We import a few common modules, configure matplotlib to display figures inline in the notebook, and prepare a function to save the generated figures:

# In[2]:

# To support both Python 2 and Python 3
from __future__ import division, print_function, unicode_literals

# Common imports
import numpy as np
import os

# To make the output stable across runs
# (note: `tf` is imported later, in the "FNN for MNIST" section)
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)

# Matplotlib settings
get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "ann"

def save_fig(fig_id, tight_layout=True):
    path = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID, fig_id + ".png")
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format='png', dpi=300)


# # Perceptrons

# In[3]:

import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import Perceptron

iris = load_iris()
X = iris.data[:, (2, 3)]  # petal length, petal width
y = (iris.target == 0).astype(int)  # Iris-Setosa?

per_clf = Perceptron(max_iter=100, random_state=42)
per_clf.fit(X, y)

y_pred = per_clf.predict([[2, 0.5]])


# In[4]:

y_pred


# The decision boundary is where the perceptron's weighted sum equals zero; solving for the second feature gives the slope `a` and intercept `b` of the line plotted below:

# In[5]:

a = -per_clf.coef_[0][0] / per_clf.coef_[0][1]
b = -per_clf.intercept_ / per_clf.coef_[0][1]

axes = [0, 5, 0, 2]

x0, x1 = np.meshgrid(
        np.linspace(axes[0], axes[1], 500).reshape(-1, 1),
        np.linspace(axes[2], axes[3], 200).reshape(-1, 1),
    )
X_new = np.c_[x0.ravel(), x1.ravel()]
y_predict = per_clf.predict(X_new)
zz = y_predict.reshape(x0.shape)

plt.figure(figsize=(10, 4))
plt.plot(X[y==0, 0], X[y==0, 1], "bs", label="Not Iris-Setosa")
plt.plot(X[y==1, 0], X[y==1, 1], "yo", label="Iris-Setosa")

plt.plot([axes[0], axes[1]], [a * axes[0] + b, a * axes[1] + b], "k-", linewidth=3)

from matplotlib.colors import ListedColormap
custom_cmap = ListedColormap(['#9898ff', '#fafab0'])

plt.contourf(x0, x1, zz, cmap=custom_cmap)
plt.xlabel("Petal length", fontsize=14)
plt.ylabel("Petal width", fontsize=14)
plt.legend(loc="lower right", fontsize=14)
plt.axis(axes)

save_fig("perceptron_iris_plot")
plt.show()


# # Activation functions

# In[6]:

def logit(z):
    # Despite its name, this is the logistic (sigmoid) function,
    # not its inverse; we keep the book's naming.
    return 1 / (1 + np.exp(-z))

def relu(z):
    return np.maximum(0, z)

def derivative(f, z, eps=0.000001):
    # Numerical derivative using the symmetric difference quotient
    return (f(z + eps) - f(z - eps)) / (2 * eps)


# In[7]:

z = np.linspace(-5, 5, 200)

plt.figure(figsize=(11, 4))

plt.subplot(121)
plt.plot(z, np.sign(z), "r-", linewidth=2, label="Step")
plt.plot(z, logit(z), "g--", linewidth=2, label="Logistic")
plt.plot(z, np.tanh(z), "b-", linewidth=2, label="Tanh")
plt.plot(z, relu(z), "m-.", linewidth=2, label="ReLU")
plt.grid(True)
plt.legend(loc="center right", fontsize=14)
plt.title("Activation functions", fontsize=14)
plt.axis([-5, 5, -1.2, 1.2])

plt.subplot(122)
plt.plot(z, derivative(np.sign, z), "r-", linewidth=2, label="Step")
plt.plot(0, 0, "ro", markersize=5)
plt.plot(0, 0, "rx", markersize=10)
plt.plot(z, derivative(logit, z), "g--", linewidth=2, label="Logit")
plt.plot(z, derivative(np.tanh, z), "b-", linewidth=2, label="Tanh")
plt.plot(z, derivative(relu, z), "m-.", linewidth=2, label="ReLU")
plt.grid(True)
plt.title("Derivatives", fontsize=14)
plt.axis([-5, 5, -0.2, 1.2])

save_fig("activation_functions_plot")
plt.show()
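# As a quick sanity check (an addition, not in the original notebook), we can
# compare the numerical derivative computed by `derivative()` against the
# analytic derivative of the logistic function, sigma'(z) = sigma(z) * (1 - sigma(z)):

# In[ ]:

numeric = derivative(logit, z)
analytic = logit(z) * (1 - logit(z))
np.max(np.abs(numeric - analytic))  # the maximum difference should be tiny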
# In[8]:

def heaviside(z):
    return (z >= 0).astype(z.dtype)

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def mlp_xor(x1, x2, activation=heaviside):
    return activation(-activation(x1 + x2 - 1.5) + activation(x1 + x2 - 0.5) - 0.5)


# In[9]:

x1s = np.linspace(-0.2, 1.2, 100)
x2s = np.linspace(-0.2, 1.2, 100)
x1, x2 = np.meshgrid(x1s, x2s)

z1 = mlp_xor(x1, x2, activation=heaviside)
z2 = mlp_xor(x1, x2, activation=sigmoid)

plt.figure(figsize=(10, 4))

plt.subplot(121)
plt.contourf(x1, x2, z1)
plt.plot([0, 1], [0, 1], "gs", markersize=20)
plt.plot([0, 1], [1, 0], "y^", markersize=20)
plt.title("Activation function: heaviside", fontsize=14)
plt.grid(True)

plt.subplot(122)
plt.contourf(x1, x2, z2)
plt.plot([0, 1], [0, 1], "gs", markersize=20)
plt.plot([0, 1], [1, 0], "y^", markersize=20)
plt.title("Activation function: sigmoid", fontsize=14)
plt.grid(True)
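# A small check (an addition, not in the original notebook): evaluating the
# network on the four corner points reproduces the XOR truth table, [0, 1, 1, 0]:

# In[ ]:

corner_x1 = np.array([0., 0., 1., 1.])
corner_x2 = np.array([0., 1., 0., 1.])
mlp_xor(corner_x1, corner_x2, activation=heaviside)  # expected: [0., 1., 1., 0.]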
# # FNN for MNIST

# In[10]:

import tensorflow as tf


# Note: `tf.examples.tutorials.mnist` is scheduled for removal, so we will use `tf.keras.datasets.mnist` instead. The `tf.contrib.learn` API was moved to `tf.estimator` and `tf.feature_column`, and it changed considerably. In particular, there is no `infer_real_valued_columns_from_input()` function or `SKCompat` class.

# In[11]:

(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
X_train = X_train.astype(np.float32).reshape(-1, 28*28) / 255.0
X_test = X_test.astype(np.float32).reshape(-1, 28*28) / 255.0
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)
X_valid, X_train = X_train[:5000], X_train[5000:]
y_valid, y_train = y_train[:5000], y_train[5000:]


# ## Using the Estimator API

# In[12]:

feature_cols = [tf.feature_column.numeric_column("X", shape=[28 * 28])]
dnn_clf = tf.estimator.DNNClassifier(hidden_units=[300, 100], n_classes=10,
                                     feature_columns=feature_cols)

input_fn = tf.estimator.inputs.numpy_input_fn(
    x={"X": X_train}, y=y_train, num_epochs=40, batch_size=50, shuffle=True)
dnn_clf.train(input_fn=input_fn)


# In[13]:

test_input_fn = tf.estimator.inputs.numpy_input_fn(
    x={"X": X_test}, y=y_test, shuffle=False)
eval_results = dnn_clf.evaluate(input_fn=test_input_fn)


# In[14]:

eval_results


# In[15]:

y_pred_iter = dnn_clf.predict(input_fn=test_input_fn)
y_pred = list(y_pred_iter)
y_pred[0]


# ## Using `tf.contrib.learn`

# In[16]:

# from tensorflow.examples.tutorials.mnist import input_data
# mnist = input_data.read_data_sets("/tmp/data/")


# In[17]:

# X_train = mnist.train.images
# X_test = mnist.test.images
# y_train = mnist.train.labels.astype("int")
# y_test = mnist.test.labels.astype("int")


# In[18]:

config = tf.contrib.learn.RunConfig(tf_random_seed=42)  # not shown in the book

feature_cols = tf.contrib.learn.infer_real_valued_columns_from_input(X_train)
dnn_clf = tf.contrib.learn.DNNClassifier(hidden_units=[300, 100], n_classes=10,
                                         feature_columns=feature_cols, config=config)
dnn_clf = tf.contrib.learn.SKCompat(dnn_clf)  # if TensorFlow >= 1.1
tf.logging.set_verbosity(tf.logging.INFO)
dnn_clf.fit(X_train, y_train, batch_size=50, steps=40000)


# In[19]:

from sklearn.metrics import accuracy_score

y_pred = dnn_clf.predict(X_test)
accuracy_score(y_test, y_pred['classes'])


# In[20]:

from sklearn.metrics import log_loss

y_pred_proba = y_pred['probabilities']
log_loss(y_test, y_pred_proba)


# ## Using the low-level TensorFlow API

# In[21]:

import tensorflow as tf

n_inputs = 28*28  # MNIST
n_hidden1 = 300
n_hidden2 = 100
n_outputs = 10


# In[22]:

reset_graph()

X = tf.placeholder(tf.float32, shape=(None, n_inputs), name="X")
y = tf.placeholder(tf.int32, shape=(None), name="y")


# In[23]:

def neuron_layer(X, n_neurons, name, activation=None):
    with tf.name_scope(name):
        n_inputs = int(X.get_shape()[1])
        stddev = 2 / np.sqrt(n_inputs)
        init = tf.truncated_normal((n_inputs, n_neurons), stddev=stddev)
        W = tf.Variable(init, name="kernel")
        b = tf.Variable(tf.zeros([n_neurons]), name="bias")
        Z = tf.matmul(X, W) + b
        if activation is not None:
            return activation(Z)
        else:
            return Z


# In[24]:

with tf.name_scope("dnn"):
    hidden1 = neuron_layer(X, n_hidden1, name="hidden1",
                           activation=tf.nn.relu)
    hidden2 = neuron_layer(hidden1, n_hidden2, name="hidden2",
                           activation=tf.nn.relu)
    logits = neuron_layer(hidden2, n_outputs, name="outputs")


# In[25]:

with tf.name_scope("loss"):
    xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y,
                                                              logits=logits)
    loss = tf.reduce_mean(xentropy, name="loss")


# In[26]:

learning_rate = 0.01

with tf.name_scope("train"):
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    training_op = optimizer.minimize(loss)


# In[27]:

with tf.name_scope("eval"):
    correct = tf.nn.in_top_k(logits, y, 1)
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))


# In[28]:

init = tf.global_variables_initializer()
saver = tf.train.Saver()


# In[29]:

n_epochs = 40
batch_size = 50


# In[30]:

def shuffle_batch(X, y, batch_size):
    rnd_idx = np.random.permutation(len(X))
    n_batches = len(X) // batch_size
    for batch_idx in np.array_split(rnd_idx, n_batches):
        X_batch, y_batch = X[batch_idx], y[batch_idx]
        yield X_batch, y_batch


# In[31]:

with tf.Session() as sess:
    init.run()
    for epoch in range(n_epochs):
        for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size):
            sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
        acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch})
        acc_valid = accuracy.eval(feed_dict={X: X_valid, y: y_valid})
        print(epoch, "Batch accuracy:", acc_batch, "Validation accuracy:", acc_valid)

    save_path = saver.save(sess, "./my_model_final.ckpt")


# In[32]:

with tf.Session() as sess:
    saver.restore(sess, "./my_model_final.ckpt")  # or use save_path
    X_new_scaled = X_test[:20]
    Z = logits.eval(feed_dict={X: X_new_scaled})
    y_pred = np.argmax(Z, axis=1)


# In[33]:

print("Predicted classes:", y_pred)
print("Actual classes:   ", y_test[:20])


# In[34]:

from tensorflow_graph_in_jupyter import show_graph


# In[35]:

show_graph(tf.get_default_graph())
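# Note that we took the argmax of the raw logits above. Since softmax preserves
# the ordering of the logits, this yields the same classes as the argmax of the
# estimated probabilities. If the probabilities themselves are needed, they can
# be recovered with a numerically stable softmax in NumPy (a sketch, not in the
# original notebook):

# In[ ]:

exps = np.exp(Z - Z.max(axis=1, keepdims=True))  # shift logits for numerical stability
softmax_proba = exps / exps.sum(axis=1, keepdims=True)
softmax_proba[0].round(3)  # class probabilities for the first test image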
# ## Using `dense()` instead of `neuron_layer()`

# In[36]:

n_inputs = 28*28  # MNIST
n_hidden1 = 300
n_hidden2 = 100
n_outputs = 10


# In[37]:

reset_graph()

X = tf.placeholder(tf.float32, shape=(None, n_inputs), name="X")
y = tf.placeholder(tf.int32, shape=(None), name="y")


# In[38]:

with tf.name_scope("dnn"):
    hidden1 = tf.layers.dense(X, n_hidden1, name="hidden1",
                              activation=tf.nn.relu)
    hidden2 = tf.layers.dense(hidden1, n_hidden2, name="hidden2",
                              activation=tf.nn.relu)
    logits = tf.layers.dense(hidden2, n_outputs, name="outputs")
    y_proba = tf.nn.softmax(logits)


# In[39]:

with tf.name_scope("loss"):
    xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y,
                                                              logits=logits)
    loss = tf.reduce_mean(xentropy, name="loss")


# In[40]:

learning_rate = 0.01

with tf.name_scope("train"):
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    training_op = optimizer.minimize(loss)


# In[41]:

with tf.name_scope("eval"):
    correct = tf.nn.in_top_k(logits, y, 1)
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))


# In[42]:

init = tf.global_variables_initializer()
saver = tf.train.Saver()


# In[43]:

n_epochs = 20
batch_size = 50

with tf.Session() as sess:
    init.run()
    for epoch in range(n_epochs):
        for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size):
            sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
        acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch})
        acc_valid = accuracy.eval(feed_dict={X: X_valid, y: y_valid})
        print(epoch, "Batch accuracy:", acc_batch, "Validation accuracy:", acc_valid)

    save_path = saver.save(sess, "./my_model_final.ckpt")


# In[44]:

show_graph(tf.get_default_graph())
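# Under the hood, `tf.layers.dense()` creates the same kind of variables as our
# `neuron_layer()` function did: one kernel and one bias per layer. We can list
# the trainable variable names to verify this (a quick check, not in the
# original notebook):

# In[ ]:

[var.name for var in tf.trainable_variables()]
# e.g. 'hidden1/kernel:0', 'hidden1/bias:0', 'hidden2/kernel:0', ...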
# # Exercise solutions

# ## 1. to 8.

# See appendix A.

# ## 9.

# _Train a deep MLP on the MNIST dataset and see if you can achieve 98% accuracy. Just like in the last exercise of chapter 9, try adding all the bells and whistles (i.e., save checkpoints, restore the last checkpoint in case of an interruption, add summaries, and plot learning curves using TensorBoard)._

# First let's build the deep neural network. It's exactly the same as before, with one addition: we add `tf.summary.scalar()` ops to record the loss and the accuracy during training, so we can view the learning curves in TensorBoard.

# In[45]:

n_inputs = 28*28  # MNIST
n_hidden1 = 300
n_hidden2 = 100
n_outputs = 10


# In[46]:

reset_graph()

X = tf.placeholder(tf.float32, shape=(None, n_inputs), name="X")
y = tf.placeholder(tf.int32, shape=(None), name="y")


# In[47]:

with tf.name_scope("dnn"):
    hidden1 = tf.layers.dense(X, n_hidden1, name="hidden1",
                              activation=tf.nn.relu)
    hidden2 = tf.layers.dense(hidden1, n_hidden2, name="hidden2",
                              activation=tf.nn.relu)
    logits = tf.layers.dense(hidden2, n_outputs, name="outputs")


# In[48]:

with tf.name_scope("loss"):
    xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y,
                                                              logits=logits)
    loss = tf.reduce_mean(xentropy, name="loss")
    loss_summary = tf.summary.scalar('log_loss', loss)


# In[49]:

learning_rate = 0.01

with tf.name_scope("train"):
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    training_op = optimizer.minimize(loss)


# In[50]:

with tf.name_scope("eval"):
    correct = tf.nn.in_top_k(logits, y, 1)
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
    accuracy_summary = tf.summary.scalar('accuracy', accuracy)


# In[51]:

init = tf.global_variables_initializer()
saver = tf.train.Saver()


# Let's define the directory to write the TensorBoard logs to:

# In[52]:

from datetime import datetime

def log_dir(prefix=""):
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    root_logdir = "tf_logs"
    if prefix:
        prefix += "-"
    name = prefix + "run-" + now
    return "{}/{}/".format(root_logdir, name)


# In[53]:

logdir = log_dir("mnist_dnn")


# Now let's create the `FileWriter` object we need to write the TensorBoard logs:

# In[54]:

file_writer = tf.summary.FileWriter(logdir, tf.get_default_graph())


# Wait! Shouldn't we implement early stopping? For that we need a validation set.

# In[55]:

# X_valid = mnist.validation.images
# y_valid = mnist.validation.labels


# In[56]:

m, n = X_train.shape


# In[57]:

n_epochs = 10001
batch_size = 50
n_batches = int(np.ceil(m / batch_size))

checkpoint_path = "/tmp/my_deep_mnist_model.ckpt"
checkpoint_epoch_path = checkpoint_path + ".epoch"
final_model_path = "./my_deep_mnist_model"

best_loss = np.inf
epochs_without_progress = 0
max_epochs_without_progress = 50

with tf.Session() as sess:
    if os.path.isfile(checkpoint_epoch_path):
        # if the checkpoint file exists, restore the model and load the epoch number
        with open(checkpoint_epoch_path, "rb") as f:
            start_epoch = int(f.read())
        print("Training was interrupted. Continuing at epoch {}".format(start_epoch))
        saver.restore(sess, checkpoint_path)
    else:
        start_epoch = 0
        sess.run(init)

    for epoch in range(start_epoch, n_epochs):
        for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size):
            sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
        accuracy_val, loss_val, accuracy_summary_str, loss_summary_str = sess.run(
            [accuracy, loss, accuracy_summary, loss_summary],
            feed_dict={X: X_valid, y: y_valid})
        file_writer.add_summary(accuracy_summary_str, epoch)
        file_writer.add_summary(loss_summary_str, epoch)
        if epoch % 5 == 0:
            print("Epoch:", epoch,
                  "\tValidation accuracy: {:.3f}%".format(accuracy_val * 100),
                  "\tLoss: {:.5f}".format(loss_val))
            saver.save(sess, checkpoint_path)
            with open(checkpoint_epoch_path, "wb") as f:
                f.write(b"%d" % (epoch + 1))
            if loss_val < best_loss:
                saver.save(sess, final_model_path)
                best_loss = loss_val
            else:
                # we only check every 5 epochs, so count 5 epochs per check without progress
                epochs_without_progress += 5
                if epochs_without_progress > max_epochs_without_progress:
                    print("Early stopping")
                    break


# In[58]:

os.remove(checkpoint_epoch_path)


# In[59]:

with tf.Session() as sess:
    saver.restore(sess, final_model_path)
    accuracy_val = accuracy.eval(feed_dict={X: X_test, y: y_test})


# In[60]:

accuracy_val
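# The loss and accuracy summaries were written to `logdir`, so we can close the
# `FileWriter` and inspect the learning curves in TensorBoard (a suggestion, not
# in the original notebook), pointing it at the root log directory:

# In[ ]:

file_writer.close()

# Then run the following from a terminal and open http://localhost:6006:
#
#     $ tensorboard --logdir tf_logs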