#!/usr/bin/env python # coding: utf-8 # ## 2章のkeras版 # Deep Reinforcement Learning in Actionの2章にでてくる「多椀バンディット問題(Multi-Arm Bandit)」をTorchではなく、Kerasを使って試してみました。 # # 多椀バンディット問題は、強化学習では有名な例題みたいです。 # - http://ibisml.org/archive/ibis2014/ibis2014_bandit.pdf # # 前提条件 # - 複数台のスロットマシンをプレイするギャンブラーのモデル # - 得られる報酬の確率分布は台によって異なる # - なるべく期待値の高い台をプレイしたい # # 問題点 # - ある程度の回数プレイしないと台の良し悪しが分からない # - 報酬の期待値の低い台を何度もプレイすると損 # # ここでは、最高$10まででる10台のスロットルマシーンが題材となっている。 # # - 報酬: $R_k$(k回目の報酬) # - 報酬の期待値: $Q_k(a)$(アクションaに対するk回目までの期待値) # # $$ # Q_k(a) = \frac{R_1 + R_2 + ... + R_k}{k_a} # $$ # # アクションaの最尤値は、以下のように定義される。 # # $$ # a = argmax_a (Q_k(a)) # $$ # ### 前準備 # 最初に必要なライブラリーをインポートしておきます。 # In[1]: import numpy as np from matplotlib import pyplot as plt import random from IPython.display import Image get_ipython().run_line_magic('matplotlib', 'inline') # keras用のパッケージをインポート from keras.models import Sequential from keras.layers import Dense from keras import optimizers # ## 環境のセット # 多椀バンディットのクラスとしてContextBanditを使用します(サポートページからの引用)。 # # In[2]: class ContextBandit: def __init__(self, arms=10): self.arms = arms self.init_distribution(arms) self.update_state() def init_distribution(self, arms): # Num states = Num Arms to keep things simple self.bandit_matrix = np.random.rand(arms,arms) #each row represents a state, each column an arm def reward(self, prob): reward = 0 for i in range(self.arms): if random.random() < prob: reward += 1 return reward def get_state(self): return self.state def update_state(self): self.state = np.random.randint(0,self.arms) def get_reward(self,arm): return self.reward(self.bandit_matrix[self.get_state()][arm]) def choose_arm(self, arm): reward = self.get_reward(arm) self.update_state() return reward # ## モデル作成 # ニューラルネットで使用するモデルは、入力10、隠れ層100、出力10のモデルです。 # - 入力:アクションを10個の配列で表現 # - 出力: 報酬の予測値 # # 処理の流れを図で表すために、Dot記述でグラフを描いてみました(dot記法の練習)。 # In[3]: get_ipython().run_cell_magic('writefile', 'models/ex_5a.dot', 'digraph ex_5 {\n graph [rankdir=LR, charset="UTF-8"];\n // グラフ名はcluster_で始める\n // node\n env_get_state [label="env.get_state()", shape="box", style="rounded,filled"];\n one_hot_encode [label="One-Hot\\nEncode", shape="box", style="rounded,filled"];\n \n subgraph cluster_1 {\n label = "Layer1";\n style= "rounded";\n \n theta1 [label="θ_1:10x100", shape="box", style="rounded,filled"];\n ReLU1 [label="ReLU", shape="box", style="rounded,filled"];\n \n theta1 -> ReLU1 [arrowhead=none];\n };\n\n env_get_state -> one_hot_encode [label="整数", fontname="ipafont-gothic", lblstyle="auto", arrowhead=none];\n one_hot_encode -> theta1 [label="実数[10]", fontname="ipafont-gothic", lblstyle="auto", arrowhead=none];\n\n sample_action [label="Sample Action", shape="box", style="rounded,filled"];\n softmax [label="Softmax", shape="box", style="rounded,filled"];\n\n subgraph cluster_2 {\n label = "Layer2";\n style= "rounded";\n \n ReLU2 [label="ReLU", shape="box", style="rounded,filled"];\n theta2 [label="θ_2:100x10", shape="box", style="rounded,filled"];\n \n ReLU2 -> theta2 [arrowhead=none]; \n };\n \n ReLU1 -> theta2 [arrowhead=none, topath="bend left"]; \n \n sample_action -> softmax [label="確率[10]", fontname="ipafont-gothic", lblstyle="auto", arrowhead=none];\n softmax -> ReLU2 [label="実数[10]", fontname="ipafont-gothic", lblstyle="auto", arrowhead=none];\n \n // node\n env_choose_arm [label="env.choose_arm()", shape="box", style="rounded,filled"];\n loss_function [label="Loss Function", shape="box", style="rounded,filled"];\n Optimizer [shape="box", style="rounded,filled"];\n \n env_choose_arm -> loss_function [label="報酬:整数", fontname="ipafont-gothic", lblstyle="auto", arrowhead=none];\n loss_function -> Optimizer [arrowhead=none];\n \n sample_action -> env_choose_arm [label="整数", fontname="ipafont-gothic", lblstyle="auto", arrowhead=none];\n\n newrank=true;\n {rank=same; ReLU1, theta2}\n {rank=same; env_get_state, sample_action, env_choose_arm}\n {rank=same; one_hot_encode, softmax, loss_function}\n}\n') # In[4]: get_ipython().run_cell_magic('bash', '', 'dot -Tpng -Gdpi=200 models/ex_5a.dot> images/ex_5a.png\n') # In[5]: Image("images/ex_5a.png") # 処理の流れは、以下のようになります。 # - env.get_stateで環境から状態を取得し、One-Hot形式に変換 # - ニューラルネットワークの1層の入力とし、隠れ層100で活性関数としてReLU # - ニューラルネットワークの2層目は出力で、10のスロットルの予測値を得る # - Softmaxで確率分布に変換 # - 確率からアクションをサンプリングする # - 環境からスロットルを実行し、報酬を得る # - ニューラルネットワークの更新 # In[7]: def softmax(av, tau=1.12): n = len(av) probs = np.zeros(n) for i in range(n): softm = ( np.exp(av[i] / tau) / np.sum( np.exp(av[:] / tau) ) ) probs[i] = softm return probs def one_hot(N, pos, val=1): one_hot_vec = np.zeros(N) one_hot_vec[pos] = val return one_hot_vec arms = 10 # N is batch size; D_in is input dimension; # H is hidden dimension; D_out is output dimension. N, D_in, H, D_out = 1, arms, 100, arms model = Sequential() model.add(Dense(H, activation='relu', input_dim=D_in)) model.add(Dense(D_out, activation='relu')) learning_rate = 1e-2 optimizer = optimizers.Adam(lr=learning_rate) model.compile(optimizer=optimizer, loss='mean_squared_error', metrics=['accuracy']) # In[8]: env = ContextBandit(arms) # In[9]: def train(env): epochs = 5000 #one-hot encode current state cur_state = one_hot(arms,env.get_state()).reshape(1, D_in) reward_hist = np.zeros(50) reward_hist[:] = 5 runningMean = np.average(reward_hist) plt.xlabel("Plays") plt.ylabel("Mean Reward") for i in range(epochs): cur_state = one_hot(arms,env.get_state())[np.newaxis,:] y_pred = model.predict(cur_state) #produce reward predictions av_softmax = softmax(y_pred.flatten(), tau=2.0) #turn reward distribution into probability distribution av_softmax /= av_softmax.sum() #make sure total prob adds to 1 choice = np.random.choice(arms, p=av_softmax) #sample an action cur_reward = env.choose_arm(choice) one_hot_reward = y_pred.copy()[0] one_hot_reward[choice] = cur_reward one_hot_reward = one_hot_reward[np.newaxis,:] if i % 50 == 0: runningMean = np.average(reward_hist) reward_hist[:] = 0 plt.scatter(i, runningMean) reward_hist[i % 50] = cur_reward model.fit(cur_state, one_hot_reward, epochs=1, batch_size=1, verbose=0) cur_state = one_hot(arms,env.get_state())[np.newaxis,:] # In[10]: train(env) # In[ ]: