Created February 1, 2023 16:59
Double DQN with CNN in Pong
import tensorflow as tf
from tensorflow.keras import layers


def create_q_model(num_actions):
    # Network architecture from the DeepMind DQN paper (Mnih et al., 2015).
    # Input: four stacked 84x84 grayscale frames, stacked along the channel axis (channels-last),
    # so each observation has shape (84, 84, 4).
    inputs = layers.Input(shape=(84, 84, 4))

    # Convolutions on the frames on the screen
    layer1 = layers.Conv2D(32, 8, strides=4, activation="relu")(inputs)
    layer2 = layers.Conv2D(64, 4, strides=2, activation="relu")(layer1)
    layer3 = layers.Conv2D(64, 3, strides=1, activation="relu")(layer2)

    layer4 = layers.Flatten()(layer3)
    layer5 = layers.Dense(512, activation="relu")(layer4)
    action = layers.Dense(num_actions, activation="linear")(layer5)

    return tf.keras.Model(inputs=inputs, outputs=action)
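As a quick sanity check of the network, the snippet below builds it and inspects the output shape. This is only an illustrative sketch run next to create_q_model above; the value num_actions=6 is an assumption (Pong's usual minimal action set), not something taken from the gist, and in practice env.action_space.n should be used.

# Hypothetical sanity check: build the network for an assumed 6-action Pong action space.
model = create_q_model(num_actions=6)  # 6 is assumed; use env.action_space.n in practice
model.summary()
# A dummy batch of one observation: 84x84 pixels, 4 stacked frames (channels-last)
dummy = tf.zeros((1, 84, 84, 4))
print(model(dummy).shape)  # -> (1, 6), one Q-value per action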
import numpy as np
import tensorflow as tf


class DoubleDQNAgent():
    def __init__(self, env):
        self.env = env

        # Constants
        self.NUM_ACTIONS = self.env.action_space.n
        self.DISCOUNT_FACTOR = 0.99  # Discount factor gamma used in the Q-learning update
        self.REPLAY_START_SIZE = 5000  # The agent runs for this many steps before training starts; the resulting experience populates the replay memory
        # self.REPLAY_START_SIZE = 50
        self.FINAL_EXPLORATION_STEP = 100000  # Number of frames over which epsilon is linearly annealed from its initial to its final value
        self.INITIAL_EXPLORATION = 1.0  # Initial value of epsilon in epsilon-greedy exploration
        self.FINAL_EXPLORATION = 0.1  # Final value of epsilon in epsilon-greedy exploration
        self.REPLAY_MEMORY_SIZE = 100000
        self.MINIBATCH_SIZE = 32
        self.TARGET_NETWORK_UPDATE_FREQUENCY = 1000  # How often the target network is updated (measured in the number of parameter updates)
        self.LEARNING_RATE = 0.00025

        self.buffer = ReplayMemory(self.REPLAY_MEMORY_SIZE)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.LEARNING_RATE, clipnorm=1.0)
        self.loss_function = tf.keras.losses.MeanSquaredError()

        self.model = create_q_model(self.NUM_ACTIONS)
        self.target_model = create_q_model(self.NUM_ACTIONS)
        self.target_model.set_weights(self.model.get_weights())

        # self.EPSILON_INTERVAL = (1.0 - self.FINAL_EXPLORATION)
        # self.EPSILON_DECAY_FACTOR = 0.99
        # self.running_reward = 0
        # self.episode_reward_history = []
        # self.max_episodes = 10000
        # self.max_step_per_episodes = 100

    def train(self):
        epsilon = self.INITIAL_EXPLORATION
        episode_idx = 0
        timestep = 0

        while True:
            episode_idx += 1
            episode_reward = 0
            done = False
            state, info = self.env.reset()

            while not done:
                timestep += 1
                action = self.choose_action(state, epsilon)

                # Anneal epsilon once training has started
                if timestep > self.REPLAY_START_SIZE:
                    epsilon -= (self.INITIAL_EXPLORATION - self.FINAL_EXPLORATION) / self.FINAL_EXPLORATION_STEP
                    epsilon = max(epsilon, self.FINAL_EXPLORATION)

                next_state, reward, terminated, truncated, info = self.env.step(action)
                done = terminated or truncated
                episode_reward += reward

                self.buffer.append(Experience(state, action, reward, done, next_state))
                state = next_state  # the next action must be chosen from the state we just reached

                # Only train once the observation phase is over (the buffer has been filled enough)
                if timestep > self.REPLAY_START_SIZE:
                    states_sample, actions_sample, rewards_sample, dones_sample, next_states_sample = self.buffer.sample(self.MINIBATCH_SIZE)

                    states_sample_tensor = tf.convert_to_tensor(states_sample)
                    next_states_sample_tensor = tf.convert_to_tensor(next_states_sample)
                    actions_sample_tensor = tf.convert_to_tensor(actions_sample)

                    # Perform experience replay
                    # Predict the Q-values of the sampled next states with the target network
                    target_q_values = self.target_model(next_states_sample_tensor)

                    # Build the Q-learning targets: reward plus the discounted maximum Q-value predicted by the target network.
                    # (1 - dones_sample) is 0 for terminal transitions, so the bootstrap term is dropped and only the reward remains.
                    # reduce_max takes the maximum Q-value of each row returned by the target network (one row per state in the batch of 32).
                    target_q_values = rewards_sample + (1 - dones_sample) * self.DISCOUNT_FACTOR * tf.reduce_max(target_q_values, axis=1)

                    # Create a mask from the actions stored in the sampled minibatch.
                    # This lets us compute the loss only on the Q-values of the actions that were actually taken.
                    # A one-hot tensor is a vector whose length equals the number of categories,
                    # with a single 1 at the index of the category and 0 everywhere else.
                    masks = tf.one_hot(actions_sample_tensor, self.NUM_ACTIONS)

                    # Compute the gradient step. GradientTape records the operations performed inside its
                    # context (the forward pass and the loss computation) so gradients can be derived afterwards.
                    with tf.GradientTape() as tape:
                        # Forward pass of the main network, recorded on the tape
                        q_values = self.model(states_sample_tensor)
                        # Apply the mask to keep only the Q-value of the action taken in each transition
                        masked_q_values = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
                        loss = self.loss_function(target_q_values, masked_q_values)

                    # Backpropagation through the operations recorded on the tape
                    gradients = tape.gradient(loss, self.model.trainable_variables)
                    self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

                    if timestep % self.TARGET_NETWORK_UPDATE_FREQUENCY == 0:
                        # Update the target network with the weights of the main network
                        self.target_model.set_weights(self.model.get_weights())
                        print("Target network updated")

                    # Save the model every 10,000 iterations
                    if timestep % 10000 == 0:
                        print("Saving model...")
                        self.model.save('pong-model.h5', overwrite=True)
                        print("Timestep: {}, Epsilon: {}, Reward: {}, Loss: {}".format(timestep, epsilon, reward, loss))

            print("Episode {} finished!".format(episode_idx))
            print("Episode reward: {}".format(episode_reward))
            print("******************")

    def choose_action(self, state, epsilon):
        # Epsilon-greedy action selection
        if np.random.random() < epsilon:
            action = self.env.action_space.sample()
        else:
            # Predict the Q-values of the current state
            state_tensor = tf.convert_to_tensor(state)  # the model works on tensors, not plain NumPy arrays
            state_tensor = tf.expand_dims(state_tensor, 0)  # add a batch dimension: the model expects a batch of states
            q_values = self.model(state_tensor, training=False)
            # Take the best action from the returned Q-values:
            # tf.argmax returns the index of the largest Q-value
            action = tf.argmax(q_values[0]).numpy()  # convert back to a plain NumPy value
        return action
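Note that the target used above, reward plus the discounted maximum of the target network's Q-values, is the standard DQN target with a target network. The Double DQN variant the gist is named after instead selects the greedy next action with the online network and evaluates it with the target network, which reduces over-estimation of Q-values. The helper below is a minimal sketch of that target computation, not part of the gist; double_dqn_targets is a hypothetical name, and it would replace the target_q_values line inside train() using the same sampled minibatch variables.

import tensorflow as tf


def double_dqn_targets(model, target_model, rewards, dones, next_states, gamma, num_actions):
    # Online network picks the greedy next action...
    next_q_online = model(next_states)
    best_actions = tf.argmax(next_q_online, axis=1)
    # ...and the target network evaluates that chosen action.
    next_q_target = target_model(next_states)
    selected_q = tf.reduce_sum(next_q_target * tf.one_hot(best_actions, num_actions), axis=1)
    # Terminal transitions (dones == 1) keep only the reward
    return rewards + (1 - dones) * gamma * selected_q


# Inside train(), this would be used as:
# target_q_values = double_dqn_targets(self.model, self.target_model, rewards_sample,
#                                      dones_sample, next_states_sample_tensor,
#                                      self.DISCOUNT_FACTOR, self.NUM_ACTIONS)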
from collections import deque, namedtuple

import numpy as np

Experience = namedtuple('Experience', field_names=['state', 'action', 'reward', 'done', 'new_state'])


class ReplayMemory():
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size=32):
        # Sample a minibatch of distinct transitions uniformly at random
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])
        # Return the done flags as floats so (1 - dones) can be used directly in the target computation
        return np.array(states), np.array(actions), np.array(rewards), np.array(dones, dtype=np.float32), np.array(next_states)
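The gist itself does not show how the Pong environment is constructed. The sketch below is one hypothetical way to wire everything together with gymnasium's Atari wrappers; the environment id "ALE/Pong-v5", the AtariPreprocessing and FrameStack wrappers, and the ChannelsLast helper are assumptions that may need adjusting to the installed gymnasium/ALE versions. The helper transposes and scales the stacked frames so observations match the (84, 84, 4) channels-last input expected by create_q_model above.

import gymnasium as gym
import numpy as np
from gymnasium.wrappers import AtariPreprocessing, FrameStack


class ChannelsLast(gym.ObservationWrapper):
    # Hypothetical helper: move the stacked-frame axis last and scale pixels to [0, 1]
    # so observations match the (84, 84, 4) float input expected by create_q_model.
    def observation(self, obs):
        return np.transpose(np.array(obs), (1, 2, 0)).astype(np.float32) / 255.0


env = gym.make("ALE/Pong-v5", frameskip=1)  # frameskip=1: AtariPreprocessing applies its own frame skipping
env = AtariPreprocessing(env)               # grayscale, resize to 84x84, frame skip of 4
env = FrameStack(env, 4)                    # stack the last 4 frames -> observations of shape (4, 84, 84)
env = ChannelsLast(env)                     # -> observations of shape (84, 84, 4)

agent = DoubleDQNAgent(env)
agent.train()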