TD(λ) (on-policy SARSA(λ) with Gaussian radial-basis-function features) used to solve the OpenAI Gym MountainCar-v0 environment
import numpy as np
import gym
import matplotlib.pyplot as plt
from itertools import product
def init_centers(num_splits=4, num_obs=2):
    """ Returns a [num_splits**num_obs, num_obs] matrix of equidistant RBF centers spanning (0,0) to (1,1) """
    return np.array(list(product(np.linspace(0, 1, num_splits), repeat=num_obs)))
def init_theta(num_splits=4, num_actions=2):
    """ Returns a random [num_splits*num_splits, num_actions] matrix of action-value weights, one row per RBF center (assumes num_obs=2) """
    return np.random.random([num_splits * num_splits, num_actions]) - .5
def normalize(env, state):
    """ Normalizes observed state to range between 0 and 1 """
    return (state - env.observation_space.low) / (env.observation_space.high - env.observation_space.low)
def get_activations(state):
    """ Returns Gaussian RBF activations of a normalized state against the global centers """
    return np.exp([-np.linalg.norm(state - center)**2 / .05 for center in centers])
def get_action_values(activations, theta):
    """ Returns the value of each action at some state """
    return np.dot(theta.T, activations)
def get_action_value(activations, action, theta):
    """ Returns the value of an action at some state """
    return np.dot(theta[:, action], activations)
def get_epsilon_greedy(env, vals, epsilon=0.1):
    """ Returns a random action with probability epsilon, otherwise the greedy action """
    if np.random.random() < epsilon:
        return env.action_space.sample()
    return vals.argmax()
def display_policy(theta):
    """ Plots the greedy action chosen over a grid of normalized states """
    policy = []
    for x in np.linspace(0, 1):
        for y in np.linspace(0, 1):
            state = [x, y]
            action = get_epsilon_greedy(env, get_action_values(get_activations(state), theta), epsilon=0)
            policy.append((x, y, action))
    policy = np.array(policy)
    zero = policy[policy[:, 2] == 0]
    one = policy[policy[:, 2] == 1]
    two = policy[policy[:, 2] == 2]
    plt.figure()
    plt.scatter(zero[:, 0], zero[:, 1], marker=".")
    plt.scatter(one[:, 0], one[:, 1], marker="1")
    plt.scatter(two[:, 0], two[:, 1], marker="2")
    plt.show()
env = gym.make('MountainCar-v0')
env = gym.wrappers.Monitor(env, directory="videos", force=True)
num_splits = 4
num_actions = env.action_space.n
num_obs = env.observation_space.shape[0]
alpha = 0.01          # learning rate
alpha_decay = 1       # multiplicative decay applied to the learning rate each episode
gamma_decay = 0.5     # eligibility-trace decay (plays the role of lambda in SARSA(lambda) notation)
lambda_decay = 0.99   # discount applied to future rewards (plays the role of gamma)
np.set_printoptions(precision=2)

num_episodes = 5000
theta = init_theta(num_splits=num_splits, num_actions=num_actions)
centers = init_centers(num_splits=num_splits, num_obs=num_obs)
cum_rewards = []
prob_random = 0.1          # epsilon for epsilon-greedy exploration
prob_random_decay = 0.999  # multiplicative decay applied to epsilon each episode
print_every = 100
render_every = float("inf")  # never render
for num_episode in range(num_episodes):
    eligibility = np.zeros_like(theta)
    state = normalize(env, env.reset())
    activations = get_activations(state)
    vals = get_action_values(activations, theta)
    action = get_epsilon_greedy(env, vals)
    rewards = 0
    if (num_episode + 1) % print_every == 0:
        print("{} Last 100 rewards: {}".format(num_episode + 1, np.mean(cum_rewards[-100:])))
        display_policy(theta)
    while True:
        if (num_episode + 1) % render_every == 0:
            env.render()
        new_state, reward, done, info = env.step(action)
        new_state = normalize(env, new_state)
        new_activations = get_activations(new_state)
        new_vals = get_action_values(new_activations, theta)  # value estimates at the new state
        new_action = get_epsilon_greedy(env, new_vals, epsilon=prob_random)
        q = get_action_value(activations, action, theta)
        new_q = get_action_value(new_activations, new_action, theta)
        if done:
            target = reward - q  # terminal step: no bootstrap from the next state
        else:
            target = reward + lambda_decay * new_q - q  # SARSA TD error: r + gamma * Q(s', a') - Q(s, a)
        eligibility[:, action] = activations  # replacing traces for the action taken
        theta += alpha * target * eligibility
        eligibility *= lambda_decay * gamma_decay  # decay all traces by gamma * lambda
        activations = new_activations.copy()
        state = new_state.copy()
        vals = new_vals.copy()
        action = new_action
        rewards += reward
        if done:
            cum_rewards.append(rewards)
            break
    prob_random *= prob_random_decay
    alpha *= alpha_decay
    if np.mean(cum_rewards[-100:]) > -110:
        print("Completed training after {} episodes".format(num_episode + 1))
        break
env.close()
env = env.env.env  # unwrap the Monitor and TimeLimit wrappers
gym.upload("./videos/", api_key=api_key)  # api_key is assumed to be defined elsewhere with your OpenAI Gym API key
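
# Optional sanity check (not part of the original gist): a minimal sketch that runs one
# greedy episode with the theta, centers, and helper functions defined above, using a
# fresh, unmonitored environment. Assumes the old gym API used throughout this script
# (reset() returns an observation, step() returns a 4-tuple).
eval_env = gym.make('MountainCar-v0')
state = normalize(eval_env, eval_env.reset())
total_reward = 0
done = False
while not done:
    vals = get_action_values(get_activations(state), theta)  # action values at the current state
    state, reward, done, _ = eval_env.step(vals.argmax())    # always take the greedy action
    state = normalize(eval_env, state)
    total_reward += reward
print("Greedy evaluation episode reward: {}".format(total_reward))
eval_env.close()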