@kkweon
Created April 5, 2017 00:02
A2C.py
"""
Actor-Critic Example
In this file, I will use abbreviations extensively
Here are examples:
s : state
a : action
r : reward
d : done
s2 : next state
"""
import gym
import numpy as np
import tensorflow as tf
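# Algorithm sketch, as implemented below (advantage actor-critic with a Monte
# Carlo return, i.e. REINFORCE with a learned value baseline):
#   1. Roll out one episode and compute discounted returns
#        R_t = r_t + discount_rate * R_{t+1}
#   2. Critic (ValueNetwork): minimize (R_t - V(s_t))^2
#   3. Actor (PolicyNetwork): minimize -log pi(a_t | s_t) * (R_t - V(s_t)),
#      minus a small entropy bonus, treating the advantage R_t - V(s_t)
#      as a constant.
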
class ActionType:
    Box = 1
    Discrete = 0

class PolicyNetwork:
    def __init__(self, name, input_shape, output_dim, hidden_dims=[32, 32], discrete=True):
        with tf.variable_scope(name):
            self.s = tf.placeholder(tf.float32, shape=input_shape, name="state")

            if discrete:
                self.a = tf.placeholder(tf.int32, shape=[None], name="action")
                a_onehot = tf.one_hot(self.a, depth=output_dim)
            # else:
            #     self.a = tf.placeholder(tf.float32, shape=[None, output_dim], name="action")

            self.advantage = tf.placeholder(tf.float32, shape=[None, 1], name="advantage")

            net = self.s
            for h_dim in hidden_dims:
                net = tf.layers.dense(net, h_dim, activation=tf.nn.relu)
            net = tf.layers.dense(net, output_dim)

            if discrete:
                # Subtract the max logit for numerical stability (softmax is shift-invariant).
                net -= tf.reduce_max(net)
                self.output = tf.nn.softmax(net)
                # Negative log-likelihood of the taken action: -log pi(a|s)
                loss = -tf.log(self.output + 1e-7) * a_onehot
            else:
                # Continuous (Box) actions are not fully supported; the raw network
                # output is used directly as the action in Agent.act().
                self.output = net
                loss = -tf.log(self.output)

            loss = tf.reduce_sum(loss, axis=1)  # shape: [batch]

            # Entropy of the policy, used as a bonus to encourage exploration.
            entropy = -self.output * tf.log(self.output + 1e-7)

            # Policy-gradient loss: -log pi(a|s) * advantage, summed over the batch,
            # minus a small entropy bonus.
            advantage = tf.squeeze(self.advantage, axis=[1])  # [batch, 1] -> [batch]
            self.loss = tf.reduce_sum(loss * advantage) - 0.003 * tf.reduce_sum(entropy)

            self.train_op = tf.train.AdamOptimizer().minimize(self.loss)

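# Illustrative shapes (not executed here), assuming the CartPole-v1 setup used in
# main(): observations are 4-dimensional and there are 2 discrete actions, so the
# Agent below effectively builds
#     PolicyNetwork("policy", [None, 4], 2, discrete=True)
# whose `output` is a [batch, 2] tensor of action probabilities.
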
class ValueNetwork:
    def __init__(self, name, input_shape, hidden_dims=[32, 32]):
        with tf.variable_scope(name):
            self.s = tf.placeholder(tf.float32, shape=input_shape, name="state")
            self.r = tf.placeholder(tf.float32, shape=[None, 1], name="reward")

            net = self.s
            for h_dim in hidden_dims:
                net = tf.layers.dense(net, h_dim, activation=tf.nn.relu)
            net = tf.layers.dense(net, 1)
            self.output = net

            # Squared error between the predicted value V(s) and the discounted return.
            loss = tf.square(self.output - self.r)
            self.loss = tf.reduce_sum(loss)
            self.train_op = tf.train.AdamOptimizer().minimize(self.loss)

class Agent:
    def __init__(self, sess, env):
        self.env = env
        self.sess = sess
        self.input_dim = self.env.observation_space.shape[0]

        # Discrete action spaces expose `.n`; Box spaces expose `.shape` instead.
        try:
            self.output_dim = self.env.action_space.n
            self.type = ActionType.Discrete
        except AttributeError:
            self.output_dim = self.env.action_space.shape[0]
            self.type = ActionType.Box

        self.policy_network = PolicyNetwork("policy",
                                            [None, self.input_dim],
                                            self.output_dim,
                                            discrete=self.type == ActionType.Discrete)
        self.value_network = ValueNetwork("value", [None, self.input_dim])

    def act(self, state):
        state = np.reshape(state, [-1, self.input_dim])
        feed = {
            self.policy_network.s: state
        }
        policy = self.sess.run(self.policy_network.output, feed_dict=feed)[0]

        if self.type == ActionType.Discrete:
            # Sample an action from the probabilities produced by the policy network.
            try:
                return np.random.choice(np.arange(self.output_dim), p=policy)
            except ValueError:
                # The policy produced invalid probabilities (e.g. NaN); fail loudly.
                raise ValueError("Invalid action probabilities: {}".format(policy))
        else:
            return np.clip(policy, self.env.action_space.low, self.env.action_space.high)

    def train(self, state_list, action_list, reward_list, discount_rate):
        """Run one training step.

        1. Compute the discounted return R for every step of the episode.
        2. Train the policy with log pi(a|s) * (R - V).
        3. Train the value network toward R (squared error between R and V).
        """
        def discount_reward(reward_list):
            """Compute discounted returns by iterating backwards over the episode."""
            N = len(reward_list)
            d_reward = np.zeros_like(reward_list, dtype=np.float32)
            running_add = 0
            for i in np.arange(N - 1, -1, -1):
                r = reward_list[i]
                running_add = discount_rate * running_add + r
                d_reward[i] = running_add
            return d_reward
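
        # Worked example: with discount_rate = 0.99 and reward_list = [1, 1, 1],
        #   R_2 = 1.0
        #   R_1 = 1 + 0.99 * 1.0  = 1.99
        #   R_0 = 1 + 0.99 * 1.99 = 2.9701
        # so discount_reward returns [2.9701, 1.99, 1.0].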
        d_reward = discount_reward(reward_list)

        value_feed = {
            self.value_network.s: state_list,
            self.value_network.r: d_reward
        }
        # Baseline: V(s) from the critic; advantage = R - V(s).
        V = self.sess.run(self.value_network.output, feed_dict=value_feed)
        advantage_list = d_reward - V

        if self.type == ActionType.Discrete:
            policy_feed = {
                self.policy_network.s: state_list,
                self.policy_network.a: action_list,
                self.policy_network.advantage: advantage_list
            }
        else:
            policy_feed = {
                self.policy_network.s: state_list,
                self.policy_network.advantage: advantage_list
            }

        self.sess.run(self.policy_network.train_op, feed_dict=policy_feed)
        self.sess.run(self.value_network.train_op, feed_dict=value_feed)

def print_info(episode, reward):
    print(f"[Episode {episode}] Reward {reward}")

def test_run(agent, env, render=False):
    s = env.reset()
    d = False
    total_reward = 0

    while not d:
        if render:
            env.render()
        a = agent.act(s)
        s2, r, d, i = env.step(a)
        total_reward += r
        s = s2

    return total_reward

def main():
    env = gym.make("CartPole-v1")
    print(f"Observation space: {env.observation_space}")
    print(f"Action space: {env.action_space}")
    input("Enter to continue...")

    env = gym.wrappers.Monitor(env, "gym-results/", force=True)

    sess = tf.InteractiveSession()
    agent = Agent(sess, env)

    init = tf.global_variables_initializer()
    sess.run(init)

    # Rolling window of the last 100 episode rewards, used as the clear condition.
    clear_condition = []
    clear_threshold = env.spec.reward_threshold or float('inf')

    for episode in range(5000):
        state_list = []
        action_list = []
        reward_list = []

        d = False
        s = env.reset()
        episode_reward = 0

        while not d:
            a = agent.act(s)
            s2, r, d, i = env.step(a)
            episode_reward += r
            assert not np.isnan(r), r

            # The terminal transition is not stored for training.
            if not d:
                action_list.append(a)
                state_list.append(s)
                reward_list.append(r)

            s = s2

        state_list = np.vstack(state_list)
        action_list = np.asarray(action_list)
        reward_list = np.vstack(reward_list)

        agent.train(state_list, action_list, reward_list, 0.99)
        print_info(episode, episode_reward)

        clear_condition.append(episode_reward)
        if len(clear_condition) > 100:
            clear_condition = clear_condition[1:]

        avg_reward = np.mean(clear_condition)
        if avg_reward >= clear_threshold:
            break

    for _ in range(10):
        test_run(agent, env, True)

    env.close()


if __name__ == '__main__':
    main()
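
# To run this script (assuming Python 3.6+ for the f-strings, TensorFlow 1.x,
# and an OpenAI gym version that still provides gym.wrappers.Monitor):
#     python A2C.py
# The Monitor wrapper writes videos and statistics to gym-results/.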