#!/usr/bin/env python
# coding: utf-8
# ## Keras version of Chapter 3
#
#
# ### Preliminaries
# In[1]:
import numpy as np
import random
from IPython.display import Image
from IPython.display import clear_output
from matplotlib import pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')
# Import the Keras packages
from keras.models import Sequential
from keras.layers import Dense
from keras import optimizers
# ### The environment, quoted from the book
# The game used in Chapter 3 places the following on a 4x4 grid:
# - A: agent
# - W: wall
# - -: pit
# - +: goal
#
# The agent has to reach the goal by the shortest path while avoiding the wall and the pit.
#
# The game screen is reproduced from Figure 3.1 of Chapter 3.
#
#
#
# ### The game code, quoted from the book's repository
# GridBoard.py and Gridworld.py were taken from the GitHub site for Deep Reinforcement Learning in Action and placed in the script directory.
# In[2]:
from script.Gridworld import *
# ## Reinforcement learning procedure
# The value function is defined as follows.
#
# $$
# V_{\pi}(s) = \sum_{i=1}^{t} w_i R_i = w_1 R_1 + w_2 R_2 + \dots + w_t R_t
# $$
#
#
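# The weights $w_i$ are not fixed by this definition; a common choice (an assumption here, not stated above) is $w_i = \gamma^{i-1}$ for a discount factor $\gamma$. A minimal sketch of this weighted sum:
# In[ ]:
# Minimal sketch (not from the book): weighted sum of rewards with assumed weights gamma**(i-1)
def weighted_return(rewards, gamma=0.9):
    # V = sum_i w_i * R_i with w_i = gamma**(i-1) for i = 1..t
    return sum((gamma**i) * r for i, r in enumerate(rewards))

# Example: three steps costing -1 each, then the +10 goal reward
print(weighted_return([-1, -1, -1, 10]))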
# Similarly, the action-value function $Q_{\pi}(s, a)$ is updated as follows.
#
# $$
# Q_{\pi}(S_t, A_t) \leftarrow Q_{\pi}(S_t, A_t) + \alpha \left[ R_{t+1} + \gamma \, \max_{a} Q(S_{t+1}, a) - Q(S_t, A_t) \right]
# $$
#
# Here the symbols are defined as follows (a tabular sketch of this update follows the list).
# - $Q_{\pi}(S_t, A_t)$ on the left-hand side: the updated Q value
# - $Q_{\pi}(S_t, A_t)$ on the right-hand side: the current Q value
# - $\alpha$: step size (learning rate)
# - $\gamma$: discount factor
# - $\max_{a} Q(S_{t+1}, a)$: the largest Q value over all actions in the next state
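#
# As a reference, the same update written for a tabular Q function (a minimal sketch for illustration only; the grid and action sizes are assumptions, and this chapter approximates Q with a neural network instead):
# In[ ]:
# Minimal tabular sketch of the Q-learning update above (illustration only)
import numpy as np

n_states, n_actions = 16, 4            # assumed sizes for a 4x4 grid
Q = np.zeros((n_states, n_actions))    # tabular Q(s, a)
alpha, gamma = 0.1, 0.9                # step size and discount factor

def q_update(s, a, reward, s_next):
    # Q(s,a) <- Q(s,a) + alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))
    td_target = reward + gamma * np.max(Q[s_next])
    Q[s, a] += alpha * (td_target - Q[s, a])

q_update(s=0, a=1, reward=-1, s_next=4)  # example transition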
# ### Updating the Q-network
# The Q-network is updated following the flow below.
# In[3]:
get_ipython().run_cell_magic('bash', '', 'dot -Tpng -Gdpi=200 models/fig_3_3.dot> images/fig_3_3.png\n')
# In[4]:
Image("images/fig_3_3.png")
# ### How the game moves
#
# In[5]:
game = Gridworld(size=4, mode='static')
game.display()
# In[6]:
game.makeMove('d')
game.makeMove('d')
game.makeMove('l')
game.display()
# In[7]:
game.board.render_np()
# render_np returns one 4x4 array per object (Player, Goal, Pit, Wall), with the object's position marked by 1.
#
# An illustration of this layout is reproduced from Figure 3.6.
#
#
#
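# Before defining the model, here is a minimal sketch (assuming the 4x4 board above) of how these planes are flattened into the 64-dimensional input vector used below.
# In[ ]:
# Minimal sketch: render_np() yields one 4x4 plane per object; the network input
# is this tensor flattened into a single row of 64 values.
state_planes = game.board.render_np()        # shape (4, 4, 4): one plane per object
state_vector = state_planes.reshape(1, 64)   # 1 x 64 row fed to the Q-network
print(state_planes.shape, state_vector.shape)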
# ### Neural network model
#
# In[8]:
D_in, H1, H2, D_out = 64, 150, 100, 4
learning_rate = 1e-3
def createModel(learning_rate=learning_rate):
    # Fully connected network: 64 inputs -> 150 -> 100 -> 4 Q values
    model = Sequential()
    model.add(Dense(H1, activation='relu', input_dim=D_in))
    model.add(Dense(H2, activation='relu'))
    model.add(Dense(D_out))
    optimizer = optimizers.Adam(lr=learning_rate)
    model.compile(optimizer=optimizer, loss='mean_squared_error', metrics=['mae'])
    return model

# Mapping from network output index to Gridworld move
action_set = {
    0: 'u',
    1: 'd',
    2: 'l',
    3: 'r',
}
# In[9]:
# Prepare an environment that simulates the game
class SimulationEnv(object):
    def __init__(self, model, game, epsilon, depsilon=0.05, gamma=0.9):
        self.model = model
        self.game = game
        self.epsilon = epsilon
        self.depsilon = depsilon
        self.gamma = gamma
        # A small amount of noise is added to the state, presumably to keep the
        # input from being extremely sparse
        self.noise_rate = 1.0/10.0
        # Dimension of the output layer
        self.D_out = int(self.model.weights[-1].shape[0])

    # Return the current game state as a 1x64 vector
    def state(self):
        state = self.game.board.render_np().reshape(1,64) + np.random.rand(1,64)*self.noise_rate
        return state

    # Choose an action and return (action, Qvalue)
    def action(self, state):
        # Q values predicted by the network
        Qvalue = self.model.predict(state)[0]
        if random.random() < self.epsilon:
            # Explore: random action with probability epsilon
            action = np.random.randint(0, self.D_out)
        else:
            # Exploit: action with the largest Q value
            action = np.argmax(Qvalue)
        return (action, Qvalue)

    # Advance one step and return a (newState, reward, done) tuple
    def step(self, action):
        # Move the player in the game
        self.game.makeMove(action_set[action])
        reward = self.game.reward()
        done = reward != -1
        newState = self.state()
        return (newState, reward, done)

    # Update the neural network
    def update(self, state, action, newState, reward, Qprob, done):
        newQprob = self.model.predict(newState)[0]
        maxQprob = np.max(newQprob)
        y = np.zeros((1, 4))
        y[:] = Qprob[:]
        if not done:
            update = (reward + (self.gamma * maxQprob))
        else:
            update = reward
        y[0][action] = update
        #ret = self.model.fit(state.reshape(1, 64), y, epochs=1, batch_size=1, verbose=0)
        ret = self.model.train_on_batch(state.reshape(1, 64), y)
        return ret
# In[31]:
# Function for evaluating a trained model
def testModel(model, mode='static', display=True):
    def displayWithMsg(env, msg):
        print(msg)
        print(game.display())
    epsilon = 0.05
    max_moves = 50
    win = False
    i = 0
    game = Gridworld(mode=mode)
    env = SimulationEnv(model, game, epsilon)
    if display:
        displayWithMsg(env, 'initial State:')
    status = 1
    state = env.state()
    # while the game is still in progress
    while(status == 1):
        action, Qprob = env.action(state)
        if display:
            displayWithMsg(env, 'Move #: %s; Taking action: %s' % (i, action_set[action]))
        newState, reward, done = env.step(action)
        state = newState
        if reward == 10:
            status = 0
            win = True
            if display: print("You won! Reward: {}".format(reward,))
        elif reward == -10:
            status = 0
            if display: print("Game lost; stepped into the pit. Penalty: {}".format(reward,))
        i += 1
        if i > max_moves:
            print("Game lost; too many moves.")
            break
    return win
# ## Static mode
# In[29]:
epsilon = 1.0
epochs = 1000
losses = []
modelS = createModel()
for i in range(epochs):
    # A new static game is created for every epoch
    game = Gridworld(mode='static')
    env = SimulationEnv(modelS, game, epsilon)
    done = False
    state = env.state()
    while not done:
        action, Qprob = env.action(state)
        newState, reward, done = env.step(action)
        loss = env.update(state, action, newState, reward, Qprob, done)
        state = newState
        losses.append(loss)
    # Decay epsilon from 1.0 down to 0.1 over the course of training
    if epsilon > 0.1:
        epsilon -= (1.0/epochs)
# In[30]:
import seaborn as sns
import pandas as pd
d = pd.DataFrame(losses)
# Plot the loss
sns.set()
d.plot()
plt.show()
# In[31]:
testModel(modelS, 'static')
# ## Random mode
# In[33]:
epsilon = 1.0
epochs = 1000
losses = []
modelR = createModel()
for i in range(epochs):
    # A new random game layout is created for every epoch
    game = Gridworld(mode='random')
    env = SimulationEnv(modelR, game, epsilon)
    done = False
    state = env.state()
    while not done:
        action, Qprob = env.action(state)
        newState, reward, done = env.step(action)
        loss = env.update(state, action, newState, reward, Qprob, done)
        state = newState
        losses.append(loss)
    # Decay epsilon from 1.0 down to 0.1 over the course of training
    if epsilon > 0.1:
        epsilon -= (1.0/epochs)
# In[34]:
d = pd.DataFrame(losses)
# Plot the loss
sns.set()
d.plot()
plt.show()
# ## Experience replay version
# ### Introducing a target network
# In[11]:
get_ipython().run_cell_magic('writefile', 'models/fig3_10.dot', 'digraph fig3_10 {\n // Set the overall style\n graph [rankdir=LR, charset="UTF-8"];\n node [shape="box", style="rounded,filled"];\n edge [fontname="ipafont-gothic", lblstyle="auto"];\n\n Initial_Game_State_St [label="Initial game state S_t"];\n Q_network [label="Q-network"];\n Q_values [label="Q values"];\n Action_a [label="Action a"];\n Game_state_St1 [label="Game state S_t+1"];\n Q_hat_network [label="Target Q-network"];\n Q_hat_values [label="Q^ values"];\n\n Initial_Game_State_St -> Q_network [label="input"];\n Q_network -> Q_values [label="predict"];\n Q_values -> Action_a [label="used to select action a"];\n Action_a -> Game_state_St1 [label="generate new state"];\n Game_state_St1 -> Q_network [label="input"];\n Game_state_St1 -> Q_hat_network;\n Q_hat_network -> Q_hat_values [label="predict"];\n Q_hat_values -> Q_network [label="used for training"];\n Q_hat_network -> Q_network [label="periodically copy parameters"];\n }\n')
# In[12]:
get_ipython().run_cell_magic('bash', '', 'dot -Tpng -Gdpi=200 models/fig3_10.dot> images/fig3_10.png\n')
# In[13]:
Image("images/fig3_10.png")
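# The last edge in the figure, copying the Q-network's parameters into the target network, can be expressed in Keras with get_weights/set_weights. A minimal sketch (the targetUpdate method of TargetMemoryEnv below does the same thing):
# In[ ]:
# Minimal sketch: copy the Q-network weights into the target network
def sync_target(model, target_model):
    target_model.set_weights(model.get_weights())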
# In[67]:
from collections import deque

class TargetMemoryEnv(SimulationEnv):
    def __init__(self, model, targetModel, game, epsilon, gamma=0.9,
                 memorySize=1200, batchSize=250, targetUpdateCount=50):
        super(TargetMemoryEnv, self).__init__(model, game, epsilon, gamma=gamma)
        self.memorySize = memorySize
        self.batchSize = batchSize
        self.targetUpdateCount = targetUpdateCount
        self.memory = deque(maxlen=self.memorySize)
        # Introducing this target model is what gave DeepMind a dramatic
        # improvement in convergence
        self.targetModel = targetModel
        self.replayCount = 0

    # Store one experience tuple in the replay memory
    def remember(self, state, action, reward, newState, done):
        self.memory.append([state, action, reward, newState, done])

    # Train on a random minibatch drawn from the replay memory
    def replay(self, losses):
        if len(self.memory) < self.memorySize:
            return
        self.replayCount += 1
        samples = random.sample(self.memory, self.batchSize)
        states = []
        targets = []
        for sample in samples:
            state, action, reward, newState, done = sample
            target = self.targetModel.predict(state)
            if done:
                target[0][action] = reward
            else:
                newQprob = np.max(self.targetModel.predict(newState))
                target[0][action] = reward + self.gamma*newQprob
            states.append(state)
            targets.append(target)
        # Convert the lists to numpy arrays of shape (batchSize, 64) and (batchSize, 4)
        states = np.array(states).reshape(self.batchSize, 64)
        targets = np.array(targets).reshape(self.batchSize, 4)
        ret = self.model.train_on_batch(states, targets)
        losses.append(ret[0])
        # i is the epoch counter of the surrounding training loop (a global)
        print('\r%d, %f, %f' % (i, ret[0], ret[1]))
        clear_output(wait=True)

    # Copy the Q-network weights into the target network every
    # targetUpdateCount replay steps
    def targetUpdate(self):
        if self.replayCount > 0 and (self.replayCount % self.targetUpdateCount) == 0:
            print("Updated")
            weights = self.model.get_weights()
            targetWeights = self.targetModel.get_weights()
            for i in range(len(targetWeights)):
                targetWeights[i] = weights[i]
            self.targetModel.set_weights(targetWeights)
# In[77]:
epsilon = 1.0
epochs = 5000
losses = []
changeCount = 50
max_moves = 100
# Create the models and the environment
modelT = createModel()
targetModel = createModel()
game = Gridworld(mode='random')
env = TargetMemoryEnv(modelT, targetModel, game, epsilon)
# Run for the given number of epochs
for i in range(epochs):
    env.game = Gridworld(mode='random')
    env.epsilon = epsilon
    done = False
    mov = 0
    state = env.state()
    while not done:
        action, Qprob = env.action(state)
        newState, reward, done = env.step(action)
        mov += 1
        env.remember(state, action, reward, newState, done)
        if done or mov > max_moves:
            done = True
        env.replay(losses)
        env.targetUpdate()
        state = newState
    if epsilon > 0.1:
        epsilon -= (1.0/epochs)
# In[78]:
import seaborn as sns
import pandas as pd
d = pd.DataFrame(losses)
# Plot the loss
sns.set()
d.plot()
plt.show()
# In[82]:
testModel(modelT, 'random')
# In[83]:
max_games = 1000
wins = 0
for i in range(max_games):
    win = testModel(modelT, 'random', display=False)
    clear_output(wait=True)
    if win:
        wins += 1
win_perc = float(wins) / float(max_games)
print("Games played: {0}, # of wins: {1}".format(max_games,wins))
print("Win percentage: {}".format(win_perc))
# In[ ]: