@icoxfog417
Created August 17, 2018 11:33
actor_critic_pendulum_1.py
import gym
import numpy as np
from tensorflow.python import keras as K
import tensorflow as tf
import random
from collections import deque
class ActorCritic:
    def __init__(self, env, sess):
        self.env = env
        self.sess = sess

        self.learning_rate = 0.001
        self.epsilon = 1.0
        self.epsilon_decay = .995
        self.gamma = .95
        self.tau = .125
        self.memory = deque(maxlen=2000)

        # Actor network and its target copy.
        self.actor_state_input, self.actor_model = self.create_actor_model()
        _, self.target_actor_model = self.create_actor_model()

        # Placeholder for dQ/da supplied by the critic; combined with the
        # actor's own gradients it forms the deterministic policy gradient.
        self.actor_critic_grad = tf.placeholder(tf.float32,
                                                [None, self.env.action_space.shape[0]])
        actor_model_weights = self.actor_model.trainable_weights
        self.actor_grads = tf.gradients(self.actor_model.output,
                                        actor_model_weights, -self.actor_critic_grad)  # dC/dA (from actor)
        grads = zip(self.actor_grads, actor_model_weights)
        self.optimize = tf.train.AdamOptimizer(self.learning_rate).apply_gradients(grads)

        # Critic network (state + action -> Q-value) and its target copy.
        self.critic_state_input, self.critic_action_input, \
            self.critic_model = self.create_critic_model()
        _, _, self.target_critic_model = self.create_critic_model()

        # Gradient of the critic's output with respect to its action input (dQ/da).
        self.critic_grads = tf.gradients(self.critic_model.output,
                                         self.critic_action_input)

        # Initialize for later gradient calculations
        self.sess.run(tf.global_variables_initializer())
    def create_actor_model(self):
        state_input = K.layers.Input(shape=self.env.observation_space.shape)
        h1 = K.layers.Dense(24, activation='relu')(state_input)
        h2 = K.layers.Dense(48, activation='relu')(h1)
        h3 = K.layers.Dense(24, activation='relu')(h2)
        output = K.layers.Dense(self.env.action_space.shape[0], activation='relu')(h3)

        model = K.models.Model(inputs=state_input, outputs=output)
        optimizer = K.optimizers.Adam(lr=0.001)
        model.compile(loss="mse", optimizer=optimizer)
        return state_input, model
    def create_critic_model(self):
        state_input = K.layers.Input(shape=self.env.observation_space.shape)
        state_h1 = K.layers.Dense(24, activation='relu')(state_input)
        state_h2 = K.layers.Dense(48)(state_h1)

        action_input = K.layers.Input(shape=self.env.action_space.shape)
        action_h1 = K.layers.Dense(48)(action_input)

        merged = K.layers.Add()([state_h2, action_h1])
        merged_h1 = K.layers.Dense(24, activation='relu')(merged)
        output = K.layers.Dense(1, activation='relu')(merged_h1)

        model = K.models.Model(inputs=[state_input, action_input], outputs=output)
        optimizer = K.optimizers.Adam(lr=0.001)
        model.compile(loss="mse", optimizer=optimizer)
        return state_input, action_input, model
    def remember(self, cur_state, action, reward, new_state, done):
        self.memory.append([cur_state, action, reward, new_state, done])
    def _train_actor(self, samples):
        for sample in samples:
            cur_state, action, reward, new_state, _ = sample
            predicted_action = self.actor_model.predict(cur_state)
            # Ask the critic how Q changes with the action (dQ/da) ...
            grads = self.sess.run(self.critic_grads, feed_dict={
                self.critic_state_input: cur_state,
                self.critic_action_input: predicted_action
            })[0]
            # ... and push the actor's parameters in that direction.
            self.sess.run(self.optimize, feed_dict={
                self.actor_state_input: cur_state,
                self.actor_critic_grad: grads
            })
    def _train_critic(self, samples):
        for sample in samples:
            cur_state, action, reward, new_state, done = sample
            if not done:
                # Bootstrap the target with the target networks' estimate
                # of the next state's value.
                target_action = self.target_actor_model.predict(new_state)
                future_reward = self.target_critic_model.predict(
                    [new_state, target_action])[0][0]
                reward += self.gamma * future_reward
            self.critic_model.fit([cur_state, action],
                                  np.array([reward]).reshape((1, 1)), verbose=0)
    def train(self):
        batch_size = 32
        if len(self.memory) < batch_size:
            return
        samples = random.sample(self.memory, batch_size)
        self._train_critic(samples)
        self._train_actor(samples)
    # ========================================================================= #
    #                          Target Model Updating                            #
    # ========================================================================= #
    def _update_actor_target(self):
        actor_model_weights = self.actor_model.get_weights()
        actor_target_weights = self.target_actor_model.get_weights()
        for i in range(len(actor_target_weights)):
            actor_target_weights[i] = actor_model_weights[i]
        self.target_actor_model.set_weights(actor_target_weights)

    def _update_critic_target(self):
        critic_model_weights = self.critic_model.get_weights()
        critic_target_weights = self.target_critic_model.get_weights()
        for i in range(len(critic_target_weights)):
            critic_target_weights[i] = critic_model_weights[i]
        self.target_critic_model.set_weights(critic_target_weights)

    def update_target(self):
        self._update_actor_target()
        self._update_critic_target()
    # ========================================================================= #
    #                             Model Predictions                             #
    # ========================================================================= #
    def act(self, cur_state):
        # With probability epsilon (decayed each call) take a random action;
        # otherwise act greedily according to the actor network.
        self.epsilon *= self.epsilon_decay
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        return self.actor_model.predict(cur_state)
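

# ------------------------------------------------------------------------- #
# Note: self.tau is defined above but never used; the _update_*_target
# methods copy the online weights into the targets outright. A common
# DDPG-style alternative is a soft update that blends online and target
# weights. The helper below is a minimal sketch of that idea (soft_update
# is a hypothetical name, not part of the gist) and could be dropped into
# the update methods in place of the straight copy.
# ------------------------------------------------------------------------- #
def soft_update(online_model, target_model, tau=0.125):
    # theta_target <- tau * theta_online + (1 - tau) * theta_target
    online_weights = online_model.get_weights()
    target_weights = target_model.get_weights()
    blended = [tau * w + (1.0 - tau) * tw
               for w, tw in zip(online_weights, target_weights)]
    target_model.set_weights(blended)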
def main():
    sess = tf.Session()
    K.backend.set_session(sess)
    env = gym.make("Pendulum-v0")
    actor_critic = ActorCritic(env, sess)

    num_trials = 10000
    trial_len = 500

    cur_state = env.reset()
    action = env.action_space.sample()
    total_reward = 0
    while True:
        # env.render()
        cur_state = cur_state.reshape((1, env.observation_space.shape[0]))
        action = actor_critic.act(cur_state)
        action = action.reshape((1, env.action_space.shape[0]))

        new_state, reward, done, _ = env.step(action)
        if not done:
            total_reward += reward
        else:
            print(total_reward)
            total_reward = 0

        new_state = new_state.reshape((1, env.observation_space.shape[0]))
        actor_critic.remember(cur_state, action, reward, new_state, done)
        actor_critic.train()
        # Keep the target networks in sync with the trained networks.
        actor_critic.update_target()

        # Continue from the next state, or start a fresh episode once the
        # previous one has ended.
        cur_state = new_state if not done else env.reset()


if __name__ == "__main__":
    main()