import gym
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm, trange
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.distributions import Categorical
%matplotlib inline
env = gym.make('CartPole-v1')
env.seed(1); torch.manual_seed(1);
A policy gradient attempts to train an agent without explicitly mapping the value for every state-action pair in the environment, instead taking small steps and updating the policy based on the reward associated with each step. The agent can receive a reward immediately for an action, or it can receive the reward at a later time, such as the end of the episode. We'll designate the policy function our agent is trying to learn as $\pi_\theta(a, s)$, where $\theta$ is the parameter vector, $s$ is a particular state, and $a$ is an action.
We'll apply a technique called Monte-Carlo Policy Gradient, which means we will have the agent run through an entire episode and then update our policy based on the rewards obtained.
We will use a simple feed-forward neural network with one hidden layer of 128 neurons and a dropout of 0.6. We'll use Adam as our optimizer with a learning rate of 0.01. Using dropout significantly improves the performance of our policy; I encourage you to compare results with and without dropout and to experiment with other hyperparameter values.
#Hyperparameters
learning_rate = 0.01
gamma = 0.99
class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.state_space = env.observation_space.shape[0]
        self.action_space = env.action_space.n

        self.l1 = nn.Linear(self.state_space, 128, bias=False)
        self.l2 = nn.Linear(128, self.action_space, bias=False)

        self.gamma = gamma

        # Episode policy and reward history
        self.policy_history = Variable(torch.Tensor())
        self.reward_episode = []
        # Overall reward and loss history
        self.reward_history = []
        self.loss_history = []

    def forward(self, x):
        model = torch.nn.Sequential(
            self.l1,
            nn.Dropout(p=0.6),
            nn.ReLU(),
            self.l2,
            nn.Softmax(dim=-1)
        )
        return model(x)
policy = Policy()
optimizer = optim.Adam(policy.parameters(), lr=learning_rate)
The select_action function chooses an action based on our policy probability distribution using the PyTorch distributions package. Our policy returns a probability for each possible action in our action space (move left or move right) as an array of length two such as [0.7, 0.3]. We then choose an action based on these probabilities, record our history, and return our action.
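To see what the distributions package is doing here, below is a small standalone illustration using a hard-coded probability vector (the [0.7, 0.3] example above, purely for demonstration); in the actual training loop these probabilities come from the policy network.
probs = torch.FloatTensor([0.7, 0.3])  # stand-in for the policy network's output
c = Categorical(probs)
action = c.sample()                    # returns 0 about 70% of the time, 1 about 30%
print(action, c.log_prob(action))      # log probability of the sampled action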
def select_action(state):
    #Select an action (0 or 1) by running policy model and choosing based on the probabilities in state
    state = torch.from_numpy(state).type(torch.FloatTensor)
    state = policy(Variable(state))
    c = Categorical(state)
    action = c.sample()

    # Add log probability of our chosen action to our history
    if policy.policy_history.dim() != 0:
        policy.policy_history = torch.cat([policy.policy_history, c.log_prob(action)])
    else:
        policy.policy_history = (c.log_prob(action))
    return action
We update our policy by taking a sample of the action value function $Q^{\pi_\theta}(s_t, a_t)$ by playing through episodes of the game. $Q^{\pi_\theta}(s_t, a_t)$ is defined as the expected return from taking action $a$ in state $s$ and then following policy $\pi$.
We know that for every step the simulation continues we receive a reward of 1. We can use this to calculate the policy gradient at each time step, where $r$ is the reward for a particular state-action pair. Rather than using the instantaneous reward $r$, we instead use a long-term reward $v_t$, where $v_t$ is the discounted sum of all future rewards for the length of the episode. In this way, the longer the episode runs into the future, the greater the reward for a particular state-action pair in the present. $v_t$ is then

$$v_t = \sum_{k=0}^{N} \gamma^k r_{t+k}$$

where $\gamma$ is the discount factor (0.99). For example, if an episode lasts 5 steps, the reward for each step will be [4.90, 3.94, 2.97, 1.99, 1]. Next we scale our reward vector by subtracting the mean from each element and scaling to unit variance by dividing by the standard deviation. This practice is common in machine learning and is the same operation as scikit-learn's StandardScaler. It also has the effect of compensating for future uncertainty.
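As a quick sanity check on the numbers above, this short standalone snippet (using the per-step reward of 1 and the gamma defined earlier, with a hypothetical 5-step episode) reproduces the discounting and the scaling that update_policy will perform:
reward_episode = [1, 1, 1, 1, 1]          # a hypothetical 5-step episode
R, discounted = 0, []
for r in reward_episode[::-1]:            # discount future rewards back to the present
    R = r + gamma * R
    discounted.insert(0, R)
print([round(v, 2) for v in discounted])  # [4.9, 3.94, 2.97, 1.99, 1.0]

scaled = torch.FloatTensor(discounted)
scaled = (scaled - scaled.mean()) / (scaled.std() + np.finfo(np.float32).eps)
print(scaled)                             # standardized: roughly zero mean, unit variance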
After each episode we apply Monte-Carlo Policy Gradient to improve our policy according to the equation:
$$\Delta\theta_t = \alpha \nabla_\theta \log \pi_\theta(s_t, a_t)\, v_t$$

We will then feed our policy history multiplied by our rewards to our optimizer and update the weights of our neural network using stochastic gradient ascent. This should increase the likelihood of actions that got our agent a larger reward.
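One subtlety: the optimizer performs gradient descent, so update_policy below negates the objective; minimizing $-\sum_t \log \pi_\theta(s_t, a_t)\, v_t$ is the same as ascending the expression above. The toy standalone example below (the parameters and the return value are made up purely for illustration) shows the effect:
logits = Variable(torch.zeros(2), requires_grad=True)  # toy policy parameters
probs = F.softmax(logits, dim=-1)                      # starts at [0.5, 0.5]
c = Categorical(probs)
action = c.sample()
v = 2.0                                                # pretend this action earned a positive return
loss = -c.log_prob(action) * v                         # negated objective for the optimizer
loss.backward()
print(logits.grad)  # gradient is negative for the sampled action, so a descent
                    # step increases that action's probability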
def update_policy():
    R = 0
    rewards = []

    # Discount future rewards back to the present using gamma
    for r in policy.reward_episode[::-1]:
        R = r + policy.gamma * R
        rewards.insert(0, R)

    # Scale rewards
    rewards = torch.FloatTensor(rewards)
    rewards = (rewards - rewards.mean()) / (rewards.std() + np.finfo(np.float32).eps)

    # Calculate loss
    loss = (torch.sum(torch.mul(policy.policy_history, Variable(rewards)).mul(-1), -1))

    # Update network weights
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Save and initialize episode history counters
    policy.loss_history.append(loss.data[0])
    policy.reward_history.append(np.sum(policy.reward_episode))
    policy.policy_history = Variable(torch.Tensor())
    policy.reward_episode = []
This is our main policy training loop. For each step in a training episode, we choose an action, take a step through the environment, and record the resulting new state and reward. We call update_policy() at the end of each episode to feed the episode history to our neural network and improve our policy.
def main(episodes):
    running_reward = 10
    for episode in range(episodes):
        state = env.reset()  # Reset environment and record the starting state
        done = False

        for time in range(1000):
            action = select_action(state)
            # Step through environment using chosen action
            state, reward, done, _ = env.step(action.data[0])

            # Save reward
            policy.reward_episode.append(reward)
            if done:
                break

        # Used to determine when the environment is solved.
        running_reward = (running_reward * 0.99) + (time * 0.01)

        update_policy()

        if episode % 50 == 0:
            print('Episode {}\tLast length: {:5d}\tAverage length: {:.2f}'.format(episode, time, running_reward))

        if running_reward > env.spec.reward_threshold:
            print("Solved! Running reward is now {} and the last episode runs to {} time steps!".format(running_reward, time))
            break
episodes = 1000
main(episodes)
Episode 0	Last length:     8	Average length: 9.98
Episode 50	Last length:    80	Average length: 18.82
Episode 100	Last length:   215	Average length: 47.54
Episode 150	Last length:   433	Average length: 145.24
Episode 200	Last length:   499	Average length: 233.92
Episode 250	Last length:   499	Average length: 332.90
Episode 300	Last length:   499	Average length: 383.54
Episode 350	Last length:   499	Average length: 412.94
Episode 400	Last length:   499	Average length: 446.52
Episode 450	Last length:   227	Average length: 462.03
Episode 500	Last length:   499	Average length: 453.68
Episode 550	Last length:   499	Average length: 468.94
Solved! Running reward is now 475.15748930299014 and the last episode runs to 499 time steps!
Our policy solves the environment prior to reaching 600 episodes.
window = int(episodes/20)
fig, ((ax1), (ax2)) = plt.subplots(2, 1, sharey=True, figsize=[9,9]);
rolling_mean = pd.Series(policy.reward_history).rolling(window).mean()
std = pd.Series(policy.reward_history).rolling(window).std()
ax1.plot(rolling_mean)
ax1.fill_between(range(len(policy.reward_history)),rolling_mean-std, rolling_mean+std, color='orange', alpha=0.2)
ax1.set_title('Episode Length Moving Average ({}-episode window)'.format(window))
ax1.set_xlabel('Episode'); ax1.set_ylabel('Episode Length')
ax2.plot(policy.reward_history)
ax2.set_title('Episode Length')
ax2.set_xlabel('Episode'); ax2.set_ylabel('Episode Length')
fig.tight_layout(pad=2)
plt.show()
#fig.savefig('results.png')