# Tile coding feature constructor (presumably functionapproximators/features.py,
# which tdcontrol.py below imports from).
import numpy as np


class TileCoding(object):
    """Tile coding for a continuous state space: each of num_tilings tilings
    partitions every state dimension into num_features_dim intervals, and the
    tilings are shifted relative to each other by asymmetric offsets."""

    def __init__(self, func_approx_settings):
        self.num_tilings = func_approx_settings['num_tilings']
        assert self.num_tilings % 2 == 0
        self.state_dim = func_approx_settings['state_dim']
        self.bound_low = func_approx_settings['bound_low']
        self.bound_high = func_approx_settings['bound_high']
        self.num_features_dim = func_approx_settings['num_features_dim']
        self.num_features_tile = self.num_features_dim**self.state_dim
        self.num_features = self.num_tilings*self.num_features_tile

        # Tile width per dimension and the offset between consecutive tilings.
        size_tile = (self.bound_high - self.bound_low) / float(self.num_features_dim - 2)
        offset = np.array(size_tile/float(self.num_tilings)*np.arange(1, 2*self.state_dim, 2))

        # Bin edges per tiling and per state dimension, shifted by the offset.
        self.bins = []
        for i in range(self.num_tilings):
            self.bins.append([])
            for j in range(self.state_dim):
                self.bins[i].append(np.linspace(self.bound_low[j], self.bound_high[j], self.num_features_dim - 1))
                self.bins[i][j] += offset[j]*i

    def get_features(self, state):
        # Binary feature vector with exactly one active feature per tiling.
        features = np.zeros(self.num_features)
        for i in range(self.num_tilings):
            feature = 0
            for j in range(self.state_dim):
                feature += np.digitize(state[j], self.bins[i][j])*self.num_features_dim**j
            features[feature + i*self.num_features_tile] = 1
        return features

    def get_num_features(self):
        return self.num_features
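As a quick illustration (not part of the gist), here is a minimal sketch of how TileCoding behaves on its own; the settings values below are invented for the example. Each call to get_features returns a sparse binary vector with exactly one active feature per tiling:

import numpy as np

demo_settings = {
    'num_tilings': 8,           # must be even (see the assert above)
    'num_features_dim': 10,
    'state_dim': 2,
    'bound_low': np.array([-1., -1.]),
    'bound_high': np.array([1., 1.])
}
tc = TileCoding(demo_settings)
phi = tc.get_features(np.array([0.3, -0.7]))
print(phi.shape)       # (800,) = 8 tilings * 10**2 features per tiling
print(int(phi.sum()))  # 8 -> one active tile per tiling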
# Training script: Q-learning with tile-coded features on CartPole-v0.
from algorithms.tdcontrol.tdcontrol import SARSA, QLearning
import gym

utility_settings = {
    'algorithm': 'Q-Learning',
    'gym': True,
    'env_name': 'CartPole-v0',
    'render_env': False,
    'save_res': True,
    'print_res': True,
    'gym_monitor': True
}

env = utility_settings['env'] = gym.make(utility_settings['env_name'])

# CartPole's velocity components (indices 1 and 3) are effectively unbounded;
# clip them so the tile coding covers a finite range.
bound_low = env.observation_space.low
bound_low[1] = bound_low[3] = -10
bound_high = env.observation_space.high
bound_high[1] = bound_high[3] = 10

training_settings = {
    'max_episodes': 1000,
    'max_steps_episode': 250,
    'alpha': 0.1,
    'epsilon': 0.,
    'gamma': 1.,
    'state_dim': env.observation_space.shape[0],
    'action_dim': env.action_space.n
}

func_tile_coding = {
    'name': 'TileCoding',
    'num_tilings': 8,
    'num_features_dim': 10,
    'state_dim': env.observation_space.shape[0],
    'bound_low': bound_low,
    'bound_high': bound_high
}

qlearning = QLearning(training_settings, utility_settings, func_tile_coding)
qlearning.train()
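Swapping in the on-policy variant needs no other changes; a sketch, assuming the script above has already run up to the definition of func_tile_coding:

utility_settings['algorithm'] = 'SARSA'   # only used for the results directory name
sarsa = SARSA(training_settings, utility_settings, func_tile_coding)
results = sarsa.train()                   # {'total_reward': [...]} when save_res is True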
# algorithms/rlalgorithm.py: generic training loop shared by the concrete algorithms.
import gym
from gym import wrappers
import time


class RLAlgorithm(object):
    """Base class holding the environment interaction loop; concrete
    algorithms override get_action, update and the result/printing hooks."""

    def __init__(self, training_settings, utility_settings):
        self.env_name = utility_settings['env_name']
        self.algorithm = utility_settings['algorithm']
        self.max_episodes = training_settings['max_episodes']
        self.max_steps_episode = training_settings['max_steps_episode']
        self.state_dim = training_settings['state_dim']
        self.action_dim = training_settings['action_dim']
        self.env = utility_settings['env']
        self.render_env = utility_settings.get('render_env', False)
        self.save_res = utility_settings.get('save_res', False)
        self.print_res = utility_settings.get('print_res', False)
        self.gym_monitor = utility_settings.get('gym_monitor', False)
        if self.gym_monitor:
            self.env = wrappers.Monitor(self.env, '/code/algorithms/results/{}/{}/{}'.
                                        format(self.env_name, self.algorithm, time.strftime('%m%d%Y%H%M%S')))
        self.total_reward = 0.
        self.num_steps = 0
        self.results = {}

    def train(self):
        if self.save_res:
            self.init_results()
        if self.print_res:
            self.print_init()
        for i_episode in range(self.max_episodes):
            observation = self.env.reset()
            self.reset()
            for t in range(self.max_steps_episode):
                if self.render_env:
                    self.env.render()
                action = self.get_action(observation)
                next_observation, reward, done, info = self.env.step(action)
                self.update(observation, action, reward, next_observation, done)
                self.update_res(reward)
                observation = next_observation
                if done or t == self.max_steps_episode - 1:
                    if self.save_res:
                        self.update_results()
                    if self.print_res:
                        self.print_episode(i_episode)
                    break
        return self.results

    def get_action(self, observation):
        pass

    def update(self, observation, action, reward, next_observation, done):
        pass

    def max_Q(self, observation):
        pass

    def update_res(self, reward):
        self.total_reward += reward
        self.num_steps += 1

    def reset(self):
        self.total_reward = 0.
        self.num_steps = 0

    def init_results(self):
        pass

    def update_results(self):
        pass

    def get_reward(self):
        return self.total_reward

    def print_init(self):
        pass

    def print_episode(self, i_episode):
        pass
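The training loop above only touches a handful of hooks. As a hypothetical illustration (RandomAgent is not part of the gist), the smallest concrete subclass just supplies an action and skips learning; all result and printing hooks keep their no-op defaults:

class RandomAgent(RLAlgorithm):
    def get_action(self, observation):
        # sample uniformly from the environment's action space
        return self.env.action_space.sample()

    def update(self, observation, action, reward, next_observation, done):
        pass  # a random baseline does not learn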
# algorithms/tdcontrol/tdcontrol.py: linear SARSA and Q-learning built on RLAlgorithm.
import numpy as np
from algorithms.rlalgorithm import RLAlgorithm
from functionapproximators.features import *


class _TDControl(RLAlgorithm):
    """Linear TD control with epsilon-greedy action selection; the concrete
    bootstrap target (SARSA or Q-learning) is supplied by get_next_q."""

    def __init__(self, training_settings, utility_settings, func_approx_settings):
        RLAlgorithm.__init__(self, training_settings, utility_settings)
        self.alpha = training_settings['alpha']
        self.epsilon = training_settings['epsilon']
        self.gamma = training_settings['gamma']

        # Instantiate the feature constructor named in the settings, e.g. 'TileCoding'.
        self.func_approx = eval(func_approx_settings['name'])(func_approx_settings)
        self.num_features = self.func_approx.get_num_features()
        self.num_weights = self.num_features*self.action_dim
        self.theta = np.zeros(self.num_weights)

    def get_action(self, observation):
        # Epsilon-greedy: random action with probability epsilon, otherwise
        # a random choice among the greedy actions (ties broken uniformly).
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, self.action_dim)
        values = []
        for i in range(self.action_dim):
            values.append(self.Q(observation, i))
        return np.random.choice(np.argwhere(values == np.max(values)).T[0])

    def update(self, observation, action, reward, next_observation, done):
        # Semi-gradient TD(0) update of the linear weights.
        cur_q = self.Q(observation, action)
        grad_q = self.grad_Q(observation, action)
        next_q = 0
        if not done:
            next_q = self.get_next_q(next_observation)
        self.theta += self.alpha * (reward + self.gamma*next_q - cur_q) * grad_q

    def get_next_q(self, next_observation):
        pass

    def Q(self, observation, action):
        features = self.get_features(observation, action)
        return np.dot(self.theta, features)

    def grad_Q(self, observation, action):
        # For a linear approximator the gradient is the feature vector itself.
        return self.get_features(observation, action)

    def max_Q(self, observation):
        values = []
        for i in range(self.action_dim):
            values.append(self.Q(observation, i))
        return np.max(values)

    def get_features(self, observation, action):
        # One copy of the state features per action (block one-hot layout).
        active_features = self.func_approx.get_features(observation)
        features = np.zeros(self.num_weights)
        features[action*self.num_features:(action+1)*self.num_features] = active_features
        return features

    def init_results(self):
        self.results['total_reward'] = []

    def update_results(self):
        self.results['total_reward'].append(self.get_reward())

    def get_ave(self):
        # Average total reward over the last 100 episodes (0 until enough data).
        if len(self.results['total_reward']) > 100:
            return np.mean(self.results['total_reward'][-100:])
        else:
            return 0.

    def print_init(self):
        print('{:<10} {:<16} {:<16}'
              .format('Episode', 'Total reward', '100 ave'))
        print('-' * (10 + 17*2))

    def print_episode(self, i_episode):
        print('{:<10} {:<16.2f} {:<16.2f}'
              .format(i_episode, self.get_reward(), self.get_ave()))


class SARSA(_TDControl):
    def get_next_q(self, next_observation):
        # On-policy target: value of the action the current policy would take.
        next_action = self.get_action(next_observation)
        return self.Q(next_observation, next_action)


class QLearning(_TDControl):
    def get_next_q(self, next_observation):
        # Off-policy target: the greedy (maximum) action value.
        values = []
        for i in range(self.action_dim):
            values.append(self.Q(next_observation, i))
        return self.Q(next_observation, np.random.choice(np.argwhere(values == np.max(values)).T[0]))
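For reference, the rule implemented by _TDControl.update is the semi-gradient TD(0) update for a linear action-value function Q(s, a) = theta^T phi(s, a), where phi is the block one-hot feature vector returned by get_features; since Q is linear in theta, the gradient is phi itself:

    theta <- theta + alpha * (r + gamma * q_next - theta^T phi(s, a)) * phi(s, a)

with q_next = Q(s', a') for SARSA (a' drawn from the epsilon-greedy policy), q_next = max over a' of Q(s', a') for Q-learning, and q_next = 0 on terminal transitions.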