import tensorflow as tf, numpy as np, gym

# Here is a list of envs: https://gym.openai.com/envs/
env = gym.make('CartPole-v1')

# Reset the environment and return the initial state:
env.reset()

# We can also get the shape of the state space directly:
env.observation_space

# If we were local, we could visualize the environment.
# To get an idea, see https://gym.openai.com/envs/CartPole-v1/
# env.render()

# Get available actions
env.action_space

# Sample a random action
env.action_space.sample()

# To perform an action in the environment we do:
obs, reward, done, info = env.step(env.action_space.sample())
print('next state: {}'.format(obs))
print('reward: {}'.format(reward))
print('episode over: {}'.format(done))
print('extra info: {}'.format(info))


class RandomAgent():

    def __init__(self, env):
        self.env = env

    def train(self):
        """ This agent doesn't need to be trained! """
        pass

    def act(self, state):
        """ This executes the agent's policy """
        return self.env.action_space.sample()

    def run_episode(self, max_steps=100, verbose=False):
        """ Runs a test episode """
        state = self.env.reset()
        total_reward = 0
        if verbose:
            print("Step 0; State: {}".format(state))
        for step in range(max_steps):
            next_state, reward, done, _ = self.env.step(self.act(state))
            if verbose:
                print("Step {}; State: {}; Reward: {}".format(step, state, reward))
            state = next_state
            total_reward += reward
            if done:
                print("Done! Final state: {}; Total reward: {}".format(state, total_reward))
                break
            elif step == max_steps - 1:
                print("Did not finish in time! Final state: {}; Total reward: {}".format(state, total_reward))


agent = RandomAgent(env)
agent.act([0, 0, 0, 0])
agent.run_episode(max_steps=20, verbose=True)


class RandomEnv(gym.Env):

    def __init__(self, num_states=5, num_actions=1, reward_range=(-1, 1), seed=None):
        # We generate random num_states x num_states transition matrices,
        # one for each action,
        # so self.transitions[action][state] returns a vector of next_state probs,
        # i.e., self.transitions[a][s][s'] = T(s, a, s')
        self.seed(seed)
        self.transitions = []
        for action in range(num_actions):
            t = np.random.random((num_states, num_states)) + 1e-2
            t = t / np.sum(t, axis=1, keepdims=True)
            self.transitions.append(t)
        # We'll make rewards a function of states and actions,
        # so self.rewards[state][action] is our reward fn
        self.rewards = np.random.random((num_states, num_actions))
        self.action_space = gym.spaces.Discrete(num_actions)
        self.observation_space = gym.spaces.Discrete(num_states)
        self.reward_range = reward_range
        self._state = 0

    def seed(self, seed):
        np.random.seed(seed=seed)

    def render(self):
        pass

    def close(self):
        pass

    def reset(self):
        self._state = 0
        return 0

    def step(self, action):
        reward = self.rewards[self._state, action]
        next_state_probs = self.transitions[action][self._state]
        next_state = np.random.choice(range(self.observation_space.n), p=next_state_probs)
        self._state = next_state
        # This environment never terminates; gym expects info to be a dict, so return {}.
        return next_state, reward, False, {}


E = RandomEnv(seed=1)
E.transitions[0][0]
E.rewards[3][0]

# Closed-form policy evaluation for the single action: v = (I - gamma * T)^{-1} r
value_vector = np.linalg.inv((np.eye(E.observation_space.n) - 0.99 * E.transitions[0])).dot(E.rewards[:, 0])
value_vector
E.rewards[:, 0]

E = RandomEnv(num_states=5, num_actions=2, seed=1)
E.transitions


def value_iteration(env, iters=1000, gamma=0.99):
    v = np.random.random((env.observation_space.n,))
    for i in range(iters):
        for s in range(env.observation_space.n):
            rs = env.rewards[s]
            next_state_values = []
            for a in range(env.action_space.n):
                next_state_values.append(env.transitions[a][s].dot(v))
            next_state_targets = rs + gamma * np.array(next_state_values)
            v[s] = np.max(next_state_targets)
    return v
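# Optional sanity check (not in the original walkthrough; `E_single`, `v_closed`, and
# `v_iter` are illustrative names introduced here): with a single action, value iteration
# is just repeated policy evaluation, so it should approach the closed-form solution
# v = (I - gamma * T)^{-1} r computed with np.linalg.inv above.
E_single = RandomEnv(num_states=5, num_actions=1, seed=1)
v_closed = np.linalg.inv(np.eye(E_single.observation_space.n) - 0.99 * E_single.transitions[0]).dot(E_single.rewards[:, 0])
v_iter = value_iteration(E_single, iters=1000, gamma=0.99)
# The gap shrinks roughly like gamma ** iters, so it should be small here.
print(np.abs(v_closed - v_iter).max())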
v = value_iteration(E)
print(v)


def act(env, s, v, gamma=0.99):
    rs = env.rewards[s]
    next_state_values = []
    for a in range(env.action_space.n):
        next_state_values.append(env.transitions[a][s].dot(v))
    next_state_targets = rs + gamma * np.array(next_state_values)
    # Act greedily with respect to the one-step lookahead targets (reward + discounted next-state value).
    return np.argmax(next_state_targets)


for state in range(5):
    print(act(E, state, v))


class QLearningAgent():
    """ Copying over our random agent, but adding the ability to train it. """

    def __init__(self, env, lr=0.01, gamma=0.99):
        self.env = env
        self.gamma = gamma
        self.lr = lr
        # Our Q function will be a [num_states, num_actions] matrix,
        # so Q(s, a) = self.Q[s, a]
        self.Q = np.zeros((env.observation_space.n, env.action_space.n))

    def train(self, num_steps=100000):
        """ Trains the Q table with one-step tabular Q-learning updates. """
        state = self.env.reset()
        for step in range(num_steps):
            # take a step in the environment
            action = self.act(state)
            next_state, reward, done, info = self.env.step(action)
            if done:
                discount = 0
            else:
                discount = self.gamma
            target = reward + discount * np.max(self.Q[next_state])
            td_error = target - self.Q[state, action]
            self.Q[state, action] += self.lr * td_error
            state = next_state

    def act(self, state):
        """ This executes the agent's policy -- let's stick to random for now """
        return self.env.action_space.sample()

    def run_episode(self, max_steps=100, verbose=False):
        """ Runs a test episode """
        state = self.env.reset()
        total_reward = 0
        if verbose:
            print("Step 0; State: {}".format(state))
        for step in range(max_steps):
            next_state, reward, done, _ = self.env.step(self.act(state))
            if verbose:
                print("Step {}; State: {}; Reward: {}".format(step, state, reward))
            state = next_state
            total_reward += reward
            if done:
                print("Done! Final state: {}; Total reward: {}".format(state, total_reward))
                break
            elif step == max_steps - 1:
                print("Did not finish in time! Final state: {}; Total reward: {}".format(state, total_reward))


agent = QLearningAgent(E)
agent.train(100000)

# Not quite there... need more training steps!
agent.Q

agent.train(500000)
agent.Q

# Compare this to the result of the value iteration above:
print(v)
print(np.max(agent.Q, axis=1))
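# A short follow-up sketch (not in the original): compare the greedy policies as well as
# the values. `q_policy` and `vi_policy` are illustrative names introduced here.
q_policy = np.argmax(agent.Q, axis=1)
vi_policy = np.array([act(E, s, v) for s in range(E.observation_space.n)])
print("Q-learning policy:      {}".format(q_policy))
print("Value-iteration policy: {}".format(vi_policy))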