# John Schulman MLSS Lab 1: CartPole-v0
# Most of the code is from John Schulman's MLSS talk on Deep Reinforcement Learning:
# http://rl-gym-doc.s3-website-us-west-2.amazonaws.com/mlss/lab1.html#szitalorincz06
# Note: written against the 2016-era gym API (env.reset() returns only the
# observation and env.step() returns a 4-tuple); newer gym/gymnasium versions differ.
import numpy as np
import gym
from gym.spaces import Discrete, Box
# ================================================================
# Policies
# ================================================================
class DeterministicDiscreteActionLinearPolicy(object):

    def __init__(self, theta, ob_space, ac_space):
        """
        dim_ob: dimension of observations
        n_actions: number of actions
        theta: flat vector of parameters
        """
        dim_ob = ob_space.shape[0]
        n_actions = ac_space.n
        assert len(theta) == (dim_ob + 1) * n_actions
        self.W = theta[0 : dim_ob * n_actions].reshape(dim_ob, n_actions)
        self.b = theta[dim_ob * n_actions :].reshape(1, n_actions)

    def act(self, ob):
        """
        Return the index of the action with the largest score ob.dot(W) + b.
        """
        y = ob.dot(self.W) + self.b
        a = y.argmax()
        return a

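# For CartPole-v0 the observation is 4-dimensional and there are 2 discrete
# actions, so this policy is parameterized by (4 + 1) * 2 = 10 numbers:
# a 4x2 weight matrix W plus a length-2 bias b; act() picks whichever action
# has the larger linear score.
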
class DeterministicContinuousActionLinearPolicy(object):

    def __init__(self, theta, ob_space, ac_space):
        """
        dim_ob: dimension of observations
        dim_ac: dimension of action vector
        theta: flat vector of parameters
        """
        self.ac_space = ac_space
        dim_ob = ob_space.shape[0]
        dim_ac = ac_space.shape[0]
        assert len(theta) == (dim_ob + 1) * dim_ac
        self.W = theta[0 : dim_ob * dim_ac].reshape(dim_ob, dim_ac)
        self.b = theta[dim_ob * dim_ac :]

    def act(self, ob):
        # Linear action, clipped to the valid range of the action space
        a = np.clip(ob.dot(self.W) + self.b, self.ac_space.low, self.ac_space.high)
        return a

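# The continuous-action policy is not used for CartPole-v0 (whose action space
# is Discrete), but it lets the same script handle Box action spaces: the
# linear action ob.dot(W) + b is clipped to the space's low/high bounds.
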
def do_episode(policy, env, num_steps, render=False):
    total_rew = 0
    ob = env.reset()
    for t in range(num_steps):
        a = policy.act(ob)
        (ob, reward, done, _info) = env.step(a)
        total_rew += reward
        if render and t % 3 == 0: env.render()
        if done: break
    return total_rew

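# do_episode runs one rollout and returns the undiscounted sum of rewards.
# CartPole-v0 gives +1 per timestep and terminates after at most 200 steps,
# so num_steps = 10000 below is just a generous upper bound on episode length.
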
env = None  # global environment handle; set below in the task settings

def noisy_evaluation(theta):
    """Return the (noisy) total reward of one episode under the policy defined by theta."""
    policy = make_policy(theta)
    rew = do_episode(policy, env, num_steps)
    return rew

def make_policy(theta):
    if isinstance(env.action_space, Discrete):
        return DeterministicDiscreteActionLinearPolicy(theta,
            env.observation_space, env.action_space)
    elif isinstance(env.action_space, Box):
        return DeterministicContinuousActionLinearPolicy(theta,
            env.observation_space, env.action_space)
    else:
        raise NotImplementedError

# Task settings:
env = gym.make('CartPole-v0') # Change as needed
num_steps = 10000 # maximum length of episode
# Alg settings:
n_iter = 25 # number of iterations of CEM
batch_size = 25 # number of samples per batch
elite_frac = 0.2 # fraction of samples used as elite set
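# Cross-entropy method (CEM) outline: each iteration samples batch_size = 25
# parameter vectors from a diagonal Gaussian, scores each with one rollout,
# keeps the elite_frac fraction with the highest reward (int(25 * 0.2) = 5
# elites), and refits the Gaussian's mean and standard deviation to them.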
if isinstance(env.action_space, Discrete):
    dim_theta = (env.observation_space.shape[0] + 1) * env.action_space.n
elif isinstance(env.action_space, Box):
    dim_theta = (env.observation_space.shape[0] + 1) * env.action_space.shape[0]
else:
    raise NotImplementedError
# Initialize mean and standard deviation
theta_mean = np.zeros(dim_theta)
theta_std = np.ones(dim_theta)
# The hand-set values below overwrite the zeros/ones initialization above;
# they appear to be parameters saved from an earlier training run.
theta_mean = np.array([ 0.70950523,  0.75781936, -0.86709577,  0.58244231,  0.16096483,
                        1.12353121, -0.60289769,  1.158481  , -0.5017926 , -0.49675536])
theta_std = np.array([ 0.00109376,  0.00192984,  0.00037013,  0.00991735,  0.0021484 ,
                       0.00227502,  0.01552589,  0.01709521,  0.00127029,  0.00048866])
# Now, for the algorithm
for iteration in range(n_iter):
    # Sample parameter vectors from the current Gaussian distribution
    thetas = [np.random.normal(loc=theta_mean, scale=theta_std, size=dim_theta)
              for _ in range(batch_size)]
    rewards = [noisy_evaluation(theta) for theta in thetas]
    # Get elite parameters (the top n_elite samples by reward)
    n_elite = int(batch_size * elite_frac)
    elite_inds = np.argsort(rewards)[batch_size - n_elite:batch_size]
    elite_thetas = [thetas[i] for i in elite_inds]
    # Update theta_mean, theta_std by refitting the Gaussian to the elites
    theta_mean = np.mean(elite_thetas, axis=0)
    theta_std = np.std(elite_thetas, axis=0)
    print("iteration %i. mean f: %8.3g. max f: %8.3g" % (iteration,
                                                         np.mean(rewards),
                                                         np.max(rewards)))
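# Finally, watch one episode played by a policy built from the final theta_mean.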
do_episode(make_policy(theta_mean), env, num_steps, render=True)
env.render(close=True)