@BartKeulen
Created February 15, 2017 15:57
import numpy as np


class TileCoding(object):
    # Tile coding function approximator: lays num_tilings offset grids over the
    # state space and returns a binary feature vector with exactly one active
    # tile per tiling.

    def __init__(self, func_approx_settings):
        self.num_tilings = func_approx_settings['num_tilings']
        assert(self.num_tilings % 2 == 0)
        self.state_dim = func_approx_settings['state_dim']
        self.bound_low = func_approx_settings['bound_low']
        self.bound_high = func_approx_settings['bound_high']
        self.num_features_dim = func_approx_settings['num_features_dim']
        self.num_features_tile = self.num_features_dim**self.state_dim
        self.num_features = self.num_tilings*self.num_features_tile

        # Tile width per dimension; tiling i is shifted along dimension j by
        # i * (2j+1) * size_tile[j] / num_tilings (asymmetric offsets).
        size_tile = (self.bound_high - self.bound_low) / float(self.num_features_dim-2)
        offset = np.array(size_tile/float(self.num_tilings)*np.arange(1, 2*self.state_dim, 2))

        self.bins = []
        for i in range(self.num_tilings):
            self.bins.append([])
            for j in range(self.state_dim):
                self.bins[i].append(np.linspace(self.bound_low[j], self.bound_high[j], self.num_features_dim-1))
                self.bins[i][j] += offset[j]*i

    def get_features(self, state):
        # For every tiling, locate the active tile and set its entry to 1.
        features = np.zeros(self.num_features)
        for i in range(self.num_tilings):
            feature = 0
            for j in range(self.state_dim):
                feature += np.digitize(state[j], self.bins[i][j])*self.num_features_dim**j
            features[feature + i*self.num_features_tile] = 1
        return features

    def get_num_features(self):
        return self.num_features
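
# Minimal usage sketch, assuming a toy 2-D state space with bounds [0, 1] per
# dimension (illustrative settings, not taken from the experiment below): each
# tiling activates exactly one tile, so get_features returns a binary vector
# containing num_tilings ones.
if __name__ == '__main__':
    demo_settings = {
        'num_tilings': 4,
        'num_features_dim': 6,
        'state_dim': 2,
        'bound_low': np.array([0., 0.]),
        'bound_high': np.array([1., 1.])
    }
    tc = TileCoding(demo_settings)
    phi = tc.get_features(np.array([0.3, 0.7]))
    print phi.sum()              # 4.0: one active tile per tiling
    print tc.get_num_features()  # 4 * 6**2 = 144 features in total
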
from algorithms.tdcontrol.tdcontrol import SARSA, QLearning
import gym

utility_settings = {
    'algorithm': 'Q-Learning',
    'gym': True,
    'env_name': 'CartPole-v0',
    'render_env': False,
    'save_res': True,
    'print_res': True,
    'gym_monitor': True
}

env = utility_settings['env'] = gym.make(utility_settings['env_name'])

# CartPole-v0 reports effectively unbounded cart and pole velocities
# (observation indices 1 and 3), so they are clipped to [-10, 10] to give the
# tile coder finite bounds.
bound_low = env.observation_space.low
bound_low[1] = bound_low[3] = -10
bound_high = env.observation_space.high
bound_high[1] = bound_high[3] = 10

training_settings = {
    'max_episodes': 1000,
    'max_steps_episode': 250,
    'alpha': 0.1,
    'epsilon': 0.,
    'gamma': 1.,
    'state_dim': env.observation_space.shape[0],
    'action_dim': env.action_space.n
}

func_tile_coding = {
    'name': 'TileCoding',
    'num_tilings': 8,
    'num_features_dim': 10,
    'state_dim': env.observation_space.shape[0],
    'bound_low': bound_low,
    'bound_high': bound_high
}

qlearning = QLearning(training_settings, utility_settings, func_tile_coding)
qlearning.train()
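
# SARSA (imported above) takes the same constructor arguments, so the on-policy
# variant can be trained in the same way. The sketch below uses a fresh
# environment instance so the two monitored runs stay separate, and raises
# epsilon to 0.1 (an illustrative value) to give SARSA's behaviour policy some
# explicit exploration.
sarsa_utility_settings = dict(utility_settings, algorithm='SARSA')
sarsa_utility_settings['env'] = gym.make(utility_settings['env_name'])
sarsa_training_settings = dict(training_settings, epsilon=0.1)
sarsa = SARSA(sarsa_training_settings, sarsa_utility_settings, func_tile_coding)
sarsa.train()
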
import gym
from gym import wrappers
import time


class RLAlgorithm(object):
    # Generic episode/training loop. Concrete algorithms subclass this and
    # override get_action(), update() and the result/printing hooks.

    def __init__(self, training_settings, utility_settings):
        self.env_name = utility_settings['env_name']
        self.algorithm = utility_settings['algorithm']
        self.max_episodes = training_settings['max_episodes']
        self.max_steps_episode = training_settings['max_steps_episode']
        self.state_dim = training_settings['state_dim']
        self.action_dim = training_settings['action_dim']
        self.env = utility_settings['env']
        self.render_env = utility_settings['render_env'] if 'render_env' in utility_settings else False
        self.save_res = utility_settings['save_res'] if 'save_res' in utility_settings else False
        self.print_res = utility_settings['print_res'] if 'print_res' in utility_settings else False
        self.gym_monitor = utility_settings['gym_monitor'] if 'gym_monitor' in utility_settings else False
        if self.gym_monitor:
            # Record the run with a gym Monitor under a hard-coded results path.
            self.env = wrappers.Monitor(self.env, '/code/algorithms/results/{}/{}/{}'.
                                        format(self.env_name, self.algorithm, time.strftime('%m%d%Y%H%M%S')))
        self.total_reward = 0.
        self.num_steps = 0
        self.results = {}

    def train(self):
        self.init_results() if self.save_res else None
        self.print_init() if self.print_res else None
        for i_episode in range(self.max_episodes):
            observation = self.env.reset()
            self.reset()
            for t in range(self.max_steps_episode):
                if self.render_env:
                    self.env.render()
                action = self.get_action(observation)
                next_observation, reward, done, info = self.env.step(action)
                self.update(observation, action, reward, next_observation, done)
                self.update_res(reward)
                observation = next_observation
                if done or t == self.max_steps_episode - 1:
                    self.update_results() if self.save_res else None
                    self.print_episode(i_episode) if self.print_res else None
                    break
        return self.results

    def get_action(self, observation):
        pass

    def update(self, observation, action, reward, next_observation, done):
        pass

    def max_Q(self, observation):
        pass

    def update_res(self, reward):
        self.total_reward += reward
        self.num_steps += 1

    def reset(self):
        self.total_reward = 0.
        self.num_steps = 0

    def init_results(self):
        pass

    def update_results(self):
        pass

    def get_reward(self):
        return self.total_reward

    def print_init(self):
        pass

    def print_episode(self, i_episode):
        pass
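
# Minimal sketch of the subclass contract, using a hypothetical RandomAgent:
# train() drives the episode loop and calls get_action() and update(), so a
# concrete algorithm only needs to fill in those hooks (plus the optional
# result/printing hooks).
import random

class RandomAgent(RLAlgorithm):

    def get_action(self, observation):
        # Ignore the observation and pick a uniformly random discrete action.
        return random.randrange(self.action_dim)

    def update(self, observation, action, reward, next_observation, done):
        # A random policy learns nothing; TD methods update their weights here.
        pass
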
import numpy as np

from algorithms.rlalgorithm import RLAlgorithm
from functionapproximators.features import *


class _TDControl(RLAlgorithm):
    # TD control with a linear value function over the features produced by the
    # configured function approximator (e.g. TileCoding).

    def __init__(self, training_settings, utility_settings, func_approx_settings):
        RLAlgorithm.__init__(self, training_settings, utility_settings)
        self.alpha = training_settings['alpha']
        self.epsilon = training_settings['epsilon']
        self.gamma = training_settings['gamma']
        # Look up the function approximator class by its configured name.
        self.func_approx = eval(func_approx_settings['name'])(func_approx_settings)
        self.num_features = self.func_approx.get_num_features()
        self.num_weights = self.num_features*self.action_dim
        self.theta = np.zeros(self.num_weights)

    def get_action(self, observation):
        # Epsilon-greedy policy; ties between maximal action values are broken at random.
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, self.action_dim)
        values = []
        for i in range(self.action_dim):
            values.append(self.Q(observation, i))
        return np.random.choice(np.argwhere(values == np.max(values)).T[0])

    def update(self, observation, action, reward, next_observation, done):
        # Semi-gradient TD(0) update: theta += alpha * (r + gamma*Q' - Q) * grad Q.
        cur_q = self.Q(observation, action)
        grad_q = self.grad_Q(observation, action)
        next_q = 0
        if not done:
            next_q = self.get_next_q(next_observation)
        self.theta += self.alpha * (reward + self.gamma*next_q - cur_q) * grad_q

    def get_next_q(self, next_observation):
        pass

    def Q(self, observation, action):
        features = self.get_features(observation, action)
        return np.dot(self.theta, features)

    def grad_Q(self, observation, action):
        # Q is linear in theta, so the gradient is just the feature vector.
        return self.get_features(observation, action)

    def max_Q(self, observation):
        values = []
        for i in range(self.action_dim):
            values.append(self.Q(observation, i))
        return np.max(values)

    def get_features(self, observation, action):
        # One feature block per action; only the block of the chosen action is non-zero.
        active_features = self.func_approx.get_features(observation)
        features = np.zeros(self.num_weights)
        features[action*self.num_features:(action+1)*self.num_features] = active_features
        return features

    def init_results(self):
        self.results['total_reward'] = []

    def update_results(self):
        self.results['total_reward'].append(self.get_reward())

    def get_ave(self):
        if len(self.results['total_reward']) > 100:
            return np.mean(self.results['total_reward'][-100:])
        else:
            return 0.

    def print_init(self):
        print '{:<10} {:<16} {:<16}' \
            .format('Episode', 'Total reward', '100 ave')
        print '-' * (10 + 17*2)

    def print_episode(self, i_episode):
        print '{:<10} {:<16.2f} {:<16.2f}' \
            .format(i_episode, self.get_reward(), self.get_ave())


class SARSA(_TDControl):
    # On-policy target: the value of the action the epsilon-greedy policy takes next.

    def get_next_q(self, next_observation):
        next_action = self.get_action(next_observation)
        return self.Q(next_observation, next_action)


class QLearning(_TDControl):
    # Off-policy target: the maximal action value in the next state (random tie-breaking).

    def get_next_q(self, next_observation):
        values = []
        for i in range(self.action_dim):
            values.append(self.Q(next_observation, i))
        return self.Q(next_observation, np.random.choice(np.argwhere(values == np.max(values)).T[0]))