import sys
IN_COLAB = "google.colab" in sys.modules
if IN_COLAB:
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip install PyVirtualDisplay==3.0
!pip install gymnasium==0.28.1
from pyvirtualdisplay import Display
# Start virtual display
dis = Display(visible=0, size=(400, 400))
dis.start()
M. Fortunato et al., "Noisy Networks for Exploration," arXiv preprint arXiv:1706.10295, 2017.
NoisyNet is an exploration method that learns perturbations of the network weights to drive exploration. The key insight is that a single change to the weight vector can induce a consistent, and potentially very complex, state-dependent change in policy over multiple time steps.
First, consider a linear layer of a neural network with $p$ inputs and $q$ outputs, represented by
$$ y = wx + b, $$where $x \in \mathbb{R}^p$ is the layer input, $w \in \mathbb{R}^{q \times p}$ the weight matrix, and $b \in \mathbb{R}^q$ the bias.
The corresponding noisy linear layer is defined as:
$$ y = (\mu^w + \sigma^w \odot \epsilon^w) x + \mu^b + \sigma^b \odot \epsilon^b, $$where $\mu^w + \sigma^w \odot \epsilon^w$ and $\mu^b + \sigma^b \odot \epsilon^b$ replace $w$ and $b$ in the first linear layer equation. The parameters $\mu^w \in \mathbb{R}^{q \times p}$, $\mu^b \in \mathbb{R}^q$, $\sigma^w \in \mathbb{R}^{q \times p}$, and $\sigma^b \in \mathbb{R}^q$ are learnable, whereas $\epsilon^w \in \mathbb{R}^{q \times p}$ and $\epsilon^b \in \mathbb{R}^q$ are noise random variables, which can be generated in one of the following two ways:

1. Independent Gaussian noise: every entry of $\epsilon^w$ and $\epsilon^b$ is sampled independently from a unit Gaussian, so each layer needs $pq + q$ noise samples.
2. Factorised Gaussian noise: two unit-Gaussian vectors $\epsilon_i \in \mathbb{R}^p$ and $\epsilon_j \in \mathbb{R}^q$ are combined as $\epsilon^w_{i,j} = f(\epsilon_i) f(\epsilon_j)$ and $\epsilon^b_j = f(\epsilon_j)$, where $f(x) = \mathrm{sgn}(x)\sqrt{|x|}$, so each layer needs only $p + q$ noise samples.

In all experiments of the paper, the authors used factorised Gaussian noise, so we will use it here as well.
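To make the factorised scheme concrete, here is a minimal sketch (toy sizes $p = 4$, $q = 3$, chosen purely for illustration) mirroring the `scale_noise` and `reset_noise` logic implemented below:

import torch

def f(x: torch.Tensor) -> torch.Tensor:
    """f(x) = sgn(x) * sqrt(|x|), applied element-wise."""
    return x.sign() * x.abs().sqrt()

p, q = 4, 3  # toy layer sizes
epsilon_in = f(torch.randn(p))   # p unit-Gaussian samples
epsilon_out = f(torch.randn(q))  # q unit-Gaussian samples

weight_epsilon = torch.outer(epsilon_out, epsilon_in)  # shape (q, p)
bias_epsilon = epsilon_out                             # shape (q,)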
import math
import os
from typing import Dict, List, Tuple
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from IPython.display import clear_output
Please see 01.dqn.ipynb for a detailed description.
class ReplayBuffer:
"""A simple numpy replay buffer."""
def __init__(self, obs_dim: int, size: int, batch_size: int = 32):
self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
self.acts_buf = np.zeros([size], dtype=np.float32)
self.rews_buf = np.zeros([size], dtype=np.float32)
self.done_buf = np.zeros(size, dtype=np.float32)
self.max_size, self.batch_size = size, batch_size
        self.ptr, self.size = 0, 0
def store(
self,
obs: np.ndarray,
act: np.ndarray,
rew: float,
next_obs: np.ndarray,
done: bool,
):
self.obs_buf[self.ptr] = obs
self.next_obs_buf[self.ptr] = next_obs
self.acts_buf[self.ptr] = act
self.rews_buf[self.ptr] = rew
self.done_buf[self.ptr] = done
self.ptr = (self.ptr + 1) % self.max_size
self.size = min(self.size + 1, self.max_size)
def sample_batch(self) -> Dict[str, np.ndarray]:
idxs = np.random.choice(self.size, size=self.batch_size, replace=False)
return dict(obs=self.obs_buf[idxs],
next_obs=self.next_obs_buf[idxs],
acts=self.acts_buf[idxs],
rews=self.rews_buf[idxs],
done=self.done_buf[idxs])
def __len__(self) -> int:
return self.size
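As a quick sanity check (toy shapes and values, made up for illustration only), the buffer can be exercised in isolation like this:

# Toy sanity check of ReplayBuffer; shapes and values are made up.
buffer = ReplayBuffer(obs_dim=4, size=100, batch_size=2)

for i in range(3):
    obs = np.full(4, i, dtype=np.float32)
    buffer.store(obs, act=0, rew=1.0, next_obs=obs + 1, done=False)

print(len(buffer))         # 3
batch = buffer.sample_batch()
print(batch["obs"].shape)  # (2, 4)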
class NoisyLinear(nn.Module):
"""Noisy linear module for NoisyNet.
Attributes:
in_features (int): input size of linear module
out_features (int): output size of linear module
std_init (float): initial std value
weight_mu (nn.Parameter): mean value weight parameter
weight_sigma (nn.Parameter): std value weight parameter
bias_mu (nn.Parameter): mean value bias parameter
bias_sigma (nn.Parameter): std value bias parameter
"""
def __init__(self, in_features: int, out_features: int, std_init: float = 0.5):
"""Initialization."""
super(NoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.std_init = std_init
self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features))
self.weight_sigma = nn.Parameter(
torch.Tensor(out_features, in_features)
)
self.register_buffer(
"weight_epsilon", torch.Tensor(out_features, in_features)
)
self.bias_mu = nn.Parameter(torch.Tensor(out_features))
self.bias_sigma = nn.Parameter(torch.Tensor(out_features))
self.register_buffer("bias_epsilon", torch.Tensor(out_features))
self.reset_parameters()
self.reset_noise()
def reset_parameters(self):
"""Reset trainable network parameters (factorized gaussian noise)."""
mu_range = 1 / math.sqrt(self.in_features)
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(
self.std_init / math.sqrt(self.in_features)
)
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(
self.std_init / math.sqrt(self.out_features)
)
def reset_noise(self):
"""Make new noise."""
epsilon_in = self.scale_noise(self.in_features)
epsilon_out = self.scale_noise(self.out_features)
# outer product
self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in))
self.bias_epsilon.copy_(epsilon_out)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward method implementation.
        We don't branch on train / eval mode here,
        as doing so showed no notable difference in performance.
"""
return F.linear(
x,
self.weight_mu + self.weight_sigma * self.weight_epsilon,
self.bias_mu + self.bias_sigma * self.bias_epsilon,
)
@staticmethod
def scale_noise(size: int) -> torch.Tensor:
"""Set scale to make noise (factorized gaussian noise)."""
x = torch.randn(size)
return x.sign().mul(x.abs().sqrt())
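A quick sanity check (toy sizes, assumed for illustration): with the noise buffers fixed, the layer is deterministic; calling reset_noise re-samples the perturbation and changes the output.

# Toy check: the output is fixed until reset_noise() re-samples epsilon.
layer = NoisyLinear(in_features=4, out_features=2)
x = torch.ones(1, 4)

y1 = layer(x)
y2 = layer(x)        # same noise buffers -> same output
layer.reset_noise()  # re-sample factorised noise
y3 = layer(x)        # different perturbation -> different output

print(torch.allclose(y1, y2))  # True
print(torch.allclose(y1, y3))  # False (almost surely)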
We use NoisyLinear for the last two fully connected layers and add a method that resets the noise in all noisy layers. These are the only differences from the network in 01.dqn.ipynb.
class Network(nn.Module):
def __init__(self, in_dim: int, out_dim: int):
"""Initialization."""
super(Network, self).__init__()
self.feature = nn.Linear(in_dim, 128)
self.noisy_layer1 = NoisyLinear(128, 128)
self.noisy_layer2 = NoisyLinear(128, out_dim)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward method implementation."""
feature = F.relu(self.feature(x))
hidden = F.relu(self.noisy_layer1(feature))
out = self.noisy_layer2(hidden)
return out
def reset_noise(self):
"""Reset all noisy layers."""
self.noisy_layer1.reset_noise()
self.noisy_layer2.reset_noise()
Here is a summary of the DQNAgent class.
Method | Note |
---|---|
select_action | select an action from the input state. |
step | take an action and return the response of the env. |
compute_dqn_loss | return dqn loss. |
update_model | update the model by gradient descent. |
target_hard_update | hard update from the local model to the target model. |
train | train the agent during num_frames. |
test | test the agent (1 episode). |
plot | plot the training progresses. |
In the paper, NoisyNet is used as a component of the Dueling Network architecture, which also includes Double DQN and Prioritized Experience Replay. However, we don't implement those here, to keep the tutorial simple. One thing to note is that NoisyNet is an alternative to the $\epsilon$-greedy method, so all $\epsilon$-related lines are removed. Please check all comments marked NoisyNet.
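For contrast, the two action-selection strategies look like this (a schematic sketch, not part of the agent below):

# Epsilon-greedy (as in 01.dqn.ipynb): explicit random exploration.
#     if np.random.random() < epsilon:
#         action = env.action_space.sample()
#     else:
#         action = dqn(state).argmax()
#
# NoisyNet (this notebook): always act greedily; exploration comes
# from the learned weight perturbations themselves.
#     action = dqn(state).argmax()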
class DQNAgent:
"""DQN Agent interacting with environment.
    Attributes:
        env (gym.Env): Gymnasium environment
memory (ReplayBuffer): replay memory to store transitions
batch_size (int): batch size for sampling
target_update (int): period for target model's hard update
gamma (float): discount factor
dqn (Network): model to train and select actions
dqn_target (Network): target model to update
optimizer (torch.optim): optimizer for training dqn
transition (list): transition information including
state, action, reward, next_state, done
"""
def __init__(
self,
env: gym.Env,
memory_size: int,
batch_size: int,
target_update: int,
seed: int,
gamma: float = 0.99,
):
"""Initialization.
Args:
            env (gym.Env): Gymnasium environment
memory_size (int): length of memory
batch_size (int): batch size for sampling
target_update (int): period for target model's hard update
gamma (float): discount factor
"""
# NoisyNet: All attributes related to epsilon are removed
obs_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
self.env = env
self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)
self.batch_size = batch_size
self.target_update = target_update
self.seed = seed
self.gamma = gamma
# device: cpu / gpu
self.device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu"
)
print(self.device)
# networks: dqn, dqn_target
self.dqn = Network(obs_dim, action_dim).to(self.device)
self.dqn_target = Network(obs_dim, action_dim).to(self.device)
self.dqn_target.load_state_dict(self.dqn.state_dict())
self.dqn_target.eval()
# optimizer
self.optimizer = optim.Adam(self.dqn.parameters())
# transition to store in memory
self.transition = list()
# mode: train / test
self.is_test = False
def select_action(self, state: np.ndarray) -> np.ndarray:
"""Select an action from the input state."""
# NoisyNet: no epsilon greedy action selection
selected_action = self.dqn(
torch.FloatTensor(state).to(self.device)
).argmax()
selected_action = selected_action.detach().cpu().numpy()
if not self.is_test:
self.transition = [state, selected_action]
return selected_action
def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:
"""Take an action and return the response of the env."""
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
if not self.is_test:
self.transition += [reward, next_state, done]
self.memory.store(*self.transition)
return next_state, reward, done
def update_model(self) -> torch.Tensor:
"""Update the model by gradient descent."""
samples = self.memory.sample_batch()
loss = self._compute_dqn_loss(samples)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# NoisyNet: reset noise
self.dqn.reset_noise()
self.dqn_target.reset_noise()
return loss.item()
def train(self, num_frames: int, plotting_interval: int = 200):
"""Train the agent."""
self.is_test = False
state, _ = self.env.reset(seed=self.seed)
update_cnt = 0
losses = []
scores = []
score = 0
for frame_idx in range(1, num_frames + 1):
action = self.select_action(state)
next_state, reward, done = self.step(action)
state = next_state
score += reward
# NoisyNet: removed decrease of epsilon
# if episode ends
if done:
state, _ = self.env.reset(seed=self.seed)
scores.append(score)
score = 0
# if training is ready
if len(self.memory) >= self.batch_size:
loss = self.update_model()
losses.append(loss)
update_cnt += 1
# if hard update is needed
if update_cnt % self.target_update == 0:
self._target_hard_update()
# plotting
if frame_idx % plotting_interval == 0:
self._plot(frame_idx, scores, losses)
self.env.close()
def test(self, video_folder: str) -> None:
"""Test the agent."""
self.is_test = True
# for recording a video
naive_env = self.env
self.env = gym.wrappers.RecordVideo(self.env, video_folder=video_folder)
state, _ = self.env.reset(seed=self.seed)
done = False
score = 0
while not done:
action = self.select_action(state)
next_state, reward, done = self.step(action)
state = next_state
score += reward
print("score: ", score)
self.env.close()
# reset
self.env = naive_env
def _compute_dqn_loss(self, samples: Dict[str, np.ndarray]) -> torch.Tensor:
"""Return dqn loss."""
device = self.device # for shortening the following lines
state = torch.FloatTensor(samples["obs"]).to(device)
next_state = torch.FloatTensor(samples["next_obs"]).to(device)
action = torch.LongTensor(samples["acts"].reshape(-1, 1)).to(device)
reward = torch.FloatTensor(samples["rews"].reshape(-1, 1)).to(device)
done = torch.FloatTensor(samples["done"].reshape(-1, 1)).to(device)
# G_t = r + gamma * v(s_{t+1}) if state != Terminal
# = r otherwise
curr_q_value = self.dqn(state).gather(1, action)
next_q_value = self.dqn_target(next_state).max(
dim=1, keepdim=True
)[0].detach()
mask = 1 - done
target = (reward + self.gamma * next_q_value * mask).to(self.device)
# calculate dqn loss
loss = F.smooth_l1_loss(curr_q_value, target)
return loss
def _target_hard_update(self):
"""Hard update: target <- local."""
self.dqn_target.load_state_dict(self.dqn.state_dict())
def _plot(
self,
frame_idx: int,
scores: List[float],
losses: List[float],
):
"""Plot the training progresses."""
clear_output(True)
plt.figure(figsize=(20, 5))
plt.subplot(131)
plt.title('frame %s. score: %s' % (frame_idx, np.mean(scores[-10:])))
plt.plot(scores)
plt.subplot(132)
plt.title('loss')
plt.plot(losses)
plt.show()
You can find the code and configuration of CartPole-v1 in Farama Gymnasium's repository.
# environment
env = gym.make("CartPole-v1", max_episode_steps=200, render_mode="rgb_array")
seed = 777
def seed_torch(seed):
torch.manual_seed(seed)
if torch.backends.cudnn.enabled:
torch.cuda.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
np.random.seed(seed)
seed_torch(seed)
# parameters
num_frames = 10000
memory_size = 1000
batch_size = 256
target_update = 150
# train
agent = DQNAgent(env, memory_size, batch_size, target_update, seed)
cpu
agent.train(num_frames)
Run the trained agent (1 episode).
video_folder="videos/noisy_net"
agent.test(video_folder=video_folder)
Moviepy - Building video /Users/jinwoo.park/Repositories/rainbow-is-all-you-need/videos/noisy_net/rl-video-episode-0.mp4.
Moviepy - Writing video /Users/jinwoo.park/Repositories/rainbow-is-all-you-need/videos/noisy_net/rl-video-episode-0.mp4
Moviepy - Done !
Moviepy - video ready /Users/jinwoo.park/Repositories/rainbow-is-all-you-need/videos/noisy_net/rl-video-episode-0.mp4
score:  200.0
import base64
import glob
import io
import os
from IPython.display import HTML, display
def ipython_show_video(path: str) -> None:
"""Show a video at `path` within IPython Notebook."""
    if not os.path.isfile(path):
        raise FileNotFoundError("Cannot access: {}".format(path))
    video = io.open(path, "rb").read()
encoded = base64.b64encode(video)
display(HTML(
data="""
<video width="320" height="240" alt="test" controls>
<source src="data:video/mp4;base64,{0}" type="video/mp4"/>
</video>
""".format(encoded.decode("ascii"))
))
def show_latest_video(video_folder: str) -> str:
"""Show the most recently recorded video from video folder."""
list_of_files = glob.glob(os.path.join(video_folder, "*.mp4"))
latest_file = max(list_of_files, key=os.path.getctime)
ipython_show_video(latest_file)
return latest_file
latest_file = show_latest_video(video_folder=video_folder)
print("Played:", latest_file)
Played: videos/noisy_net/rl-video-episode-0.mp4