A2C.py
""" | |
Actor-Critic Example | |
In this file, I will use abbreviations extensively | |
Here are examples: | |
s : state | |
a : action | |
r : reward | |
d : done | |
s2 : next state | |
""" | |
import gym | |
import numpy as np | |
import tensorflow as tf | |
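
# Note (assumption, not stated in the original gist): this script targets the
# TensorFlow 1.x API (tf.placeholder, tf.layers, tf.variable_scope, sessions)
# and an older gym API in which env.reset() returns only the observation,
# env.step() returns a 4-tuple, and gym.wrappers.Monitor is available.
# Newer releases of either library would require adjustments.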


class ActionType:
    Box = 1
    Discrete = 0


class PolicyNetwork:
    """Policy head: maps states to action probabilities (discrete case).

    The loss is the usual policy-gradient objective,
        -sum_t log pi(a_t | s_t) * advantage_t - beta * entropy,
    with a small entropy bonus to encourage exploration.
    """

    def __init__(self, name, input_shape, output_dim, hidden_dims=[32, 32], discrete=True):
        with tf.variable_scope(name):
            self.s = tf.placeholder(tf.float32, shape=input_shape, name="state")

            if discrete:
                self.a = tf.placeholder(tf.int32, shape=[None], name="action")
                a_onehot = tf.one_hot(self.a, depth=output_dim)

            self.advantage = tf.placeholder(tf.float32, shape=[None, 1], name="advantage")

            net = self.s
            for h_dim in hidden_dims:
                net = tf.layers.dense(net, h_dim, activation=tf.nn.relu)
            net = tf.layers.dense(net, output_dim)

            if discrete:
                # Subtract the max logit before softmax for numerical stability;
                # softmax is invariant to adding a constant.
                net -= tf.reduce_max(net)
                self.output = tf.nn.softmax(net)
                # Negative log-probability of the chosen action.
                loss = -tf.log(self.output + 1e-7) * a_onehot
            else:
                # Continuous (Box) actions are only roughly handled: the raw
                # network output is treated as a probability, which is not a
                # proper continuous policy.
                self.output = net
                loss = -tf.log(self.output + 1e-7)

            loss = tf.reduce_sum(loss, axis=1, keep_dims=True)
            entropy = tf.reduce_sum(-self.output * tf.log(self.output + 1e-7))

            # The entropy bonus is subtracted so that minimizing the loss
            # increases entropy (encourages exploration).
            self.loss = tf.reduce_sum(loss * self.advantage) - 0.003 * entropy
            self.train_op = tf.train.AdamOptimizer().minimize(self.loss)


class ValueNetwork:
    """Value head: regresses the discounted return from the state."""

    def __init__(self, name, input_shape, hidden_dims=[32, 32]):
        with tf.variable_scope(name):
            self.s = tf.placeholder(tf.float32, shape=input_shape, name="state")
            self.r = tf.placeholder(tf.float32, shape=[None, 1], name="reward")

            net = self.s
            for h_dim in hidden_dims:
                net = tf.layers.dense(net, h_dim, activation=tf.nn.relu)
            net = tf.layers.dense(net, 1)
            self.output = net

            # Squared error between the predicted value and the observed
            # discounted return.
            loss = tf.square(self.output - self.r)
            self.loss = tf.reduce_sum(loss)
            self.train_op = tf.train.AdamOptimizer().minimize(self.loss)


class Agent:
    def __init__(self, sess, env):
        self.env = env
        self.sess = sess
        self.input_dim = self.env.observation_space.shape[0]

        # Discrete action spaces expose `.n`; Box spaces expose `.shape`.
        try:
            self.output_dim = self.env.action_space.n
            self.type = ActionType.Discrete
        except AttributeError:
            self.output_dim = self.env.action_space.shape[0]
            self.type = ActionType.Box

        self.policy_network = PolicyNetwork("policy",
                                            [None, self.input_dim],
                                            self.output_dim,
                                            discrete=self.type == ActionType.Discrete)
        self.value_network = ValueNetwork("value", [None, self.input_dim])

    def act(self, state):
        state = np.reshape(state, [-1, self.input_dim])
        feed = {
            self.policy_network.s: state
        }
        policy = self.sess.run(self.policy_network.output, feed_dict=feed)[0]

        if self.type == ActionType.Discrete:
            try:
                # Sample an action from the predicted distribution.
                return np.random.choice(np.arange(self.output_dim), p=policy)
            except ValueError:
                # The probabilities did not sum to 1 or contained NaN;
                # print the offending policy and re-raise.
                print(policy)
                raise
        else:
            return np.clip(policy, self.env.action_space.low, self.env.action_space.high)
    def train(self, state_list, action_list, reward_list, discount_rate):
        """Run one training update.

        1. Compute the discounted return R from the episode rewards.
        2. Train the policy network on log p(a|s) * (R - V).
        3. Train the value network to regress R.
        """
        def discount_reward(reward_list):
            """Discounted cumulative sum, computed backwards over the episode."""
            N = len(reward_list)
            d_reward = np.zeros_like(reward_list, dtype=np.float32)
            running_add = 0
            for i in np.arange(N - 1, -1, -1):
                r = reward_list[i]
                running_add = discount_rate * running_add + r
                d_reward[i] = running_add
            return d_reward

        d_reward = discount_reward(reward_list)

        value_feed = {
            self.value_network.s: state_list,
            self.value_network.r: d_reward
        }
        V = self.sess.run(self.value_network.output, feed_dict=value_feed)

        # Advantage: how much better the observed return was than the baseline.
        advantage_list = d_reward - V

        if self.type == ActionType.Discrete:
            policy_feed = {
                self.policy_network.s: state_list,
                self.policy_network.a: action_list,
                self.policy_network.advantage: advantage_list
            }
        else:
            policy_feed = {
                self.policy_network.s: state_list,
                self.policy_network.advantage: advantage_list
            }

        self.sess.run(self.policy_network.train_op, feed_dict=policy_feed)
        self.sess.run(self.value_network.train_op, feed_dict=value_feed)
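

# Worked example of the update in Agent.train (illustrative numbers, not taken
# from the original gist): with discount_rate = 0.99 and episode rewards
# [1, 1, 1], discount_reward computes, backwards,
#     R[2] = 1.0
#     R[1] = 1 + 0.99 * 1.0  = 1.99
#     R[0] = 1 + 0.99 * 1.99 = 2.9701
# If the value network predicts V = [2.5, 1.5, 0.5] for the three states, the
# advantages fed to the policy network are R - V = [0.4701, 0.49, 0.5].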


def print_info(episode, reward):
    print(f"[Episode {episode}] Reward {reward}")


def test_run(agent, env, render=False):
    s = env.reset()
    d = False
    total_reward = 0
    while not d:
        if render:
            env.render()
        a = agent.act(s)
        s2, r, d, i = env.step(a)
        total_reward += r
        s = s2
    return total_reward


def main():
    env = gym.make("CartPole-v1")
    print(f"Observation space: {env.observation_space}")
    print(f"Action space: {env.action_space}")
    input("Press Enter to continue...")

    env = gym.wrappers.Monitor(env, "gym-results/", force=True)
    sess = tf.InteractiveSession()
    agent = Agent(sess, env)

    init = tf.global_variables_initializer()
    sess.run(init)

    # Rolling window of the last 100 episode rewards; training stops once the
    # average reaches the environment's reward threshold.
    clear_condition = []
    clear_threshold = env.spec.reward_threshold or float('inf')

    for episode in range(5000):
        state_list = []
        action_list = []
        reward_list = []

        d = False
        s = env.reset()
        episode_reward = 0

        while not d:
            a = agent.act(s)
            s2, r, d, i = env.step(a)
            episode_reward += r
            assert not np.isnan(r), r

            # Note: the terminal transition is not stored.
            if not d:
                action_list.append(a)
                state_list.append(s)
                reward_list.append(r)

            s = s2

        state_list = np.vstack(state_list)
        action_list = np.asarray(action_list)
        reward_list = np.vstack(reward_list)

        agent.train(state_list, action_list, reward_list, 0.99)
        print_info(episode, episode_reward)

        clear_condition.append(episode_reward)
        if len(clear_condition) > 100:
            clear_condition = clear_condition[1:]

        avg_reward = np.mean(clear_condition)
        if avg_reward >= clear_threshold:
            break

    for _ in range(10):
        test_run(agent, env, True)

    env.close()


if __name__ == '__main__':
    main()