2016/09/10 機械学習 名古屋 (Machine Learning Nagoya), 6th Study Session
The following environment is assumed.
※ TensorFlow 0.6 (and earlier) is not supported.
Installation details are omitted here.
If the installation succeeded, you can start using it from the Python interactive shell (or ipython, Jupyter, etc.) as follows:
import tensorflow as tf
※ TensorBoard is not used this time.
※ Ideal for building and experimenting with a first RNN sample.
Download and extract http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz.
(Only data/ptb.train.txt, data/ptb.valid.txt, and data/ptb.test.txt from the archive are used.)
※ An archive containing only the required files is also available → https://github.com/antimon2/MLN_201609/raw/tf_rnn_dev/simple-examples.tgz
※ No automatic download/extraction is provided this time; please download and extract the data manually. m(_ _)m
def _read_words(filename):
    with tf.gfile.GFile(filename, "r") as f:
        return f.read().replace("\n", "<eos>").split()
※ <eos> stands for "End of Sentence": each newline is treated as a sentence boundary and replaced with this marker.
def _build_vocab(filename):
    import collections
    data = _read_words(filename)
    counter = collections.Counter(data)
    count_pairs = sorted(counter.items(), key=lambda x: (-x[1], x[0]))
    words, _ = list(zip(*count_pairs))
    word_to_id = dict(zip(words, range(len(words))))
    return word_to_id
※ Summary: builds a dictionary that assigns an ID to every word in the training data, in order of descending frequency.
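A quick way to see what _build_vocab produces (a sketch; exact IDs depend on the corpus, but more frequent words get smaller IDs):
# word_to_id = _build_vocab("simple-examples/data/ptb.train.txt")
# len(word_to_id)       # -> 10000 (the PTB vocabulary size; cf. vocab_size in the config below)
# word_to_id["the"]     # -> a small ID, since "the" is among the most frequent words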
def _file_to_word_ids(filename, word_to_id):
    data = _read_words(filename)
    return [word_to_id[word] for word in data]
※ Summary: splits a text file into words and converts it into a list of word IDs.
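_file_to_word_ids simply maps every token through that dictionary (the IDs below are placeholders):
# ids = _file_to_word_ids("simple-examples/data/ptb.valid.txt", word_to_id)
# ids[:3]               # e.g. [142, 78, 54] -- one integer ID per word token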
def ptb_raw_data(data_path=None):
    """Load PTB raw data from data directory "data_path"."""
    import os
    train_path = os.path.join(data_path, "ptb.train.txt")
    valid_path = os.path.join(data_path, "ptb.valid.txt")
    test_path = os.path.join(data_path, "ptb.test.txt")
    word_to_id = _build_vocab(train_path)
    train_data = _file_to_word_ids(train_path, word_to_id)
    valid_data = _file_to_word_ids(valid_path, word_to_id)
    test_data = _file_to_word_ids(test_path, word_to_id)
    vocabulary = len(word_to_id)
    return train_data, valid_data, test_data, vocabulary
※ Summary: loads the training, validation, and test data (usage appears in the training section below).
def ptb_iterator(raw_data, batch_size, num_steps):
    """Iterate on the raw PTB data."""
    import numpy as np
    raw_data = np.array(raw_data, dtype=np.int32)
    data_len = len(raw_data)
    batch_len = data_len // batch_size
    data = np.zeros([batch_size, batch_len], dtype=np.int32)
    for i in range(batch_size):
        data[i] = raw_data[batch_len * i:batch_len * (i + 1)]
    epoch_size = (batch_len - 1) // num_steps
    if epoch_size == 0:
        raise ValueError("epoch_size == 0, decrease batch_size or num_steps")
    for i in range(epoch_size):
        x = data[:, i*num_steps:(i+1)*num_steps]
        y = data[:, i*num_steps+1:(i+1)*num_steps+1]
        yield (x, y)
※ Summary: returns an iterator (generator) that yields the given data as minibatches of shape batch_size × num_steps.
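A tiny check on toy data (just the numbers 0..19, not PTB IDs) makes the (x, y) relation easy to see:
# x, y = next(ptb_iterator(list(range(20)), batch_size=2, num_steps=3))
# x -> [[ 0  1  2]       y -> [[ 1  2  3]
#       [10 11 12]]            [11 12 13]]
# y is x shifted by one position: the model is trained to predict the next word.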
class SampleTinyConfig(object):
    """Tiny config."""
    init_scale = 0.1
    learning_rate = 1.0
    max_grad_norm = 5
    num_layers = 2
    num_steps = 10
    hidden_size = 50
    max_epoch = 1
    max_max_epoch = 3
    keep_prob = 1.0
    lr_decay = 0.5
    batch_size = 20
    vocab_size = 10000
def ptbmodel_init_step1(config):
    """Prepare Placeholders."""
    _input_data = tf.placeholder(tf.int32, [config.batch_size, config.num_steps])
    _targets = tf.placeholder(tf.int32, [config.batch_size, config.num_steps])
    return (_input_data, _targets)
def ptbmodel_init_step2(config, is_training):
    """Construct LSTM cell(s)."""
    batch_size = config.batch_size
    size = config.hidden_size
    vocab_size = config.vocab_size
    lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(size, forget_bias=0.0)
    if is_training and config.keep_prob < 1:
        lstm_cell = tf.nn.rnn_cell.DropoutWrapper(
            lstm_cell, output_keep_prob=config.keep_prob)
    cell = tf.nn.rnn_cell.MultiRNNCell([lstm_cell] * config.num_layers)
    _initial_state = cell.zero_state(batch_size, tf.float32)
    return (cell, _initial_state)
※ Uses stacked LSTMs (tf.nn.rnn_cell.MultiRNNCell): the output of the first layer becomes the input of the second layer.
※ For simplicity, forget_bias=0.0 is used here (no bias is added to the forget gate; the library default is 1.0).
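Rough sizes with SampleTinyConfig (a sketch, assuming the non-tuple LSTM state that this TensorFlow version uses by default):
# cell, _initial_state = ptbmodel_init_step2(SampleTinyConfig(), is_training=False)
# cell.output_size    # -> 50 (hidden_size; the output of the top layer)
# _initial_state      # shape [batch_size, 2 * hidden_size * num_layers] = [20, 200]
#                     # (the c and h vectors of both layers, concatenated)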
def ptbmodel_init_step3(config, _input_data, is_training):
    """Convert Input Data."""
    size = config.hidden_size
    vocab_size = config.vocab_size
    with tf.device("/cpu:0"):
        embedding = tf.get_variable("embedding", [vocab_size, size])
        inputs = tf.nn.embedding_lookup(embedding, _input_data)
    if is_training and config.keep_prob < 1:
        inputs = tf.nn.dropout(inputs, config.keep_prob)
    return inputs
※ Uses a vector representation of words (word embeddings; see the material from the previous session).
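Shapes involved, with SampleTinyConfig values (embedding_lookup replaces each word ID with the corresponding row of the embedding matrix):
# _input_data : int32,   [batch_size, num_steps]               = [20, 10]
# embedding   : float32, [vocab_size, hidden_size]             = [10000, 50]
# inputs      : float32, [batch_size, num_steps, hidden_size]  = [20, 10, 50]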
def ptbmodel_init_step4(config, cell, inputs, _initial_state):
    """Calculate Logits and Final State."""
    batch_size = config.batch_size
    size = config.hidden_size
    vocab_size = config.vocab_size
    outputs = []
    state = _initial_state
    with tf.variable_scope("RNN"):
        for time_step in range(config.num_steps):
            if time_step > 0: tf.get_variable_scope().reuse_variables()
            (cell_output, state) = cell(inputs[:, time_step, :], state)
            outputs.append(cell_output)
    output = tf.reshape(tf.concat(1, outputs), [-1, size])
    softmax_w = tf.get_variable("softmax_w", [size, vocab_size])
    softmax_b = tf.get_variable("softmax_b", [vocab_size])
    logits = tf.matmul(output, softmax_w) + softmax_b
    probs = tf.nn.softmax(logits)
    return (logits, probs, state)
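For reference, the shapes of the intermediate values in ptbmodel_init_step4 under SampleTinyConfig:
# outputs : list of num_steps (=10) tensors, each [batch_size, hidden_size] = [20, 50]
# output  : [batch_size * num_steps, hidden_size]                           = [200, 50]
# logits  : [batch_size * num_steps, vocab_size]                            = [200, 10000]
# probs   : same shape as logits; each row is a probability distribution over the vocabulary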
def ptbmodel_init_step5(config, logits, _targets):
    """Calculate Cost."""
    loss = tf.nn.seq2seq.sequence_loss_by_example(
        [logits],
        [tf.reshape(_targets, [-1])],
        [tf.ones([config.batch_size * config.num_steps])])
    cost = tf.reduce_sum(loss) / config.batch_size
    return cost
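sequence_loss_by_example returns one cross-entropy value per position; the perplexity reported later in run_epoch is derived from this cost:
# loss : [batch_size * num_steps] cross-entropy values (all weights are 1.0 here)
# cost : sum(loss) / batch_size  = total cross-entropy accumulated over num_steps
# perplexity = exp(accumulated costs / accumulated num_steps)
#   e.g. a Test Perplexity of ~155 means the model is, on average, about as uncertain
#   as a uniform choice among ~155 words.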
def ptbmodel_init_step6(config, cost):
    """Define Training Operation."""
    _lr = tf.Variable(0.0, trainable=False)
    tvars = tf.trainable_variables()
    grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars),
                                      config.max_grad_norm)
    optimizer = tf.train.GradientDescentOptimizer(_lr)
    _train_op = optimizer.apply_gradients(zip(grads, tvars))
    return (_lr, _train_op)
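The training op is plain SGD with gradient clipping; roughly, tf.clip_by_global_norm works like this:
# global_norm = sqrt(sum of squared norms of all gradients)
# if global_norm > max_grad_norm (5): scale every gradient by max_grad_norm / global_norm
# This keeps one large update from destabilizing the RNN (the "exploding gradient" problem).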
class PTBModel(object):
    """The PTB model."""

    def __init__(self, is_training, config):
        self.batch_size = batch_size = config.batch_size
        self.num_steps = num_steps = config.num_steps
        size = config.hidden_size
        vocab_size = config.vocab_size
        _input_data, _targets = ptbmodel_init_step1(config)
        self.input_data = _input_data
        self.targets = _targets
        cell, _initial_state = ptbmodel_init_step2(config, is_training)
        self.cell = cell
        self.initial_state = _initial_state
        inputs = ptbmodel_init_step3(config, _input_data, is_training)
        logits, probs, _final_state = ptbmodel_init_step4(
            config, cell, inputs, _initial_state)
        self.logits = logits
        self.probs = probs
        self.final_state = _final_state
        cost = ptbmodel_init_step5(config, logits, _targets)
        self.cost = cost
        if is_training:
            _lr, _train_op = ptbmodel_init_step6(config, cost)
            self.lr = _lr
            self.train_op = _train_op

    def assign_lr(self, session, lr_value):
        session.run(tf.assign(self.lr, lr_value))
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import os
import numpy as np
import tensorflow as tf
def run_epoch(session, model, data, eval_op, verbose=False):
    """Runs the model on the given data."""
    epoch_size = ((len(data) // model.batch_size) - 1) // model.num_steps
    start_time = time.time()
    costs = 0.0
    iters = 0
    # state = model.initial_state.eval()
    state = session.run(model.initial_state)
    for step, (x, y) in enumerate(ptb_iterator(data, model.batch_size,
                                               model.num_steps)):
        cost, state, _ = session.run([model.cost, model.final_state, eval_op],
                                     {model.input_data: x,
                                      model.targets: y,
                                      model.initial_state: state})
        costs += cost
        iters += model.num_steps
        if verbose and step % (epoch_size // 10) == 10:
            print("%.3f perplexity: %.3f speed: %.0f wps" %
                  (step * 1.0 / epoch_size, np.exp(costs / iters),
                   iters * model.batch_size / (time.time() - start_time)))
    return np.exp(costs / iters)
data_path = 'simple-examples/data/'
train_data, valid_data, test_data, _ = ptb_raw_data(data_path)
config = SampleTinyConfig()
eval_config = SampleTinyConfig()
eval_config.batch_size = 1
eval_config.num_steps = 1
# with tf.Graph().as_default():
initializer = tf.random_uniform_initializer(-config.init_scale,
                                            config.init_scale)
with tf.variable_scope("model", reuse=None, initializer=initializer):
    m = PTBModel(is_training=True, config=config)
with tf.variable_scope("model", reuse=True, initializer=initializer):
    mvalid = PTBModel(is_training=False, config=config)
    mtest = PTBModel(is_training=False, config=eval_config)
init = tf.initialize_all_variables()
※ With TensorFlow v0.9 or later a WARNING is printed, but it works correctly.
sess = tf.Session()
sess.run(init)
for i in range(config.max_max_epoch):
    lr_decay = config.lr_decay ** max(i - config.max_epoch, 0.0)
    m.assign_lr(sess, config.learning_rate * lr_decay)

    print("Epoch: %d Learning rate: %.3f" % (i + 1, sess.run(m.lr)))
    train_perplexity = run_epoch(sess, m, train_data, m.train_op,
                                 verbose=True)
    print("Epoch: %d Train Perplexity: %.3f" % (i + 1, train_perplexity))
    valid_perplexity = run_epoch(sess, mvalid, valid_data, tf.no_op())
    print("Epoch: %d Valid Perplexity: %.3f" % (i + 1, valid_perplexity))

test_perplexity = run_epoch(sess, mtest, test_data, tf.no_op())
print("Test Perplexity: %.3f" % test_perplexity)
※ Training takes about 30 minutes.
※ The final Test Perplexity should come out to around 155.
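As a concrete check of the learning-rate schedule in the loop above (max_epoch=1, max_max_epoch=3, lr_decay=0.5, so the decay factor is lr_decay ** max(i - 1, 0)):
# Epoch 1 (i=0): lr = 1.0 * 0.5**0 = 1.000
# Epoch 2 (i=1): lr = 1.0 * 0.5**0 = 1.000
# Epoch 3 (i=2): lr = 1.0 * 0.5**1 = 0.500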
def sample(model, sess, words, vocab, num=200, prime=['the'], sampling_type=1):
    # model is expected to be the batch_size=1, num_steps=1 model (mtest above).
    state = sess.run(model.cell.zero_state(1, tf.float32))
    # Feed the priming words (except the last one) to warm up the RNN state.
    for word in prime[:-1]:
        x = np.array([[vocab[word]]])
        feed = {model.input_data: x, model.initial_state: state}
        state = sess.run(model.final_state, feed)

    def weighted_pick(weights):
        # Random pick proportional to the given probabilities.
        t = np.cumsum(weights)
        s = np.sum(weights)
        return int(np.searchsorted(t, np.random.rand(1) * s))

    ret = list(prime)
    word = prime[-1]
    for n in range(num):
        x = np.array([[vocab[word]]])
        feed = {model.input_data: x, model.initial_state: state}
        probs, state = sess.run([model.probs, model.final_state], feed)
        p = probs[0]
        if sampling_type == 0:
            # Greedy: always take the most probable next word.
            sample = np.argmax(p)
        else:
            # Default: sample the next word according to the predicted distribution.
            sample = weighted_pick(p)
        pred = words[sample]
        if pred == '<eos>':
            break
        ret.append(pred)
        word = pred
    return ret
word_to_id = _build_vocab(os.path.join(data_path, "ptb.train.txt"))
words = sorted(word_to_id.keys(), key=lambda x: word_to_id[x])
res = sample(mtest, sess, words, word_to_id, prime=['the', 'american'])
' '.join(res)
res = sample(mtest, sess, words, word_to_id, prime=np.random.choice(words, 3, replace=False))
' '.join(res)
Parameters worth experimenting with:
- num_steps, max_epoch, max_max_epoch, etc.
- hidden_size
- init_scale, learning_rate, max_grad_norm, etc.
- In ptbmodel_init_step2(), pass a suitable value (> 0.0) for the forget_bias= argument of tf.nn.rnn_cell.BasicLSTMCell().