Last active: June 14, 2016 05:38
discrete.py
import numpy as np


class Discrete:
    """Flattens a (possibly multi-part) discrete observation into a single integer index."""

    def __init__(self, values):
        self.values = values             # number of possible values for each observation component
        self.max = np.prod(self.values)  # total number of flattened states

    def __validate(self, observation):
        for i in range(len(self.values)):
            assert observation[i] < self.values[i]

    def to_array(self, observation):
        # Single-component spaces receive a bare integer, so wrap it for uniform handling
        if len(self.values) == 1:
            observation = [observation]

        self.__validate(observation)

        # Mixed-radix (row-major) encoding: each component is weighted by the
        # product of the sizes of the components that follow it
        array_val = 0
        for i, obs in enumerate(observation):
            array_val += obs * max(np.prod(self.values[i+1:]), 1)

        return int(array_val)
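A quick usage sketch of Discrete (not part of the gist; the component sizes are only illustrative, roughly matching a Blackjack-style Tuple(Discrete(32), Discrete(11), Discrete(2)) observation space):

from discrete import Discrete

# Tuple observation with component sizes 32, 11 and 2 (illustrative only)
fa = Discrete([32, 11, 2])
print(fa.max)                   # 704 flattened states in total (32 * 11 * 2)
print(fa.to_array((14, 7, 1)))  # 14*22 + 7*2 + 1 = 323

# Single-component spaces take the bare integer directly
single = Discrete([16])         # e.g. a 16-state grid like FrozenLake-v0
print(single.to_array(5))       # 5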
Run script
import gym
import gym.scoreboard.scoring

from tabular_q_agent import TabularQAgent

# Discrete-observation environments to train on
ENVS = ["FrozenLake-v0",
        "FrozenLake8x8-v0",
        "Taxi-v1",
        "Roulette-v0",
        "NChain-v0",
        "Blackjack-v0"]


def main():
    for env_name in ENVS:
        env = gym.make(env_name)
        agent = TabularQAgent(env.action_space, env.observation_space)

        # Record results with the gym 0.x monitor so they can be scored locally
        out_dir = '/tmp/' + agent.name + '-results'
        env.monitor.start(out_dir, force=True)

        n_episodes = 5000

        for i_episode in range(n_episodes):
            observation = env.reset()
            reward = 0
            done = False
            action = agent.act(observation, reward, done)

            while not done:
                observation, reward, done, info = env.step(action)
                action = agent.act(observation, reward, done)

        print(gym.scoreboard.scoring.score_from_local(out_dir))
        env.monitor.close()


if __name__ == '__main__':
    main()
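For a quick local check without the monitor and scoreboard (both tied to gym 0.x from 2016), a minimal variant of the same loop could look like the sketch below; run_one is a hypothetical helper name and everything else follows the same gym 0.x API the gist targets:

import gym

from tabular_q_agent import TabularQAgent


def run_one(env_name="FrozenLake-v0", n_episodes=5000):
    env = gym.make(env_name)
    agent = TabularQAgent(env.action_space, env.observation_space)
    returns = []

    for _ in range(n_episodes):
        observation = env.reset()
        action = agent.act(observation, 0, False)
        done, total = False, 0

        while not done:
            observation, reward, done, info = env.step(action)
            total += reward
            action = agent.act(observation, reward, done)

        returns.append(total)

    print(sum(returns[-100:]) / 100.0)  # mean raw return over the last 100 episodes
    return returns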
tabular_q_agent.py
import numpy as np

from collections import defaultdict
from discrete import Discrete
from gym.spaces import discrete, tuple_space


class TabularQAgent(object):
    def __init__(self, action_space, observation_space, init_mean=0.0, init_std=0.2, alpha=0.5, epsilon=1, discount=0.95):
        self.name = "TabularQAgent"
        self.observation_space = observation_space
        self.action_space = action_space
        self.action_n = self.action_space.n

        self.init_mean = init_mean
        self.init_std = init_std
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount

        self.epsilon_decay = 0.997  # epsilon falls to ~5% of its starting value after 1000 episodes
        self.epsilon_min = 0.02
        self.alpha_decay = 0.996    # alpha falls to ~2% of its starting value after 1000 episodes
        self.alpha_min = 0.02

        self.step_cost = -0.01  # small per-step penalty so the agent doesn't favour states it has already visited that haven't led to a reward

        self.prev_obs = None
        self.prev_action = None

        self.fa = self.__set_fa()  # maps raw observations to flat integer indices

        self.ep_reward = 0
        self.ep_count = 0

        # Q-table: unseen states are initialised with small random action values
        self.q = defaultdict(lambda: self.init_std * np.random.randn(self.action_n) + self.init_mean)

    def __set_fa(self):
        # Build the observation flattener for Tuple or Discrete observation spaces
        if isinstance(self.observation_space, tuple_space.Tuple):
            return Discrete([space.n for space in self.observation_space.spaces])
        elif isinstance(self.observation_space, discrete.Discrete):
            return Discrete([self.observation_space.n])

    # Epsilon-greedy action selection
    def __choose_action(self, observation):
        return np.argmax(self.q[observation]) if np.random.random() > self.epsilon else self.action_space.sample()

    def __learn(self, observation, reward, done):
        # One-step Q-learning update for the previous (state, action) pair
        future = np.max(self.q[observation]) if not done else 0.0
        self.q[self.prev_obs][self.prev_action] += self.alpha * (reward + self.discount * future - self.q[self.prev_obs][self.prev_action])

    def act(self, observation, reward, done):
        observation = self.fa.to_array(observation)
        reward += self.step_cost

        self.__learn(observation, reward, done)
        action = self.__choose_action(observation)

        self.ep_reward += reward
        self.prev_obs = observation
        self.prev_action = action

        if done:
            self.ep_count += 1
            self.ep_reward = 0

            # Decay exploration and learning rates at the end of each episode
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

            if self.alpha > self.alpha_min:
                self.alpha *= self.alpha_decay

        return action
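To make the update in __learn concrete, here is a tiny worked example with made-up Q-values and the default alpha=0.5, discount=0.95 (not part of the gist):

import numpy as np

alpha, discount = 0.5, 0.95
q_prev = np.array([0.1, -0.2])  # Q(prev_obs, .) before the update
q_next = np.array([0.4, 0.0])   # Q(observation, .) for the state just reached
prev_action = 0
reward = 1.0 + (-0.01)          # env reward plus the agent's step_cost

target = reward + discount * np.max(q_next)                    # 0.99 + 0.95 * 0.4 = 1.37
q_prev[prev_action] += alpha * (target - q_prev[prev_action])
print(q_prev[prev_action])                                     # 0.1 + 0.5 * (1.37 - 0.1) = 0.735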