A neural network that learns to play Pong from raw image pixels, trained with policy gradients.
import gym
import numpy as np

# uses the classic Gym API, where reset() returns only the observation
# and step() returns (observation, reward, done, info)
env = gym.make("Pong-v0")
observation = env.reset()

# hyperparameters
episode_number = 0
batch_size = 10  # how many episodes to play before updating the weights
gamma = 0.99  # discount factor for the reward
decay_rate = 0.99  # decay factor for RMSProp
hidden_layer_neurons = 200
input_dimensions = 80 * 80
learning_rate = 1e-4

reward_sum = 0
running_reward = None
prev_processed_observations = None
""" | |
To preprocess the OpenGym to train a neural network | |
1. Crop the image | |
2. Downsample the image | |
3. Convert image into black and white | |
4. Remove the background | |
5. convert 80x80 matrix of values into 6400x1 matrix | |
6. store the difference between current frame and previous frame | |
""" | |
def downsample(img):
    # keep every second pixel in both dimensions
    return img[::2, ::2, :]

def remove_color(img):
    # keep a single channel, turning the image black and white (RGB is the third dimension)
    return img[:, :, 0]

def remove_background(img):
    # erase the two background colours of the Pong playing field
    img[img == 144] = 0
    img[img == 109] = 0
    return img

def relu(vector):
    vector[vector < 0] = 0
    return vector

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def apply_neural_nets(observation_matrix, weights):
    """
    Based on the observation_matrix and the weights, compute the hidden layer
    activations and the probability of moving the paddle up.
    """
    hidden_layer_values = np.dot(weights['1'], observation_matrix)
    hidden_layer_values = relu(hidden_layer_values)
    output_layer_values = np.dot(hidden_layer_values, weights['2'])
    output_layer_values = sigmoid(output_layer_values)
    return hidden_layer_values, output_layer_values
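
# The forward pass above is a plain two-layer policy network:
#     h = relu(W1 . x)        W1 is 200x6400, x is the 6400-pixel difference frame
#     p = sigmoid(w2 . h)     w2 is a 200-vector, p is the probability of moving up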
def preprocess_observation(input_observation, prev_processed_observation, input_dimensions):
    processed_observation = input_observation[35:195]  # crop the image to the playing field
    processed_observation = downsample(processed_observation)
    processed_observation = remove_color(processed_observation)
    processed_observation = remove_background(processed_observation)
    processed_observation[processed_observation != 0] = 1  # everything that is left (paddles, ball) is set to 1
    processed_observation = processed_observation.astype(float).ravel()  # flatten 80x80 into a 6400x1 vector
    # subtract the previous frame from the current one so the network sees motion
    if prev_processed_observation is not None:
        input_observation = processed_observation - prev_processed_observation
    else:
        input_observation = np.zeros(input_dimensions)
    # store the current frame so it can be subtracted from the next one
    prev_processed_observations = processed_observation
    return input_observation, prev_processed_observations
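
# Shape check for a single Pong frame: the raw observation is 210x160x3; the crop
# [35:195] keeps 160 rows, downsampling by 2 gives 80x80, and ravel() produces the
# 6400-dimensional vector that matches input_dimensions.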
def choose_action(probability):
    random_value = np.random.uniform()
    if random_value < probability:
        # action 2 means "move up" in OpenAI Gym's Pong
        return 2
    else:
        # action 3 means "move down" in OpenAI Gym's Pong
        return 3
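
# Sampling the action from the predicted probability (instead of always taking the
# more likely move) keeps the policy stochastic, which is what lets the policy
# gradient explore both actions early in training.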
def compute_gradient(gradient_log_p, hidden_layer_values, observation_values, weights):
    delta_L = gradient_log_p
    dC_dw2 = np.dot(hidden_layer_values.T, delta_L).ravel()
    delta_l2 = np.outer(delta_L, weights['2'])
    delta_l2[hidden_layer_values <= 0] = 0  # backpropagate through the ReLU: zero the gradient where the activation was zero
    dC_dw1 = np.dot(delta_l2.T, observation_values)
    return {
        '1': dC_dw1,
        '2': dC_dw2
    }
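
# Chain rule behind compute_gradient, with h = relu(W1 . x) and p = sigmoid(w2 . h):
#     dC/dw2  = h^T . delta_L          delta_L is the per-step (label - p) term,
#                                      already scaled by the discounted reward
#     delta_h = outer(delta_L, w2), zeroed wherever h was zero (the ReLU derivative)
#     dC/dW1  = delta_h^T . x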
def discount_rewards(rewards, gamma):
    discounted_rewards = np.zeros_like(rewards)
    running_add = 0
    for t in reversed(range(0, rewards.size)):
        if rewards[t] != 0:
            running_add = 0  # a non-zero reward means a point was scored, so reset the running sum
        running_add = running_add * gamma + rewards[t]
        discounted_rewards[t] = running_add
    return discounted_rewards
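
# Worked example (with gamma = 0.99): for a rally whose rewards are [0, 0, 1] the
# discounted returns are [0.9801, 0.99, 1.0] -- earlier frames get slightly less
# credit for the eventual point, and the running sum resets after each point.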
def discount_with_rewards(gradient_log_p, episode_rewards, gamma):
    # scale the gradient by the discounted and normalized rewards
    discounted_episode_rewards = discount_rewards(episode_rewards, gamma)
    # standardize the rewards to zero mean and unit variance
    discounted_episode_rewards -= np.mean(discounted_episode_rewards)
    discounted_episode_rewards /= np.std(discounted_episode_rewards)
    return gradient_log_p * discounted_episode_rewards
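
# Standardizing the returns does not change which actions are encouraged on average,
# but it reduces the variance of the gradient estimate, which makes training with a
# simple batch update like this noticeably more stable.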
def update_weights(weights, expectation_g_squared, g_dict, decay_rate, learning_rate):
    epsilon = 1e-5
    for layer_name in weights.keys():
        g = g_dict[layer_name]
        expectation_g_squared[layer_name] = decay_rate * expectation_g_squared[layer_name] + (1 - decay_rate) * g**2
        weights[layer_name] += (learning_rate * g) / (np.sqrt(expectation_g_squared[layer_name] + epsilon))
        g_dict[layer_name] = np.zeros_like(weights[layer_name])  # reset the batch gradient buffer
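
# This is the RMSProp update written out:
#     E[g^2] <- decay_rate * E[g^2] + (1 - decay_rate) * g^2
#     w      <- w + learning_rate * g / sqrt(E[g^2] + epsilon)
# The running average of squared gradients gives each weight its own effective step
# size; the "+" (rather than "-") is because g points in the gradient *ascent*
# direction on the expected reward.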
""" | |
Layer 1: Neural Network of input matrix 200x6400 | |
Layer 2: 200x1 represents the output of the matrix | |
""" | |
weights = { | |
'1': np.random.randn(hidden_layer_neurons,input_dimensions)/np.sqrt(input_dimensions), | |
'2': np.random.randn(hidden_layer_neurons)/np.sqrt(hidden_layer_neurons) | |
} | |
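
# Dividing the random weights by sqrt(number of inputs to the layer) is the usual
# Xavier-style scaling: it keeps the variance of each layer's pre-activations roughly
# constant at the start of training, so the sigmoid does not saturate immediately.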
# gradient buffers for the RMSProp algorithm
expectation_g_squared = {}
g_dict = {}
for layer_name in weights.keys():
    expectation_g_squared[layer_name] = np.zeros_like(weights[layer_name])
    g_dict[layer_name] = np.zeros_like(weights[layer_name])

episode_hidden_layer, episode_observation, episode_gradient_log_ps, episode_rewards = [], [], [], []
while True:
    env.render()
    processed_observations, prev_processed_observations = preprocess_observation(observation, prev_processed_observations, input_dimensions)
    """
    The image has been preprocessed; now send the observation through the neural network.
    """
    hidden_layer, up_probability = apply_neural_nets(processed_observations, weights)
    episode_observation.append(processed_observations)
    episode_hidden_layer.append(hidden_layer)
    # choose an action from the predicted probability
    action = choose_action(up_probability)
    # carry out the chosen action
    observation, reward, done, info = env.step(action)
    reward_sum += reward
    episode_rewards.append(reward)
    """
    After a single action we do not yet know whether it was the right one;
    the reward only arrives once a point is scored.
    """
    fake_label = 1 if action == 2 else 0
    loss_function_gradient = fake_label - up_probability
    episode_gradient_log_ps.append(loss_function_gradient)
    """
    The policy gradient tells the model how to adjust its play: if it won the
    round, make similar actions more likely; if it lost, make them less likely.
    """
    # an episode has finished
    if done:
        episode_number += 1
        # stack the values collected during the episode
        episode_hidden_layer = np.vstack(episode_hidden_layer)
        episode_observation = np.vstack(episode_observation)
        episode_gradient_log_ps = np.vstack(episode_gradient_log_ps)
        episode_rewards = np.vstack(episode_rewards)
        # scale the gradient of the log probabilities by the discounted rewards
        episode_gradient_log_ps_discounted = discount_with_rewards(episode_gradient_log_ps, episode_rewards, gamma)
        gradient = compute_gradient(
            episode_gradient_log_ps_discounted,
            episode_hidden_layer,
            episode_observation,
            weights
        )
        # accumulate the gradient; the weights are only updated once per batch_size episodes
        for layer_name in gradient:
            g_dict[layer_name] += gradient[layer_name]
        if episode_number % batch_size == 0:
            update_weights(weights, expectation_g_squared, g_dict, decay_rate, learning_rate)
        episode_hidden_layer, episode_observation, episode_gradient_log_ps, episode_rewards = [], [], [], []  # reset the episode buffers
        observation = env.reset()
        running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
        print("Resetting env. Episode reward total was {} running mean: {}".format(reward_sum, running_reward))
        reward_sum = 0
        prev_processed_observations = None