You can view this notebook in the Jupyter Notebook Viewer (nbviewer.jupyter.org) or run it in Google Colab (colab.research.google.com) via the links below.
View in Jupyter Notebook Viewer | Run in Google Colab
from IPython.display import Image
Image(url='https://git.io/JtTQo', width=700)
Image(url='https://git.io/JtTQi', width=700)
Image(url='https://git.io/JtTQP', width=700)
# About the discount factor
Image(url='https://git.io/Jtkcl', width=700)
Image(url='https://git.io/Jtkc4', width=700)
Image(url='https://git.io/JtkcB', width=800)
Image(url='https://git.io/Jtkc0', width=800)
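The figures above illustrate discounting future rewards with the discount factor γ. As a minimal sketch (not part of the book's scripts; the reward sequence and γ below are made-up example values), the discounted return G_t = Σ_k γ^k R_{t+k+1} can be computed directly:
# Illustrative sketch: computing a discounted return (example values only)
gamma = 0.9
rewards = [0.0, 0.0, 0.0, 1.0]  # rewards received after time step t
G_t = sum(gamma**k * r for k, r in enumerate(rewards))
print(G_t)  # 0.9**3 * 1.0 = 0.729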
# Script: gridworld_env.py
import numpy as np
from gym.envs.toy_text import discrete
from collections import defaultdict
import time
import pickle
import os
from gym.envs.classic_control import rendering
CELL_SIZE = 100
MARGIN = 10
def get_coords(row, col, loc='center'):
xc = (col + 1.5) * CELL_SIZE
yc = (row + 1.5) * CELL_SIZE
if loc == 'center':
return xc, yc
elif loc == 'interior_corners':
half_size = CELL_SIZE//2 - MARGIN
xl, xr = xc - half_size, xc + half_size
yt, yb = yc - half_size, yc + half_size
return [(xl, yt), (xr, yt), (xr, yb), (xl, yb)]
elif loc == 'interior_triangle':
x1, y1 = xc, yc + CELL_SIZE//3
x2, y2 = xc + CELL_SIZE//3, yc - CELL_SIZE//3
x3, y3 = xc - CELL_SIZE//3, yc - CELL_SIZE//3
return [(x1, y1), (x2, y2), (x3, y3)]
def draw_object(coords_list):
if len(coords_list) == 1: # -> circle
obj = rendering.make_circle(int(0.45*CELL_SIZE))
obj_transform = rendering.Transform()
obj.add_attr(obj_transform)
obj_transform.set_translation(*coords_list[0])
obj.set_color(0.2, 0.2, 0.2) # -> black
elif len(coords_list) == 3: # -> triangle
obj = rendering.FilledPolygon(coords_list)
obj.set_color(0.9, 0.6, 0.2) # -> yellow
elif len(coords_list) > 3: # -> polygon
obj = rendering.FilledPolygon(coords_list)
obj.set_color(0.4, 0.4, 0.8) # -> blue
return obj
class GridWorldEnv(discrete.DiscreteEnv):
def __init__(self, num_rows=4, num_cols=6, delay=0.05):
self.num_rows = num_rows
self.num_cols = num_cols
self.delay = delay
move_up = lambda row, col: (max(row - 1, 0), col)
move_down = lambda row, col: (min(row + 1, num_rows - 1), col)
move_left = lambda row, col: (row, max(col - 1, 0))
move_right = lambda row, col: (row, min(col + 1, num_cols - 1))
self.action_defs = {0: move_up, 1: move_right,
2: move_down, 3: move_left}
# Number of states and actions
nS = num_cols * num_rows
nA = len(self.action_defs)
self.grid2state_dict = {(s // num_cols, s % num_cols): s
for s in range(nS)}
self.state2grid_dict = {s: (s // num_cols, s % num_cols)
for s in range(nS)}
# Gold state
gold_cell = (num_rows // 2, num_cols - 2)
# Trap states
trap_cells = [((gold_cell[0] + 1), gold_cell[1]),
(gold_cell[0], gold_cell[1] - 1),
((gold_cell[0] - 1), gold_cell[1])]
gold_state = self.grid2state_dict[gold_cell]
trap_states = [self.grid2state_dict[(r, c)]
for (r, c) in trap_cells]
self.terminal_states = [gold_state] + trap_states
print(self.terminal_states)
# Build the transition probabilities
P = defaultdict(dict)
for s in range(nS):
row, col = self.state2grid_dict[s]
P[s] = defaultdict(list)
for a in range(nA):
action = self.action_defs[a]
next_s = self.grid2state_dict[action(row, col)]
# Terminal state
if self.is_terminal(next_s):
r = (1.0 if next_s == self.terminal_states[0]
else -1.0)
else:
r = 0.0
if self.is_terminal(s):
done = True
next_s = s
else:
done = False
P[s][a] = [(1.0, next_s, r, done)]
# Initial state distribution
isd = np.zeros(nS)
isd[0] = 1.0
super(GridWorldEnv, self).__init__(nS, nA, P, isd)
self.viewer = None
self._build_display(gold_cell, trap_cells)
def is_terminal(self, state):
return state in self.terminal_states
def _build_display(self, gold_cell, trap_cells):
screen_width = (self.num_cols + 2) * CELL_SIZE
screen_height = (self.num_rows + 2) * CELL_SIZE
self.viewer = rendering.Viewer(screen_width,
screen_height)
all_objects = []
# Border coordinates
bp_list = [
(CELL_SIZE - MARGIN, CELL_SIZE - MARGIN),
(screen_width - CELL_SIZE + MARGIN, CELL_SIZE - MARGIN),
(screen_width - CELL_SIZE + MARGIN,
screen_height - CELL_SIZE + MARGIN),
(CELL_SIZE - MARGIN, screen_height - CELL_SIZE + MARGIN)
]
border = rendering.PolyLine(bp_list, True)
border.set_linewidth(5)
all_objects.append(border)
# Vertical lines
for col in range(self.num_cols + 1):
x1, y1 = (col + 1) * CELL_SIZE, CELL_SIZE
x2, y2 = (col + 1) * CELL_SIZE, \
(self.num_rows + 1) * CELL_SIZE
line = rendering.PolyLine([(x1, y1), (x2, y2)], False)
all_objects.append(line)
# Horizontal lines
for row in range(self.num_rows + 1):
x1, y1 = CELL_SIZE, (row + 1) * CELL_SIZE
x2, y2 = (self.num_cols + 1) * CELL_SIZE, \
(row + 1) * CELL_SIZE
line = rendering.PolyLine([(x1, y1), (x2, y2)], False)
all_objects.append(line)
# Traps: --> circles
for cell in trap_cells:
trap_coords = get_coords(*cell, loc='center')
all_objects.append(draw_object([trap_coords]))
# Gold: --> triangle
gold_coords = get_coords(*gold_cell,
loc='interior_triangle')
all_objects.append(draw_object(gold_coords))
# Agent --> square or robot
if (os.path.exists('robot-coordinates.pkl') and CELL_SIZE == 100):
agent_coords = pickle.load(
open('robot-coordinates.pkl', 'rb'))
starting_coords = get_coords(0, 0, loc='center')
agent_coords += np.array(starting_coords)
else:
agent_coords = get_coords(0, 0, loc='interior_corners')
agent = draw_object(agent_coords)
self.agent_trans = rendering.Transform()
agent.add_attr(self.agent_trans)
all_objects.append(agent)
for obj in all_objects:
self.viewer.add_geom(obj)
def render(self, mode='human', done=False):
if done:
sleep_time = 1
else:
sleep_time = self.delay
x_coord = self.s % self.num_cols
y_coord = self.s // self.num_cols
x_coord = (x_coord + 0) * CELL_SIZE
y_coord = (y_coord + 0) * CELL_SIZE
self.agent_trans.set_translation(x_coord, y_coord)
rend = self.viewer.render(
return_rgb_array=(mode == 'rgb_array'))
time.sleep(sleep_time)
return rend
def close(self):
if self.viewer:
self.viewer.close()
self.viewer = None
if __name__ == '__main__':
env = GridWorldEnv(5, 6)
for i in range(1):
s = env.reset()
env.render(mode='human', done=False)
while True:
action = np.random.choice(env.nA)
res = env.step(action)
print('Action ', env.s, action, ' -> ', res)
env.render(mode='human', done=res[2])
if res[2]:
break
env.close()
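For reference, the transition table P built in __init__ uses the (probability, next_state, reward, done) format expected by gym's DiscreteEnv, with a single tuple per action because this grid world is deterministic. A quick inspection sketch (assuming the gridworld_env.py script above is importable):
# Sketch: inspecting the deterministic transition table
from gridworld_env import GridWorldEnv
env = GridWorldEnv(num_rows=5, num_cols=6)
print(env.P[0][1])  # action 1 (move right) from state 0 -> [(1.0, next_state, reward, done)]
env.close()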
Image(url='https://bit.ly/34MpM2p', width=600)
# Script: agent.py
from collections import defaultdict
import numpy as np
class Agent(object):
def __init__(
self, env,
learning_rate=0.01,
discount_factor=0.9,
epsilon_greedy=0.9,
epsilon_min=0.1,
epsilon_decay=0.95):
self.env = env
self.lr = learning_rate
self.gamma = discount_factor
self.epsilon = epsilon_greedy
self.epsilon_min = epsilon_min
self.epsilon_decay = epsilon_decay
# Define the q_table
self.q_table = defaultdict(lambda: np.zeros(self.env.nA))
def choose_action(self, state):
if np.random.uniform() < self.epsilon:
action = np.random.choice(self.env.nA)
else:
q_vals = self.q_table[state]
perm_actions = np.random.permutation(self.env.nA)
q_vals = [q_vals[a] for a in perm_actions]
perm_q_argmax = np.argmax(q_vals)
action = perm_actions[perm_q_argmax]
return action
def _learn(self, transition):
s, a, r, next_s, done = transition
q_val = self.q_table[s][a]
if done:
q_target = r
else:
q_target = r + self.gamma*np.max(self.q_table[next_s])
# Update the q_table
self.q_table[s][a] += self.lr * (q_target - q_val)
# Adjust epsilon
self._adjust_epsilon()
def _adjust_epsilon(self):
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
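To see the tabular update in _learn in action, here is a small sanity-check sketch (not from the book; the dummy environment is a hypothetical stand-in that only provides the nA attribute the Agent uses):
# Sketch: one Q-learning update on the defaultdict-based q_table
from types import SimpleNamespace
from agent import Agent
dummy_env = SimpleNamespace(nA=4)  # hypothetical stand-in for GridWorldEnv
agent = Agent(dummy_env, learning_rate=0.5, discount_factor=0.9)
agent._learn((0, 1, 1.0, 2, False))  # (state, action, reward, next_state, done)
print(agent.q_table[0][1])  # 0.5 * (1.0 + 0.9 * max Q(s') - 0.0) = 0.5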
# Script: qlearning.py
from gridworld_env import GridWorldEnv
from agent import Agent
from collections import namedtuple
import matplotlib.pyplot as plt
import numpy as np
np.random.seed(1)
Transition = namedtuple(
'Transition', ('state', 'action', 'reward', 'next_state', 'done'))
def run_qlearning(agent, env, num_episodes=50):
history = []
for episode in range(num_episodes):
state = env.reset()
env.render(mode='human')
final_reward, n_moves = 0.0, 0
while True:
action = agent.choose_action(state)
next_s, reward, done, _ = env.step(action)
agent._learn(Transition(state, action, reward,
next_s, done))
env.render(mode='human', done=done)
state = next_s
n_moves += 1
if done:
break
final_reward = reward
history.append((n_moves, final_reward))
print('Episode %d: Reward %.1f #Moves %d'
% (episode, final_reward, n_moves))
return history
def plot_learning_history(history):
fig = plt.figure(1, figsize=(14, 10))
ax = fig.add_subplot(2, 1, 1)
episodes = np.arange(len(history))
moves = np.array([h[0] for h in history])
plt.plot(episodes, moves, lw=4,
marker="o", markersize=10)
ax.tick_params(axis='both', which='major', labelsize=15)
plt.xlabel('Episodes', size=20)
plt.ylabel('# moves', size=20)
ax = fig.add_subplot(2, 1, 2)
rewards = np.array([h[1] for h in history])
plt.step(episodes, rewards, lw=4)
ax.tick_params(axis='both', which='major', labelsize=15)
plt.xlabel('Episodes', size=20)
plt.ylabel('Final rewards', size=20)
plt.savefig('q-learning-history.png', dpi=300)
plt.show()
if __name__ == '__main__':
env = GridWorldEnv(num_rows=5, num_cols=6)
agent = Agent(env)
history = run_qlearning(agent, env)
env.close()
plot_learning_history(history)
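After training, a greedy policy can be read directly off the learned q_table; a short sketch (assuming agent from the run above is still in scope; action indices follow the GridWorldEnv convention 0=up, 1=right, 2=down, 3=left):
# Sketch: deriving the greedy policy from the learned q_table
greedy_policy = {s: int(np.argmax(q)) for s, q in agent.q_table.items()}
print(greedy_policy)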
Image(url='https://bit.ly/2TBkxR3', width=800)
Image(url='https://bit.ly/3yUaFlv', width=800)
Image(url='https://bit.ly/34CELfz', width=800)
Image(url='https://bit.ly/34Fkwhb', width=800)
# Script: cartpole/main.py
import gym
import numpy as np
import tensorflow as tf
import random
import matplotlib.pyplot as plt
from collections import namedtuple
from collections import deque
np.random.seed(1)
tf.random.set_seed(1)
Transition = namedtuple(
'Transition', ('state', 'action', 'reward',
'next_state', 'done'))
class DQNAgent:
def __init__(
self, env, discount_factor=0.95,
epsilon_greedy=1.0, epsilon_min=0.01,
epsilon_decay=0.995, learning_rate=1e-3,
max_memory_size=2000):
self.env = env
self.state_size = env.observation_space.shape[0]
self.action_size = env.action_space.n
self.memory = deque(maxlen=max_memory_size)
self.gamma = discount_factor
self.epsilon = epsilon_greedy
self.epsilon_min = epsilon_min
self.epsilon_decay = epsilon_decay
self.lr = learning_rate
self._build_nn_model()
def _build_nn_model(self, n_layers=3):
self.model = tf.keras.Sequential()
# Hidden layers
for n in range(n_layers - 1):
self.model.add(tf.keras.layers.Dense(
units=32, activation='relu'))
self.model.add(tf.keras.layers.Dense(
units=32, activation='relu'))
# Last layer
self.model.add(tf.keras.layers.Dense(
units=self.action_size))
# Build & compile the model
self.model.build(input_shape=(None, self.state_size))
self.model.compile(
loss='mse',
optimizer=tf.keras.optimizers.Adam(lr=self.lr))
def remember(self, transition):
self.memory.append(transition)
def choose_action(self, state):
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size)
q_values = self.model.predict(state)[0]
return np.argmax(q_values) # return the action
def _learn(self, batch_samples):
batch_states, batch_targets = [], []
for transition in batch_samples:
s, a, r, next_s, done = transition
if done:
target = r
else:
target = (r +
self.gamma * np.amax(
self.model.predict(next_s)[0]
)
)
target_all = self.model.predict(s)[0]
target_all[a] = target
batch_states.append(s.flatten())
batch_targets.append(target_all)
self._adjust_epsilon()
return self.model.fit(x=np.array(batch_states),
y=np.array(batch_targets),
epochs=1,
verbose=0)
def _adjust_epsilon(self):
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def replay(self, batch_size):
samples = random.sample(self.memory, batch_size)
history = self._learn(samples)
return history.history['loss'][0]
def plot_learning_history(history):
fig = plt.figure(1, figsize=(14, 5))
ax = fig.add_subplot(1, 1, 1)
episodes = np.arange(len(history)) + 1
plt.plot(episodes, history, lw=4,
marker='o', markersize=10)
ax.tick_params(axis='both', which='major', labelsize=15)
plt.xlabel('Episodes', size=20)
plt.ylabel('# Total Rewards', size=20)
plt.show()
# General settings
EPISODES = 200
batch_size = 32
init_replay_memory_size = 500
if __name__ == '__main__':
env = gym.make('CartPole-v1')
agent = DQNAgent(env)
state = env.reset()
state = np.reshape(state, [1, agent.state_size])
# Fill the replay memory
for i in range(init_replay_memory_size):
action = agent.choose_action(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, agent.state_size])
agent.remember(Transition(state, action, reward,
next_state, done))
if done:
state = env.reset()
state = np.reshape(state, [1, agent.state_size])
else:
state = next_state
total_rewards, losses = [], []
for e in range(EPISODES):
state = env.reset()
if e % 10 == 0:
env.render()
state = np.reshape(state, [1, agent.state_size])
for i in range(500):
action = agent.choose_action(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state,
[1, agent.state_size])
agent.remember(Transition(state, action, reward,
next_state, done))
state = next_state
if e % 10 == 0:
env.render()
if done:
total_rewards.append(i)
print('Episode: %d/%d, Total reward: %d'
% (e, EPISODES, i))
break
loss = agent.replay(batch_size)
losses.append(loss)
plot_learning_history(total_rewards)
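As a possible follow-up (not part of the book's script), the trained agent can be watched acting greedily for one episode by disabling exploration; this assumes env and agent from the training run above are still available:
# Sketch: one greedy evaluation episode with the trained DQN agent
agent.epsilon = 0.0  # disable exploration
state = np.reshape(env.reset(), [1, agent.state_size])
total_reward = 0
while True:
    action = agent.choose_action(state)
    state, reward, done, _ = env.step(action)
    state = np.reshape(state, [1, agent.state_size])
    total_reward += reward
    if done:
        break
print('Greedy episode reward:', total_reward)
env.close()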
Image(url='https://bit.ly/2TDralR', width=800)