4章のKeras版

Deep Reinforcement Learning in Actionの4章をTorchではなく、Kerasを使って試してみました。

Policy Gradient Methods(方策勾配法)

4章で使用されている更新方法は、Sutton et al., 1999がベースとなっているみたい。

式の導出は、以下のサイトに紹介されています。

3章との違いは、行動価値関数Q(s,a)をactionを行う確率とし、モデルの最後の活性化関数にsoftmaxを使用していること。

倒立台車のように連続した空間の場合、方策ベースの手法が適しているとのことです。

方策勾配の更新式

$s_t$をstate、$a_t$をaction、$p(s_{t+1} | s_t, a_t)$を状態変化を表す確率分布、$r(s_t, a_t)$を即時報酬、$\pi_{\theta}(a_t|t_t)$を方策(policy)に従って行動した場合のstateとactionの列を$\tau$とします。 $$ \tau = (s_1, a_1, ... , s_T, a_T, S_{T+1}) $$ $$ p_{\theta}(\tau) = p(s_1) \prod^T_{t=1}p(s_{t|s_t)+1} | s_t, a_t) \pi_{\theta}(a_t, s_t) $$

累積報酬の期待値$J(\theta)$を最大化するような$\pi_{\theta}$のパラメータ$\theta$を見つけます。

$$ \begin{eqnarray} R(\tau) & \equiv & \sum^T_{t=1} r(s_t, a_t) \\ J(\theta) & \equiv & \mathbb{E}_{\pi_{\theta}}[R(\tau)] \\ & = & \sum_{\tau} R(\tau)p_{\theta}(\tau) \end{eqnarray} $$

とすると以下の方策勾配定理が成り立ちます。 $$ \begin{eqnarray} \nabla J(\theta) & = & \mathbb{E} \left [ \frac{\partial \pi_{\theta}(a|s)}{\partial \theta} \frac{1}{\pi_{\theta}(a|s)} Q^{\pi_{\theta}}(s, a) \right ] \\ & = & \mathbb{E} \left [ \nabla_{\theta} log \, \pi_{\theta} (a|s) \, Q^{\pi_{\theta}}(s, a) \right ] \end{eqnarray} $$

ここで、式の展開に以下の公式を使うのポイントみたいです。

$$ f \frac{d \, log(f)}{dx} = f \frac{df}{dx} \frac{1}{f} = \frac{df}{dx} $$

行動価値関数$Q(s, a)$は、以下のように定義されます。

$$ Q(s, a) = \sum_{s'} P(s'\,|\,s,a)[r(s,a, s') + \gamma V(s')] $$$$ V(s) = \sum_a \pi(a|s) Q(s,a) $$

準備

倒立台車のシミュレーターは、OpenAIのgymからCartPole-v0を使用します。

OpenAI(可視化にopengl, xvfbとJSAnimationを使用)のインストールは、以下のように行いました。

$ apt-get install python-opengl xvfb
$ pip install JSAnimation
$ pip install gym

最初に必要なパッケージをインポートします。

In [1]:
import numpy as np
import random
from IPython.display import Image
from IPython.display import clear_output
from matplotlib import pyplot as plt
%matplotlib inline

# keras用のパッケージをインポート
import keras.layers as layers
from keras.models import Model
from keras.optimizers import Adam
import keras.backend as K

# OpenAI用パッケージをインポート
import gym
Using TensorFlow backend.
In [2]:
import seaborn as sns
import pandas as pd

def showResult(result, title='', x_axis='', y_axis='', showOriginal=False):
    def running_mean(x, N=50):
        cumsum = np.cumsum(np.insert(x, 0, 0)) 
        return (cumsum[N:] - cumsum[:-N]) / float(N)
    
    if showOriginal:
        d = pd.DataFrame(result)
    else:
        d = pd.DataFrame(running_mean(result))
    # loss,をプロット
    sns.set()
    ax = d.plot(title=title)
    ax.set_xlabel(x_axis)
    ax.set_ylabel(y_axis)
    plt.show()

kerasのモデル

kerasで独自の損失関数で計算する例は、あまり見つかりません。

唯一以下のサイトに入力パラメータに状態と割引報酬を渡し、累積報酬の期待値 J(θ) の勾配を求める例がありました。

この方法では、割引報酬と状態から計算されたアクションの確率を使って、custom_loss関数で勾配を計算しています。元の式は、累積報酬最大化なので、先頭にマイナスを付けて損失関数が最小になるようにしています。

In [3]:
D_in, H, D_out = 4, 150, 2
learning_rate = 0.0009

# 例題がleakyReLUを使っているので、合わせる。
from keras.layers.advanced_activations import LeakyReLU
from keras.initializers import glorot_uniform

# 以下のURLを参照
# https://github.com/breeko/Simple-Reinforcement-Learning-with-Tensorflow/blob/master/Part%202%20-%20Policy-based%20Agents%20with%20Keras.ipynb
# 中間層の情報を使って損失関数を定義し、評価用モデルと学習用モデルを使うところもすごい!
def createModel(learning_rate=learning_rate):
    inp = layers.Input(shape=(D_in,), name="input_x")
    adv = layers.Input(shape=(1,), name="advantages")
    x = layers.Dense(H, 
                     activation=LeakyReLU(), 
                     use_bias=False,
                     kernel_initializer=glorot_uniform(seed=42),
                     name="dense_1")(inp)
    out = layers.Dense(D_out, 
                       activation="softmax", 
                       use_bias=False,
                       kernel_initializer=glorot_uniform(seed=42),
                       name="out")(x)
    
    def custom_loss(y_true, y_pred):
        # pred is output from neural network, a is action index
        # r is return (sum of rewards to end of episode), d is discount factor
        return -K.sum(adv * K.log(y_pred)) # element-wise multipliy, then sum

    model_train = Model(inputs=[inp, adv], outputs=out)
    model_train.compile(loss=custom_loss, optimizer=Adam(lr=learning_rate))
    model_predict = Model(inputs=[inp], outputs=out)
    return model_train, model_predict

シミュレーション環境

処理分かりやすくするために、4章の例題にシミュレーション環境SimulationEnvクラスを用意しました。

割引報酬の計算

割引報酬の計算の計算は少しトリッキーです。

以下のように指数関数的に割り引かれるようにします。

In [4]:
r = np.array([2, 2, 2])
lenr = len(r)
gamma = 0.99
d_r = np.power(gamma, np.arange(lenr)) * r
print(d_r)
[2.     1.98   1.9602]
In [5]:
# 倒立台車のシミュレーション環境
class SimulationEnv(object):
    # コンストラクター
    def __init__(self, model_train, model_predict, game, gamma):
        self.model_train = model_train
        self.model_predict = model_predict
        self.game = game
        self.gamma = gamma
    
    # 現在の倒立台車の状態を返す
    def reset(self):
        self.transitions = []
        return game.reset()
    
    # アクションを返す
    def action(self, state):
        act_prob = model_predict.predict(state.reshape(1, D_in))[0]
        # サンプリング
        action = np.random.choice(np.array([0,1]), p=act_prob)
        return action
    
    # 割引報酬(減衰率(gamma)を考慮した)を返す
    def discount_rewards(self, rewards):
        lenr = len(rewards)
        d_rewards = np.power(self.gamma,np.arange(lenr)) * rewards 
        d_rewards = (d_rewards - d_rewards.mean()) / (d_rewards.std() + 1e-07)
        return d_rewards
    
    # 次のステップに進む(状態、アクション、報酬を記録)
    def step(self, state):
        action = self.action(state)
        new_state, reward, done, info = game.step(action)
        self.transitions.append((state, action, reward))
        return (new_state, reward, done)
    
    # エピソードでの状態の遷移(transitions)を元にmodel_trainを更新
    def update(self):
        ep_len = len(self.transitions) # episode length
        preds = np.zeros(ep_len)
        rewards = np.zeros(ep_len)
        states = []
        # model_predictを使ってエピソードの各ステップのaction確率の予測値を計算
        for i in range(ep_len): #for each step in episode
            state, action, reward = self.transitions[i]
            pred = model_predict.predict(state.reshape(1, D_in))[0]
            preds[i] = pred[action]
            rewards[i] = reward
            states.append(state)
        #  ディスカウント報酬とaction確率の予測値でmodel_trainを更新
        discounted_rewards =  self.discount_rewards(rewards)
        states = np.array(states).reshape(ep_len, D_in)
        loss = model_train.train_on_batch([states, discounted_rewards] , preds)
        return loss
    
    # クローズ処理
    def close(self):
        game.close()
        
In [6]:
model_train, model_predict = createModel()
game = gym.make('CartPole-v0')
gamma = 0.99
env = SimulationEnv(model_train, model_predict, game, gamma)

MAX_DUR = 200
MAX_EPISODES = 1000

# 更新状況可視化のため、ゲームの継続時間と損失値を保持
durations = []
losses = []

# 最大エピソード回実行
for episode in range(MAX_EPISODES):
    state = env.reset()
    done = False
    for t in range(MAX_DUR): #エピソードの間繰り返す
        state, reward, done = env.step(state)
        if done:
            durations.append(t)
            break
    loss = env.update()
    losses.append(loss)
env.close()
/usr/lib/sagemath/local/lib/python2.7/site-packages/keras/activations.py:211: UserWarning: Do not pass a layer instance (such as LeakyReLU) as the activation argument of another layer. Instead, advanced activation layers should be used just like any other layer in a model.
  identifier=identifier.__class__.__name__))
In [7]:
showResult(durations, title="Duration", x_axis="episode", y_axis="times")
In [8]:
showResult(losses, title="Losse", x_axis="episode", y_axis="loss")

結果の保存

うまく収束するようになったので、モデルパラメータを保存します。

In [9]:
# ここで学習したモデルパラメータを保存
model_train.save_weights('models/param_train.hdf5')
model_predict.save_weights('models/param_predict.hdf5')

アニメーション

うまく学習できているようなので、倒立台車がどのように動いているのか、アニメーションで確かめてみましょう。

保存したモデルパラメータの読み込みとセットは、以下のように行います。

In [20]:
# 前回学習したモデルパラメータを使ってアニメーションを作成する
model_train, model_predict = createModel()
model_train.load_weights('models/param_train.hdf5')
model_predict.load_weights('models/param_predict.hdf5')
/usr/lib/sagemath/local/lib/python2.7/site-packages/keras/activations.py:211: UserWarning: Do not pass a layer instance (such as LeakyReLU) as the activation argument of another layer. Instead, advanced activation layers should be used just like any other layer in a model.
  identifier=identifier.__class__.__name__))

jupyterノートブックでレンダリング結果をアニメーション

倒立台車のレンダリングフレームをアニメーションとして表示するスクリプトは、以下のサイトを参考にさせていただきました。

In [21]:
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
    """
    Displays a list of frames as a gif, with controls
    """
    #plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi = 72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames = len(frames), interval=50)
    # gifを保存する場合
    # anim.save("images/CartPole-v0.gif", writer = 'imagemagick')
    display(display_animation(anim, default_mode='loop'))

倒立台車のアニメーションフレームの作成

準備ができたので、学習したモデルでCartPole-v0を動かしてみます。

In [26]:
MAX_DUR = 800
game = gym.make('CartPole-v0')
state = game.reset()
frames = []
for t in range(MAX_DUR):
    # Render into buffer. 
    frames.append(game.render(mode = 'rgb_array'))
    act_prob = model_predict.predict(state.reshape(1, D_in))[0]
    action = np.random.choice(np.array([0,1]), p=act_prob)
    print(action)
    state, reward, done, info = game.step(action)
    if done:
        break
game.render(close=True)
game.close()    
1
0
0
1
0
1
0
1
0
1
0
1
0
1
0
1
1
0
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
1
0
0
1
0
1
0
1
1
0
0
1
1
0
1
0
0
1
0
1
1
0
0
1
1
0
1
0
1
0
1
0
1
0
1
0
1
0
0
1
0
1
1
0
1
0
1
0
1
0
1
0
1
0
0
1
0
1
0
1
1
0
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
1
0
0
1
0
1
1
0
1
0
0
1
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
0
1
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-26-00d16bdfe647> in <module>()
     12     if done:
     13         break
---> 14 game.render(close=True)
     15 game.close()

/usr/lib/sagemath/local/lib/python2.7/site-packages/gym/core.pyc in render(self, mode, **kwargs)
    273 
    274     def render(self, mode='human', **kwargs):
--> 275         return self.env.render(mode, **kwargs)
    276 
    277     def close(self):

TypeError: render() got an unexpected keyword argument 'close'

レンダリングフレームの表示

display_frames_as_gifを使って作成されたレンダリングフレームを表示します。

In [28]:
display_frames_as_gif(frames)


Once Loop Reflect

モデルの制約

avtivate関数をsigmoid, reluと変えてみると、reluはLeakyReLUと変わらない結果となりましたが、 kernel_initializerを指定しないとうまく継続できないという結果になりました。

以下にLeakyReLUをreluを変えた結果を示します。

In [66]:
 D_in, H, D_out = 4, 150, 2
learning_rate = 0.0009

# 例題がleakyReLUを使っているので、合わせる。
from keras.layers.advanced_activations import LeakyReLU
from keras.initializers import glorot_uniform

# 以下のURLを参照
# https://github.com/breeko/Simple-Reinforcement-Learning-with-Tensorflow/blob/master/Part%202%20-%20Policy-based%20Agents%20with%20Keras.ipynb
# 中間層の情報を使って損失関数を定義し、評価用モデルと学習用モデルを使うところもすごい!
def createModelB(learning_rate=learning_rate):
    inp = layers.Input(shape=(D_in,), name="input_x")
    adv = layers.Input(shape=(1,), name="advantages")
    x = layers.Dense(H, 
                     activation="relu", 
                     use_bias=False,
                     kernel_initializer=glorot_uniform(seed=42),
                     name="dense_1")(inp)
    out = layers.Dense(D_out, 
                       activation="softmax", 
                       kernel_initializer=glorot_uniform(seed=42),
                       name="out")(x)
    
    def custom_loss(y_true, y_pred):
        # pred is output from neural network, a is action index
        # r is return (sum of rewards to end of episode), d is discount factor
        return -K.sum(adv * K.log(y_pred)) # element-wise multipliy, then sum

    model_train = Model(inputs=[inp, adv], outputs=out)
    model_train.compile(loss=custom_loss, optimizer=Adam(lr=learning_rate))
    model_predict = Model(inputs=[inp], outputs=out)
    return model_train, model_predict
In [67]:
model_train, model_predict = createModelB()
game = gym.make('CartPole-v0')
gamma = 0.99
env = SimulationEnv(model_train, model_predict, game, gamma)

MAX_DUR = 200
MAX_EPISODES = 1000

# 更新状況可視化のため、ゲームの継続時間と損失値を保持
durations = []
losses = []

# 最大エピソード回実行
for episode in range(MAX_EPISODES):
    state = env.reset()
    done = False
    for t in range(MAX_DUR): #エピソードの間繰り返す
        state, reward, done = env.step(state)
        if done:
            durations.append(t)
            break
    loss = env.update()
    losses.append(loss)
env.close()
In [68]:
showResult(durations, title="Duration", x_axis="episode", y_axis="times")
In [69]:
showResult(losses, title="Losse", x_axis="episode", y_axis="loss")
In [ ]: