a DQN code solving CartPole-v0
Works with TensorFlow 1.2 and Python 3.5
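For reference, the training step below (train_Q_network) regresses the network toward the standard one-step DQN target computed from each sampled transition (s, a, r, s', done):

    y = r                               if done
    y = r + gamma * max_a' Q(s', a')    otherwise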
import gym
from gym import wrappers
import tensorflow as tf
import numpy as np
import random
from collections import deque
# Hyper-parameters for DQN
gamma = 0.9                   # discount factor for the target Q value
epsilonMax = 0.50             # starting value of epsilon (0.5 for CartPole)
epsilonMin = 0.01             # final value of epsilon
epsilonN = 10000              # 10000 for CartPole
replaySize = 10000            # experience replay buffer size
batchSize = 32                # size of minibatch
lr = 0.0003                   # learning rate
modelSaveFrequency = 1000000  # save the model every modelSaveFrequency training steps
hidden_dims = [16, 16, 16]    # sizes of the hidden layers
envName = 'CartPole-v0'
# envName = 'CartPole-v1'
# envName = 'Acrobot-v1'
nEpisodesMax = 2000           # episode limit
nStepsMax = 200               # step limit within an episode
testFrequency = 20            # evaluate every testFrequency training episodes
nTests = 10                   # number of test episodes per evaluation
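# Note on exploration: epsilon is annealed linearly from epsilonMax down to epsilonMin
# over the first epsilonN training steps and is then held at epsilonMin (see egreedy_action).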
class AgentDQN():
    def __init__(self, env):
        # init experience replay
        self.memory = deque(maxlen=replaySize)
        # init some parameters
        self.stepIdx = 0
        self.lr = lr
        self.epsilon = epsilonMax
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        self.create_Q_network()
        self.create_training_method()
        # init session
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())
        # load a previously saved network, if one exists
        self.saver = tf.train.Saver()
        checkpoint = tf.train.get_checkpoint_state("saved_networks")
        if checkpoint and checkpoint.model_checkpoint_path:
            self.saver.restore(self.session, checkpoint.model_checkpoint_path)
            print("Successfully loaded:", checkpoint.model_checkpoint_path)
        else:
            print("Could not find old network weights")
        global summary_writer
        summary_writer = tf.summary.FileWriter('tensorboard/', graph=self.session.graph)
    def create_Q_network(self):
        # input layer
        self.state_input = tf.placeholder("float", [None, self.state_dim])
        if False:
            # original creation of the network - no longer used
            # network weights
            hidden_dim = 50
            W1 = self.weight_variable([self.state_dim, hidden_dim])
            b1 = self.bias_variable([hidden_dim])
            W2 = self.weight_variable([hidden_dim, self.action_dim])
            b2 = self.bias_variable([self.action_dim])
            # hidden layer
            h_layer = tf.nn.relu(tf.matmul(self.state_input, W1) + b1)
            # Q value layer
            self.Q_value = tf.matmul(h_layer, W2) + b2
        if True:
            # creation using tf.layers is much easier - it allows easy configuration of multiple hidden layers
            hidden = self.state_input
            for dim in hidden_dims:
                hidden = tf.layers.dense(hidden, dim, activation=tf.nn.relu)
            # final layer is linear
            self.Q_value = tf.layers.dense(hidden, self.action_dim, activation=None)
    def create_training_method(self):
        self.action_input = tf.placeholder(shape=[None, self.action_dim], dtype=tf.float32)  # one-hot representation of the action
        self.y_input = tf.placeholder(shape=[None], dtype=tf.float32)
        Q_action = tf.reduce_sum(tf.multiply(self.Q_value, self.action_input), reduction_indices=1)
        self.loss = tf.reduce_mean(tf.square(self.y_input - Q_action))
        tf.summary.scalar("loss", self.loss)
        global merged_summary_op
        merged_summary_op = tf.summary.merge_all()
        self.optimizer = tf.train.AdamOptimizer(self.lr).minimize(self.loss)
        # alternative way of selecting Q(s, a) by gathering from the flattened Q tensor;
        # only used by the commented-out loss3/optimizer3 below
        self.action_holder = tf.argmax(self.action_input, axis=1, output_type=tf.int32)
        Q_shape = tf.shape(self.Q_value, out_type=tf.int32)
        Q_indexes = tf.range(0, Q_shape[0]) * Q_shape[1] + self.action_holder
        # Q_action3 = tf.gather(tf.reshape(self.Q_value, [-1]), Q_indexes)
        # self.loss3 = tf.reduce_mean(tf.square(tf.subtract(self.y_input, Q_action3)))
        # self.optimizer3 = tf.train.AdamOptimizer(self.lr).minimize(self.loss3)
        self.next_state_input = tf.placeholder("float", [None, self.state_dim])  # currently unused
    def perceive(self, state, action, reward, next_state, done):
        one_hot_action = np.zeros(self.action_dim)
        one_hot_action[action] = 1
        self.memory.append((state, one_hot_action, reward, next_state, done))
        if len(self.memory) > batchSize:
            self.train_Q_network()

    def train_Q_network(self):
        self.stepIdx += 1
        # Step 1: obtain a random minibatch from the replay memory
        minibatch = random.sample(self.memory, batchSize)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        next_state_batch = [data[3] for data in minibatch]
        # Step 2: calculate the target y for every transition in the minibatch
        y_batch = []
        Q_value_batch = self.Q_value.eval(feed_dict={self.state_input: next_state_batch})
        for i in range(0, batchSize):
            done = minibatch[i][4]
            if done:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + gamma * np.max(Q_value_batch[i]))
        self.optimizer.run(feed_dict={
            self.y_input: y_batch,
            self.action_input: action_batch,
            self.state_input: state_batch
        })
        summary_str = self.session.run(merged_summary_op, feed_dict={
            self.y_input: y_batch,
            self.action_input: action_batch,
            self.state_input: state_batch
        })
        summary_writer.add_summary(summary_str, self.stepIdx)
        # save the network every modelSaveFrequency training steps
        if (self.stepIdx + 1) % modelSaveFrequency == 0:
            self.saver.save(self.session, 'saved_networks/' + 'network' + '-dqn', global_step=self.stepIdx)
    def egreedy_action(self, state):
        Q_value = self.Q_value.eval(feed_dict={
            self.state_input: [state]
        })[0]
        # anneal epsilon linearly, clamped at epsilonMin
        self.epsilon = max(epsilonMin, epsilonMax + (epsilonMin - epsilonMax) * self.stepIdx / epsilonN)
        if random.random() <= self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            return np.argmax(Q_value)

    """
    def boltzman_action(self, state):
        "" "
        Sample an action from the categorical distribution
        specified by a vector of class probabilities.
        Boltzmann exploration, better than e-greedy.
        "" "
        Q_value = self.Q_value.eval(feed_dict={
            self.state_input: [state]
        })[0]
        prob_n = np.asarray(Q_value)
        csprob_n = np.cumsum(prob_n)
        return (csprob_n > np.random.rand()).argmax()
    """

    def action(self, state):
        return np.argmax(self.Q_value.eval(feed_dict={
            self.state_input: [state]
        })[0])

    # the two helpers below are only used by the disabled manual-network branch in create_Q_network
    def weight_variable(self, shape):
        initial = tf.truncated_normal(shape)
        return tf.Variable(initial)

    def bias_variable(self, shape):
        initial = tf.constant(0.01, shape=shape)
        return tf.Variable(initial)
def main():
    # initialize the OpenAI Gym environment and the DQN agent
    env = gym.make(envName)
    # set seeds to 0
    random.seed(0)
    np.random.seed(0)
    env.seed(0)
    tf.set_random_seed(0)
    agent = AgentDQN(env)
    # record results
    # env = wrappers.Monitor(env, "./tmp/CartPole-v0-dqn")
    for episodeIdx in range(nEpisodesMax):
        # initialize the task
        state = env.reset()
        # train
        for step in range(nStepsMax):
            action = agent.egreedy_action(state)  # e-greedy action for training
            next_state, reward, done, _ = env.step(action)
            agent.perceive(state, action, reward, next_state, done)
            state = next_state
            if done:
                break
        # periodically evaluate the greedy policy
        if (episodeIdx + 1) % testFrequency == 0:
            total_reward = 0
            for i in range(nTests):
                state = env.reset()
                for j in range(nStepsMax):
                    env.render()
                    action = agent.action(state)  # greedy action for testing
                    state, reward, done, _ = env.step(action)
                    total_reward += reward
                    if done:
                        break
            ave_reward = total_reward / nTests
            print('episode: ', episodeIdx, 'Evaluation Average Reward:', ave_reward)
            print('epsilon: {}'.format(agent.epsilon))
            # if ave_reward >= 200:
            #     break
    # upload results and make a gist
    env.close()
    # gym.upload


if __name__ == '__main__':
    main()
Syntax changes were made so the code works on TensorFlow > 1.0 and Python 3.
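The note above does not list the exact edits; the renames below are the usual TensorFlow 1.0 API changes and Python 3 adjustments that a port like this involves, shown as a sketch. Which of them were actually needed for this gist is an assumption; the new names are the ones the code above already uses.

    # TensorFlow < 1.0 / Python 2       ->  TensorFlow >= 1.0 / Python 3 (used above)
    # tf.initialize_all_variables()     ->  tf.global_variables_initializer()
    # tf.train.SummaryWriter(...)       ->  tf.summary.FileWriter(...)
    # tf.scalar_summary(...)            ->  tf.summary.scalar(...)
    # tf.merge_all_summaries()          ->  tf.summary.merge_all()
    # tf.mul(a, b)                      ->  tf.multiply(a, b)
    # print "..."                       ->  print("...")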