Double DQN with CNN in Pong
from collections import deque, namedtuple

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers


def create_q_model(num_actions):
    # Network defined by the DeepMind DQN paper: three convolutional layers over a
    # stack of 4 preprocessed 84x84 grayscale frames, followed by a fully connected
    # layer and a linear output layer producing one Q-value per action.
    inputs = layers.Input(shape=(4, 84, 84, 1))

    # Convolutions on the frames on the screen
    layer1 = layers.Conv2D(32, 8, strides=4, activation="relu")(inputs)
    layer2 = layers.Conv2D(64, 4, strides=2, activation="relu")(layer1)
    layer3 = layers.Conv2D(64, 3, strides=1, activation="relu")(layer2)
    layer4 = layers.Flatten()(layer3)
    layer5 = layers.Dense(512, activation="relu")(layer4)
    action = layers.Dense(num_actions, activation="linear")(layer5)

    return tf.keras.Model(inputs=inputs, outputs=action)
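
# Quick sanity check (hypothetical usage): build the network for Pong's 6-action space
# and inspect the layer shapes; the final Dense layer should output one Q-value per action.
# model = create_q_model(6)
# model.summary()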

class DoubleDQNAgent():
    def __init__(self, env):
        self.env = env

        # Constants
        self.NUM_ACTIONS = self.env.action_space.n
        self.DISCOUNT_FACTOR = 0.99  # Discount factor gamma used in the Q-learning update
        self.REPLAY_START_SIZE = 5000  # The agent is run for this number of steps before training starts; the resulting experience is used to populate the replay memory
        # self.REPLAY_START_SIZE = 50
        self.FINAL_EXPLORATION_STEP = 100000  # Number of frames over which the initial value of epsilon is linearly annealed to its final value
        self.INITIAL_EXPLORATION = 1.0  # Initial value of epsilon in epsilon-greedy exploration
        self.FINAL_EXPLORATION = 0.1  # Final value of epsilon in epsilon-greedy exploration
        self.REPLAY_MEMORY_SIZE = 100000
        self.MINIBATCH_SIZE = 32
        self.TARGET_NETWORK_UPDATE_FREQUENCY = 1000  # The frequency with which the target network is updated (measured in the number of parameter updates)
        self.LEARNING_RATE = 0.00025

        self.buffer = ReplayMemory(self.REPLAY_MEMORY_SIZE)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.LEARNING_RATE, clipnorm=1.0)
        self.loss_function = tf.keras.losses.MeanSquaredError()

        # The target network is a periodically synced copy of the online network,
        # used to compute stable Q-learning targets.
        self.model = create_q_model(self.NUM_ACTIONS)
        self.target_model = create_q_model(self.NUM_ACTIONS)
        self.target_model.set_weights(self.model.get_weights())

        # self.EPSILON_INTERVAL = (1.0 - self.FINAL_EXPLORATION)
        # self.EPSILON_DECAY_FACTOR = 0.99
        # self.running_reward = 0
        # self.episode_reward_history = []
        # self.max_episodes = 10000
        # self.max_step_per_episodes = 100

    def train(self):
        epsilon = self.INITIAL_EXPLORATION
        episode_idx = 0
        timestep = 0

        while True:
            episode_idx += 1
            episode_reward = 0
            done = False
            state, info = self.env.reset()

            while not done:
                timestep += 1
                action = self.choose_action(state, epsilon)

                # Linearly anneal epsilon once training has started
                if timestep > self.REPLAY_START_SIZE:
                    epsilon -= (self.INITIAL_EXPLORATION - self.FINAL_EXPLORATION) / self.FINAL_EXPLORATION_STEP
                    epsilon = max(epsilon, self.FINAL_EXPLORATION)
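                    # With the values above, epsilon shrinks by (1.0 - 0.1) / 100000 = 9e-6 per step,
                    # so it reaches its floor of 0.1 roughly 100,000 steps after the replay memory
                    # has been warmed up.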
                next_state, reward, terminated, truncated, info = self.env.step(action)
                done = terminated or truncated
                episode_reward += reward

                self.buffer.append(Experience(state, action, reward, done, next_state))
                state = next_state  # Move on to the next observation before choosing the next action
                # Only train once the observation phase is over (the buffer has been filled enough)
                if timestep > self.REPLAY_START_SIZE:
                    states_sample, actions_sample, rewards_sample, dones_sample, next_states_sample = self.buffer.sample(self.MINIBATCH_SIZE)

                    states_sample_tensor = tf.convert_to_tensor(states_sample)
                    next_states_sample_tensor = tf.convert_to_tensor(next_states_sample)
                    actions_sample_tensor = tf.convert_to_tensor(actions_sample)
                    # Perform experience replay.
                    # Predict the Q-values of the next sampled states with the target network.
                    target_q_values = self.target_model(next_states_sample_tensor)

                    # Build the Q-learning targets: the immediate reward plus the discounted
                    # maximum Q-value of the next state. reduce_max takes the largest Q-value
                    # of each row, because the target model returned one row of Q-values per
                    # sampled state (we gave it a batch of 32 states). (1 - dones_sample)
                    # zeroes out the bootstrap term for terminal states, so their target is
                    # just the reward.
                    target_q_values = rewards_sample + (1 - dones_sample) * self.DISCOUNT_FACTOR * tf.reduce_max(target_q_values, axis=1)
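
                    # Note: the target above bootstraps from max_a Q_target(s', a), i.e. the
                    # classic DQN target with a target network. Double DQN (van Hasselt et al.,
                    # 2016) instead selects the best next action with the online network and
                    # evaluates it with the target network. A minimal sketch of that variant,
                    # reusing the tensors already defined here:
                    #
                    #   next_q_online = self.model(next_states_sample_tensor)
                    #   best_actions = tf.argmax(next_q_online, axis=1)
                    #   next_q_target = self.target_model(next_states_sample_tensor)
                    #   double_q = tf.reduce_sum(next_q_target * tf.one_hot(best_actions, self.NUM_ACTIONS), axis=1)
                    #   target_q_values = rewards_sample + (1 - dones_sample) * self.DISCOUNT_FACTOR * double_q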

                    # Create a one-hot mask from the actions stored in the sampled minibatch.
                    # A one-hot vector has length NUM_ACTIONS, with a 1 at the index of the
                    # action that was taken and 0 everywhere else. Multiplying the predicted
                    # Q-values by this mask lets us compute the loss only on the Q-value of
                    # the action that was actually taken.
                    masks = tf.one_hot(actions_sample_tensor, self.NUM_ACTIONS)
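                    # For example, with NUM_ACTIONS = 4, tf.one_hot([1, 3], 4) evaluates to
                    # [[0., 1., 0., 0.],
                    #  [0., 0., 0., 1.]]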

                    # Compute the gradient step. GradientTape records the operations performed
                    # inside its context (the forward pass of the online model and the loss
                    # computation) so that gradients can be taken from them afterwards.
                    with tf.GradientTape() as tape:
                        # Forward pass of the online network, recorded on the tape
                        q_values = self.model(states_sample_tensor)

                        # Apply the mask so we keep only the Q-value of the action taken in each sampled transition
                        masked_q_values = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)

                        loss = self.loss_function(target_q_values, masked_q_values)

                    # Backpropagation: compute the gradients of the loss with respect to the
                    # online network's weights from the taped operations, then apply them.
                    gradients = tape.gradient(loss, self.model.trainable_variables)
                    self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

                    if timestep % self.TARGET_NETWORK_UPDATE_FREQUENCY == 0:
                        # Update the target network with the weights of the online network
                        self.target_model.set_weights(self.model.get_weights())
                        print("Target network updated")

                    # Save the model every 10,000 iterations
                    if timestep % 10000 == 0:
                        print("Saving model...")
                        self.model.save('pong-model.h5', overwrite=True)

                    print("Timestep: {}, Epsilon: {}, Reward: {}, Loss: {}".format(timestep, epsilon, reward, loss))

            print("Episode {} finished!".format(episode_idx))
            print("Episode reward: {}".format(episode_reward))
            print("******************")

    def choose_action(self, state, epsilon):
        # Epsilon-greedy policy: with probability epsilon take a random action,
        # otherwise take the action with the highest predicted Q-value.
        if np.random.random() < epsilon:
            action = self.env.action_space.sample()
        else:
            # Predict the action Q-values for the current state
            state_tensor = tf.convert_to_tensor(state)  # The model expects tensors rather than numpy arrays
            state_tensor = tf.expand_dims(state_tensor, 0)  # Add a batch dimension: the model expects a batch of states, here a batch of one
            q_values = self.model(state_tensor, training=False)  # Predict the Q-values for this state

            # Take the best action from the returned Q-values;
            # tf.argmax returns the index of the largest Q-value.
            action = tf.argmax(q_values[0]).numpy()  # Convert back to a numpy value because argmax returns a tensor
        return action

Experience = namedtuple('Experience', field_names=['state', 'action', 'reward', 'done', 'new_state'])


class ReplayMemory():
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size=32):
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])
        return np.array(states), np.array(actions), np.array(rewards), np.array(dones), np.array(next_states)
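

# A minimal entry-point sketch. It assumes Gymnasium with ale-py installed and uses
# AtariPreprocessing + FrameStack so observations match the (4, 84, 84, 1) stacks the
# network above expects; wrapper and environment names vary between Gymnasium versions,
# so treat this as an illustration rather than a fixed recipe.
if __name__ == "__main__":
    import gymnasium as gym
    from gymnasium.wrappers import AtariPreprocessing, FrameStack

    env = gym.make("ALE/Pong-v5", frameskip=1)                           # raw frames, no built-in frame skipping
    env = AtariPreprocessing(env, frame_skip=4, grayscale_newaxis=True)  # 84x84 grayscale frames of shape (84, 84, 1)
    env = FrameStack(env, 4)                                             # stacks of 4 frames, shape (4, 84, 84, 1)

    agent = DoubleDQNAgent(env)
    agent.train()  # runs until interrupted; checkpoints are written to pong-model.h5 every 10,000 steps

    # A saved checkpoint can later be reloaded with:
    # model = tf.keras.models.load_model('pong-model.h5')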