@MakMukhi · Created July 17, 2017 00:27
NeuralNet to learn and play Atari pong based on Q-learning.

import numpy as np
import _pickle as pickle
import gym
import sys  # Needed for sys.exit in AgentPong.get_reward.

# Fully connected NeuralNet with a minimum of two layers.
# This is a simple implementation with no bias parameters.
# To work with RL, this neural net feeds forward only one
# example at a time but backprops a whole batch.
# (See the shape sanity check after the class for a usage sketch.)
class NeuralNet:
    def __init__(self,
                 input_features,
                 output_classes,
                 units_per_layer,
                 num_of_layers,
                 alpha=0.001):
        self.model = []
        w1 = np.random.randn(input_features,
                             units_per_layer) / np.sqrt(input_features)
        wn = np.random.randn(units_per_layer,
                             output_classes) / np.sqrt(units_per_layer)
        self.model.append(w1)
        for i in range(num_of_layers - 2):
            self.model.append(
                np.random.randn(units_per_layer, units_per_layer) /
                np.sqrt(units_per_layer))
        self.model.append(wn)
        self.alpha = alpha

    def relu(self, x):
        x[x < 0] = 0
        return x

    # x1 = relu(x0*w1)
    # x2 = relu(x1*w2)
    # ...
    # xout = xn-1*wn  (the output layer is linear; no activation is applied)
    # Expects input to be of [1,n] shape (one row and n cols).
    def feed_forward(self, input):
        # Returns the hypothesis for the current input
        # and a list of inputs to each layer, which will
        # be necessary for backprop later.
        input_to_layer = []
        out = input
        input_to_layer.append(out)
        for idx, weight in enumerate(self.model):
            if idx == len(self.model) - 1:
                break
            out = self.relu(np.dot(out, weight))
            input_to_layer.append(out)
        return np.dot(out, self.model[len(self.model) - 1]), input_to_layer

    # Loss = -alpha * 1/2 * (Gt - Vt)^2 for episode t
    # d(loss)/dW = (Gt - Vt) * d(Vt)/dW
    # Let error = Gt - Vt. Then
    #   gradient_n   = hidden_state_n-1.T * error
    #   gradient_n-1 = hidden_state_n-2.T * (error * Wn.T)
    #   gradient_n-2 = hidden_state_n-3.T * (error * Wn.T * Wn-1.T)
    #   ...
    #   gradient_1   = input.T * (error * Wn.T * Wn-1.T * ... * W2.T)
    def back_prop(self, error, input_to_layer):
        gradients = []
        delta = error
        for n in reversed(range(len(self.model))):
            gradients.append(np.dot(input_to_layer[n].T, delta))
            # Update delta for the previous layer.
            delta = np.dot(delta, self.model[n].T)
            delta[input_to_layer[n] <= 0] = 0  # Backprop through the relu.
        return reversed(gradients)

    def update_model(self, gradients):
        for idx, gr in enumerate(gradients):
            self.model[idx] += self.alpha * gr
# End of NeuralNet
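
# The following sanity check is an added illustration (not part of the original
# gist): it builds a tiny NeuralNet and verifies that feed_forward and back_prop
# produce a hypothesis and gradients of the expected shapes. The sizes
# (5 inputs, 2 outputs, 3 hidden units, 3 layers) are arbitrary.
_nn = NeuralNet(input_features=5, output_classes=2, units_per_layer=3, num_of_layers=3)
_hyp, _inputs = _nn.feed_forward(np.random.randn(1, 5))
assert _hyp.shape == (1, 2)  # One row, one value per output class.
assert [w.shape for w in _nn.model] == [(5, 3), (3, 3), (3, 2)]
_grads = list(_nn.back_prop(np.ones((1, 2)), _inputs))
assert [g.shape for g in _grads] == [w.shape for w in _nn.model]
_nn.update_model(_grads)  # Gradients line up with the weights they update.
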
class AgentPong:
    def __init__(self):
        self.num_input_features = 80 * 80
        self.num_actions = 3
        self.num_hidden_units = 200
        self.num_of_layers = 2
        self.gamma = 0.99
        self.epsilon = 0.3
        self.actions = [1, 2, 3]
        self.nn = NeuralNet(self.num_input_features, self.num_actions,
                            self.num_hidden_units, self.num_of_layers)
        # input_to_layer is a list of size num_of_layers, each element of which is a
        # 2D matrix of size num_time_steps*num_hidden_units (except for the 1st
        # element, whose size is num_time_steps*num_input_features).
        self.input_to_layer = [None] * self.num_of_layers
        self.error = None
        self.prev_frames = np.zeros([4, 1, self.num_input_features])

    def preprocess(self, frame):
        # Preprocessing is the same as in Andrej Karpathy's blog:
        # https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5
        frame = frame[35:195]       # Keep only the relevant part of the frame.
        frame = frame[::2, ::2, 0]  # Take every 2nd entry from the first two dimensions and the first entry from the 3rd.
        frame[frame == 144] = 0     # Remove background.
        frame[frame == 109] = 0     # Remove background.
        frame[frame != 0] = 1       # Every illuminated pixel (paddles and ball) is set to 1.
        return frame.astype(float).ravel()
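
    # Added shape trace (not in the original gist): for a raw "Pong-v0"
    # observation of shape (210, 160, 3), the steps above give
    #   frame[35:195]      -> (160, 160, 3)  crop to the play area
    #   frame[::2, ::2, 0] -> (80, 80)       downsample and drop the color channel
    #   .ravel()           -> (6400,)        which matches num_input_features = 80*80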

    def take_action(self, frame):
        curr_frame = self.preprocess(frame)
        curr_frame = np.reshape(curr_frame, [1, self.num_input_features])
        for i in range(self.prev_frames.shape[0] - 1):
            self.prev_frames[i] = self.prev_frames[i + 1]
        self.prev_frames[-1] = curr_frame
        x0 = np.sum(self.prev_frames, axis=0)  # Stack the last 4 frames into one input.
        action_vals, hs = self.nn.feed_forward(x0)
        for idx, inp in enumerate(hs):
            # inp is of shape [1, num_hidden_units] except for the 1st one,
            # which is [1, num_input_features].
            if self.input_to_layer[idx] is not None:
                self.input_to_layer[idx] = np.vstack((self.input_to_layer[idx], inp))
            else:
                self.input_to_layer[idx] = inp
        # action_vals is a matrix of size [1, num_actions];
        # for pong, num_actions = 3.
        # With a probability of epsilon, choose an action uniformly at random.
        action_idx = None
        # print("Action vals: ", action_vals)
        if np.random.uniform() <= self.epsilon:
            # print("Will explore this time")
            action_idx = np.random.choice(len(self.actions))
        else:
            # print("Will choose the optimal action")
            action_idx = np.argmax(action_vals)
        # Since W = W + (Gt - Vt) * d(Vt)/dW,
        # we consider (Gt - Vt) to be the error.
        # Gt is the reward received at the end of the play, discounted over
        # time steps (Monte Carlo).
        er = np.zeros(action_vals.shape)  # The error for every non-selected action is 0.
        er[0][action_idx] = -action_vals[0][action_idx]  # So that later we just add Gt to it.
        if self.error is not None:
            self.error = np.vstack((self.error, er))
        else:
            self.error = er
        return self.actions[action_idx]
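
    # Added worked example (not from the original gist), showing how the error
    # rows built above combine with get_reward below: suppose
    # action_vals = [[0.2, -0.1, 0.4]] and the greedy action_idx = 2 is chosen,
    # so er = [[0, 0, -0.4]], i.e. -Vt in the chosen slot. If this is the last
    # step of a play that ends with reward +1, get_reward adds Gt = 1.0 to that
    # slot, leaving an error of 0.6 = Gt - Vt; one step earlier the added return
    # would be 0.99, the step before that 0.9801, and so on.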

    def get_reward(self, r, done):
        # A game of OpenAI Gym pong lasts until one side scores 21 points.
        # We consider each play to be an episode.
        # Each play ends with one side missing the ball, at which point
        # we get a reward of +1 or -1.
        # We credit every action in the play with that reward, discounted
        # backwards in time, and update the parameters after every play.
        # When the whole game finishes we reset the previous frames.
        if r == 0:
            sys.exit("Agent pong doesn't accept a zero-valued reward")
        for idx in reversed(range(self.error.shape[0])):
            # print("reward for step %d is %f" % (idx, r))
            self.error[idx][self.error[idx] != 0] += r
            r *= self.gamma
            # print("Errors for step ", idx, " of this episode:", self.error[idx])
        loss = np.sum(self.error ** 2)
        self.nn.update_model(self.nn.back_prop(self.error, self.input_to_layer))
        self.error = None
        self.input_to_layer = [None] * self.num_of_layers
        if done:
            self.prev_frames = np.zeros([4, 1, self.num_input_features])
        return loss
# End of AgentPong.
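
# Added illustration (not part of the original gist): the loop in get_reward
# assigns the play's final reward to every time step, discounted by gamma for
# each step it lies before the end. A standalone sketch of that backward pass,
# with a hypothetical 3-step play and a final reward of +1:
_gamma, _r = 0.99, 1.0
_returns = []
for _ in range(3):       # Walk backwards from the last time step of the play.
    _returns.append(_r)
    _r *= _gamma
# _returns is now [1.0, 0.99, 0.9801] (up to float rounding): the last action
# gets full credit and earlier actions get geometrically less.
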
ap = AgentPong()
# I realize it's not good practice to access the internals
# of the ap (and eventually nn) object, and that ideally this
# should have been a function call on those objects,
# but I'm not focusing on that right now.
try:
    ap.nn.model = pickle.load(open('save.p', 'rb'))  # Resume from a saved model if one exists.
except FileNotFoundError:
    pass  # No checkpoint yet; keep the randomly initialized model.
losses = []
env = gym.make("Pong-v0")
observation = env.reset()
play = 1
while True:
    # input()
    # env.render()
    action = ap.take_action(observation)
    # print("action chosen: %d" % (action))
    observation, reward, done, info = env.step(action)
    # print("reward received: %f" % (reward))
    if reward != 0:
        print("Play: %d, reward: %f" % (play, reward))
        losses.append(ap.get_reward(reward, done))
        if play % 1000 == 0:
            # print(ap.nn.model)
            pickle.dump(ap.nn.model, open('save.p', 'wb'))
            pickle.dump(losses, open('loss.p', 'wb'))
            losses = []
        play += 1
    if done:
        observation = env.reset()
Author's comment (@MakMukhi): Here's a semi-trained agent playing (on the right). Video link
