"""
Cartpole Policy Gradient Example using TensorFlow 2.0
Reference: https://github.com/awjuliani/DeepRL-Agents/blob/master/Vanilla-Policy.ipynb
Author : solaris33
Project URL : http://solarisailab.com/archives/2652
"""
import tensorflow as tf
import numpy as np
import gym
# Load cartpole environment
env = gym.make('CartPole-v0')
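# CartPole-v0 gives a reward of +1 for every step the pole stays upright and
# terminates when the pole tips too far, the cart leaves the track, or 200
# steps elapse, so the maximum possible episode return is 200.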
gamma = 0.99           # reward discount factor
learning_rate = 0.01
state_size = 4         # CartPole observation: [cart position, cart velocity, pole angle, pole angular velocity]
num_actions = 2        # push the cart left or right
hidden_size = 8
total_episodes = 5000  # total number of episodes to train the agent on
max_ep = 999           # maximum steps per episode
update_frequency = 5   # update the network every N episodes
is_visualize = False   # set True to render the environment during training
def discount_rewards(r):
    """Take a 1D float array of rewards and compute the discounted return at each timestep."""
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(0, r.size)):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r
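# Illustrative check (assumed values, not part of training): with gamma = 0.99,
# discount_rewards(np.array([1.0, 1.0, 1.0])) -> [2.9701, 1.99, 1.0], since each
# entry is r[t] plus gamma times the discounted return at t + 1.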
class PolicyNetworks(tf.keras.Model):
    """Simple two-layer policy network: state -> action probabilities."""
    def __init__(self):
        super(PolicyNetworks, self).__init__()
        self.hidden_layer_1 = tf.keras.layers.Dense(hidden_size, activation='relu')
        self.output_layer = tf.keras.layers.Dense(num_actions, activation='softmax')

    def call(self, x):
        H1_output = self.hidden_layer_1(x)
        outputs = self.output_layer(H1_output)
        return outputs
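# Shape example (illustrative): a batch of states with shape (batch_size, 4)
# maps to action probabilities with shape (batch_size, 2), and the softmax
# output guarantees each row sums to 1.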
def pg_loss(outputs, actions, rewards):
    """Vanilla policy-gradient loss: -mean(log pi(a|s) * discounted return)."""
    # Flatten the (batch, num_actions) probabilities and gather the probability
    # of the action that was actually taken at each timestep.
    indexes = tf.range(0, tf.shape(outputs)[0]) * tf.shape(outputs)[1] + actions
    responsible_outputs = tf.gather(tf.reshape(outputs, [-1]), indexes)
    loss = -tf.reduce_mean(tf.math.log(responsible_outputs) * rewards)
    return loss
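# Worked example of the flatten-and-gather indexing above (illustrative values):
# outputs = [[0.7, 0.3], [0.2, 0.8]] flattens to [0.7, 0.3, 0.2, 0.8];
# with actions = [0, 1], indexes = [0 * 2 + 0, 1 * 2 + 1] = [0, 3],
# so responsible_outputs = [0.7, 0.8].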
optimizer = tf.optimizers.Adam(learning_rate)
def train_step(model, states, actions, rewards):
    """Run one gradient update of the policy network on a batch of transitions."""
    with tf.GradientTape() as tape:
        outputs = model(states)
        loss = pg_loss(outputs, actions, rewards)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
# Create the policy gradient network
PG_model = PolicyNetworks()

i = 0
total_reward = []
total_length = []
# Training loop
while i < total_episodes:
    s = env.reset()
    running_reward = 0
    ep_history = []
    for j in range(max_ep):
        if is_visualize:
            env.render()
        # Probabilistically pick an action given our network outputs.
        # (The action index is sampled directly; the original trick of sampling
        # a probability value and matching it back breaks when two actions
        # happen to share the same probability.)
        s = np.expand_dims(s, 0)
        a_dist = PG_model(s).numpy()
        a = np.random.choice(num_actions, p=a_dist[0])
        s1, r, d, _ = env.step(a)  # Get reward and next state
        ep_history.append([s, a, r, s1])
        s = s1
        running_reward += r
        if d:
            # Replace each raw reward with its discounted return for this episode.
            ep_history = np.array(ep_history)
            ep_history[:, 2] = discount_rewards(ep_history[:, 2])
            # Stack the stored states into a single (num_steps, state_size) array.
            np_states = np.array(ep_history[0, 0])
            for idx in range(1, ep_history[:, 0].size):
                np_states = np.append(np_states, ep_history[idx, 0], axis=0)
            # Update the network parameters every `update_frequency` episodes,
            # casting out of the object-dtype history array before feeding TensorFlow.
            if i % update_frequency == 0 and i != 0:
                train_step(PG_model, np_states.astype(np.float32),
                           ep_history[:, 1].astype(np.int32),
                           ep_history[:, 2].astype(np.float32))
            total_reward.append(running_reward)
            total_length.append(j)
            break

    # Print the mean score of the last 100 episodes
    if i % 100 == 0:
        print(np.mean(total_reward[-100:]))
    i += 1
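
# Optional sketch (not in the original gist): greedily roll out the trained
# policy once with rendering, assuming the same old-style gym API used above
# (env.step returning four values). Uncomment to try it after training.
# s = env.reset()
# done = False
# while not done:
#     env.render()
#     probs = PG_model(np.expand_dims(s, 0).astype(np.float32)).numpy()[0]
#     s, _, done, _ = env.step(int(np.argmax(probs)))
# env.close()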