actor_critic_pendulum_2.py
import gym
import itertools
import matplotlib
import numpy as np
import sys
import tensorflow as tf
import collections
import sklearn.pipeline
import sklearn.preprocessing
from sklearn.kernel_approximation import RBFSampler

env = gym.envs.make("Pendulum-v0")
env.observation_space.sample()
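# Pendulum-v0: observations are [cos(theta), sin(theta), theta_dot] and the action
# is a single continuous torque in [-2.0, 2.0].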

EpisodeStats = collections.namedtuple("Stats", ["episode_lengths", "episode_rewards"])

# Feature Preprocessing: Normalize to zero mean and unit variance
# We use a few samples from the observation space to do this
observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(observation_examples)
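# The scaler learns the per-dimension mean and standard deviation of the sampled
# observations, so states are standardized before being projected onto RBF features.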

# Used to convert a state to a featurized representation.
# We use RBF kernels with different variances to cover different parts of the space
featurizer = sklearn.pipeline.FeatureUnion([
    ("rbf1", RBFSampler(gamma=5.0, n_components=100)),
    ("rbf2", RBFSampler(gamma=2.0, n_components=100)),
    ("rbf3", RBFSampler(gamma=1.0, n_components=100)),
    ("rbf4", RBFSampler(gamma=0.5, n_components=100))
])
featurizer.fit(scaler.transform(observation_examples))
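# Each RBFSampler produces 100 random Fourier features, so the concatenated
# representation is 4 * 100 = 400-dimensional (the state placeholder size below).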


def featurize_state(state):
    """
    Returns the featurized representation for a state.
    """
    scaled = scaler.transform([state])
    featurized = featurizer.transform(scaled)
    return featurized[0]
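
# Sanity check: featurize_state(env.reset()) is a 400-dimensional vector, matching
# the [400] "state" placeholders used by both estimators below.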


class PolicyEstimator():
    """
    Policy Function approximator.
    """

    def __init__(self, learning_rate=0.01, scope="policy_estimator"):
        with tf.variable_scope(scope):
            self.state = tf.placeholder(tf.float32, [400], "state")
            self.target = tf.placeholder(dtype=tf.float32, name="target")

            # The mean and (pre-softplus) standard deviation are each just a linear layer
            self.mu = tf.layers.dense(
                inputs=tf.expand_dims(self.state, 0),
                units=1,
                activation=None,
                kernel_initializer=tf.zeros_initializer)
            self.mu = tf.squeeze(self.mu)

            self.sigma = tf.layers.dense(
                inputs=tf.expand_dims(self.state, 0),
                units=1,
                activation=None,
                kernel_initializer=tf.zeros_initializer)
            self.sigma = tf.squeeze(self.sigma)
            self.sigma = tf.nn.softplus(self.sigma) + 1e-5
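            # softplus (plus a small constant) keeps sigma strictly positive,
            # so the Gaussian below is always well defined.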
            self.normal_dist = tf.distributions.Normal(self.mu, self.sigma)
            self.action = self.normal_dist.sample(1)
            self.action = tf.clip_by_value(self.action, env.action_space.low[0], env.action_space.high[0])
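            # The sampled torque is clipped to the environment's action bounds
            # ([-2.0, 2.0] for Pendulum-v0).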

            # Loss and train op
            self.loss = -self.normal_dist.log_prob(self.action) * self.target
            # Add an entropy bonus to encourage exploration
            self.loss -= 1e-1 * self.normal_dist.entropy()
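            # Minimizing -log pi(a|s) * target performs stochastic gradient ascent on the
            # policy objective; 'target' is the TD error passed in from actor_critic() below.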

            self.optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
            self.train_op = self.optimizer.minimize(
                self.loss, global_step=tf.train.get_global_step())

    def predict(self, state, sess=None):
        sess = sess or tf.get_default_session()
        state = featurize_state(state)
        return sess.run(self.action, {self.state: state})

    def update(self, state, target, action, sess=None):
        sess = sess or tf.get_default_session()
        state = featurize_state(state)
        feed_dict = {self.state: state, self.target: target, self.action: action}
        _, loss = sess.run([self.train_op, self.loss], feed_dict)
        return loss


class ValueEstimator():
    """
    Value Function approximator.
    """

    def __init__(self, learning_rate=0.1, scope="value_estimator"):
        with tf.variable_scope(scope):
            self.state = tf.placeholder(tf.float32, [400], "state")
            self.target = tf.placeholder(dtype=tf.float32, name="target")

            # This is just a linear layer
            self.output_layer = tf.layers.dense(
                inputs=tf.expand_dims(self.state, 0),
                units=1,
                activation=None,
                kernel_initializer=tf.zeros_initializer)
            self.value_estimate = tf.squeeze(self.output_layer)
            self.loss = tf.squared_difference(self.value_estimate, self.target)
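            # The critic is plain regression: squared error between V(s) and the TD target.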

            self.optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
            self.train_op = self.optimizer.minimize(
                self.loss, global_step=tf.train.get_global_step())

    def predict(self, state, sess=None):
        sess = sess or tf.get_default_session()
        state = featurize_state(state)
        return sess.run(self.value_estimate, {self.state: state})

    def update(self, state, target, sess=None):
        sess = sess or tf.get_default_session()
        state = featurize_state(state)
        feed_dict = {self.state: state, self.target: target}
        _, loss = sess.run([self.train_op, self.loss], feed_dict)
        return loss


def actor_critic(env, estimator_policy, estimator_value, num_episodes, discount_factor=0.95):
    """
    Actor Critic Algorithm. Optimizes the policy
    function approximator using policy gradient.

    Args:
        env: OpenAI environment.
        estimator_policy: Policy function approximator to be optimized
        estimator_value: Value function approximator, used as a critic
        num_episodes: Number of episodes to run for
        discount_factor: Time-discount factor

    Returns:
        An EpisodeStats object with two numpy arrays for episode_lengths and episode_rewards.
    """
    # Keeps track of useful statistics
    stats = EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))

    Transition = collections.namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])

    for i_episode in range(num_episodes):
        # Reset the environment and pick the first action
        state = env.reset()

        episode = []

        # One step in the environment
        for t in itertools.count():

            env.render()

            # Take a step
            action = estimator_policy.predict(state)
            next_state, reward, done, _ = env.step(action)

            # Keep track of the transition
            episode.append(Transition(
                state=state, action=action, reward=reward, next_state=next_state, done=done))

            # Update statistics
            stats.episode_rewards[i_episode] += reward
            stats.episode_lengths[i_episode] = t

            # Calculate TD Target
            value_next = estimator_value.predict(next_state)
            td_target = reward + discount_factor * value_next
            td_error = td_target - estimator_value.predict(state)
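            # The one-step TD target r + gamma * V(s') bootstraps from the critic;
            # the resulting TD error serves as a low-variance estimate of the advantage.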

            # Update the value estimator
            estimator_value.update(state, td_target)

            # Update the policy estimator
            # using the td error as our advantage estimate
            estimator_policy.update(state, td_error, action)

            # Print out which step we're on, useful for debugging.
            print("\rStep {} @ Episode {}/{} ({})".format(
                t, i_episode + 1, num_episodes, stats.episode_rewards[i_episode - 1]), end="")

            if done:
                break

            state = next_state

    return stats


tf.reset_default_graph()

global_step = tf.Variable(0, name="global_step", trainable=False)
policy_estimator = PolicyEstimator(learning_rate=0.001)
value_estimator = ValueEstimator(learning_rate=0.1)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # Note, due to randomness in the policy the number of episodes you need varies
    # TODO: Sometimes the algorithm gets stuck, I'm not sure what exactly is happening there.
    stats = actor_critic(env, policy_estimator, value_estimator, 50, discount_factor=0.95)

print(stats)
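
# A minimal sketch for visualizing training progress from the stats returned above;
# matplotlib is imported at the top but otherwise unused.
import matplotlib.pyplot as plt

plt.plot(stats.episode_rewards)
plt.xlabel("Episode")
plt.ylabel("Total reward")
plt.title("Episode reward over time")
plt.show()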