NeuralNet to learn and play Atari Pong based on Q-learning.
import numpy as np
import _pickle as pickle
import gym
import os
import sys

# Fully connected NeuralNet with a minimum of two layers.
# This is a simple implementation with no bias parameters.
# To work with RL this neural net is such that it feedforwards
# only one example at a time but backprops a batch.
class NeuralNet:
    def __init__(self,
                 input_features,
                 output_classes,
                 units_per_layer,
                 num_of_layers,
                 alpha=0.001):
        self.model = []
        w1 = np.random.randn(input_features,
                             units_per_layer) / np.sqrt(input_features)
        wn = np.random.randn(units_per_layer,
                             output_classes) / np.sqrt(units_per_layer)
        self.model.append(w1)
        for i in range(num_of_layers - 2):
            self.model.append(
                np.random.randn(units_per_layer, units_per_layer) /
                np.sqrt(units_per_layer))
        self.model.append(wn)
        self.alpha = alpha

    def relu(self, x):
        x[x < 0] = 0
        return x

    # x1 = relu(x0*w1)
    # x2 = relu(x1*w2)
    # ...
    # xout = xn-1*wn (the output layer is linear; no activation is applied to it).
    # Expects input to be of shape [1, n] (one row and n cols).
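    # Illustrative shape walk-through, assuming the Pong settings used below
    # (input_features=6400, units_per_layer=200, output_classes=3, num_of_layers=2):
    #   x0: [1, 6400] -> x1 = relu(x0*w1): [1, 200] -> xout = x1*wn: [1, 3]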
    def feed_forward(self, input):
        # Returns the hypothesis for the current input
        # and a list of inputs to each layer, which will
        # be necessary for backprop later.
        input_to_layer = []
        out = input
        input_to_layer.append(out)
        for idx, weight in enumerate(self.model):
            if idx == len(self.model) - 1:
                break
            out = self.relu(np.dot(out, weight))
            input_to_layer.append(out)
        return np.dot(out, self.model[-1]), input_to_layer

    # Loss = 1/2 * (Gt - Vt)^2 for episode t
    # d(Loss)/dW = -(Gt - Vt) * d(Vt)/dW, so descending the loss means moving
    # W along (Gt - Vt) * d(Vt)/dW.
    # let error = Gt - Vt
    # gradient_n = hidden_state_n-1.T * error
    # gradient_n-1 = hidden_state_n-2.T * (error * Wn.T)
    # gradient_n-2 = hidden_state_n-3.T * (error * Wn.T * Wn-1.T)
    # ...
    # gradient_1 = input.T * (error * Wn.T * Wn-1.T * ... * W2.T)
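    # Illustrative shapes for the two-layer Pong net defined below, over an
    # episode of T time steps:
    #   error is [T, 3]; input_to_layer holds a [T, 6400] and a [T, 200] matrix.
    #   gradient for wn: [T, 200].T x [T, 3]        -> [200, 3]
    #   delta for the first layer: [T, 3] x [3, 200] -> [T, 200] (ReLU-masked)
    #   gradient for w1: [T, 6400].T x [T, 200]      -> [6400, 200]
    # The mask delta[input <= 0] = 0 applied in the code is the relu derivative
    # that the formulas above leave implicit.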
    def back_prop(self, error, input_to_layer):
        gradients = []
        delta = error
        for n in reversed(range(len(self.model))):
            gradients.append(np.dot(input_to_layer[n].T, delta))
            # Update delta for the previous layer.
            delta = np.dot(delta, self.model[n].T)
            delta[input_to_layer[n] <= 0] = 0  # backprop relu.
        return reversed(gradients)

    def update_model(self, gradients):
        for idx, gr in enumerate(gradients):
            self.model[idx] += self.alpha * gr
# End of NeuralNet

class AgentPong:
    def __init__(self):
        self.num_input_features = 80 * 80
        self.num_actions = 3
        self.num_hidden_units = 200
        self.num_of_layers = 2
        self.gamma = 0.99
        self.epsilon = 0.3
        self.actions = [1, 2, 3]
        self.nn = NeuralNet(self.num_input_features, self.num_actions,
                            self.num_hidden_units, self.num_of_layers)
        # input_to_layer is a list of size num_of_layers, each element of which is a
        # 2D matrix of size num_time_steps*num_hidden_units (except for the 1st
        # element, whose size is num_time_steps*num_input_features).
        self.input_to_layer = [None] * self.num_of_layers
        self.error = None
        self.prev_frames = np.zeros([4, 1, self.num_input_features])

    def preprocess(self, frame):
        # Preprocessing is the same as in Andrej Karpathy's blog:
        # https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5.
        frame = frame[35:195]  # Get only the relevant part of the frame.
        frame = frame[::2, ::2, 0]  # Take every 2nd entry from the first two dimensions and the first entry from the 3rd dimension.
        frame[frame == 144] = 0  # Remove background.
        frame[frame == 109] = 0  # Remove background.
        frame[frame != 0] = 1  # Every illumination on the frame (paddles and ball) is set to 1.
        return frame.astype(float).ravel()
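
    # Shape walk-through for preprocess() above (Pong-v0 frames are 210x160x3):
    #   frame[35:195]      -> 160x160x3 (crop off the scoreboard and the bottom edge)
    #   frame[::2, ::2, 0] -> 80x80 (downsample by 2, keep one color channel)
    #   ravel()            -> a flat vector of 80*80 = 6400 floats in {0, 1}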

    def take_action(self, frame):
        curr_frame = self.preprocess(frame)
        curr_frame = np.reshape(curr_frame, [1, self.num_input_features])
        for i in range(self.prev_frames.shape[0] - 1):
            self.prev_frames[i] = self.prev_frames[i + 1]
        self.prev_frames[-1] = curr_frame
        x0 = np.sum(self.prev_frames, axis=0)
        action_vals, hs = self.nn.feed_forward(x0)
        for idx, inp in enumerate(hs):
            # inp is of shape [1, num_hidden_units] except for the 1st one, which is
            # [1, num_input_features].
            if self.input_to_layer[idx] is not None:
                self.input_to_layer[idx] = np.vstack((self.input_to_layer[idx], inp))
            else:
                self.input_to_layer[idx] = inp
        # action_vals is a matrix of size [1, num_actions];
        # for this agent num_actions = 3.
        # With a probability of epsilon choose an action uniformly at random.
        action_idx = None
        # print("Action vals: ", action_vals)
        if np.random.uniform() <= self.epsilon:
            # print("Will explore this time")
            action_idx = np.random.choice(len(self.actions))
        else:
            # print("Will choose the optimal action")
            action_idx = np.argmax(action_vals)
        # Since W = W + alpha * (Gt - Vt) * differentiation(Vt),
        # we consider (Gt - Vt) to be the error.
        # Gt is the reward received at the end of the play, discounted over the
        # play's time steps (Monte Carlo).
        er = np.zeros(action_vals.shape)  # Error for every non-selected action is 0.
        er[0][action_idx] = -action_vals[0][action_idx]  # So that adding Gt later yields Gt - Vt.
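        # Illustrative numbers: if action_vals = [[0.2, -0.1, 0.5]] and
        # action_idx = 2, then er = [[0, 0, -0.5]]; once get_reward adds the
        # discounted return Gt to the non-zero entry it becomes Gt - 0.5 = Gt - Vt.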
        if self.error is not None:
            self.error = np.vstack((self.error, er))
        else:
            self.error = er
        return self.actions[action_idx]

    def get_reward(self, r, done):
        # A game of OpenAI Gym Pong lasts until one side scores 21 points and is
        # made up of a series of plays.
        # We will consider each play to be an episode.
        # Each play ends with one side missing the ball.
        # At the end of the play we get a reward.
        # We'll credit every action in the play with that reward.
        # Also, we'll update the parameters after every play.
        # When the game finishes we reset the previous frames.
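        # Illustrative example of the discounting below: if a play lasted 3 steps
        # and ended with r = +1 (gamma = 0.99), the non-zero error entries get
        # +1.0 for the last step, +0.99 for the step before it, and +0.9801 for
        # the first step.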
        if r == 0:
            sys.exit("AgentPong doesn't accept a zero-valued reward")
        for idx in reversed(range(self.error.shape[0])):
            # print("reward for step %d is %f" % (idx, r))
            self.error[idx][self.error[idx] != 0] += r
            r *= self.gamma
            # print("Errors for step ", idx, " of this episode:", self.error[idx])
        loss = np.sum(self.error**2)
        self.nn.update_model(self.nn.back_prop(self.error, self.input_to_layer))
        self.error = None
        self.input_to_layer = [None] * self.num_of_layers
        if done:
            self.prev_frames = np.zeros([4, 1, self.num_input_features])
        return loss

# End of AgentPong.

ap = AgentPong()
# I realize it's not good practice to access the internals
# of the ap (and eventually nn) object, and that ideally
# this should have been a function call on those objects.
# But I'm not focusing on that right now.
if os.path.exists('save.p'):
    ap.nn.model = pickle.load(open('save.p', 'rb'))
losses = []
env = gym.make("Pong-v0")
observation = env.reset()
play = 1
while True:
    # input()
    # env.render()
    action = ap.take_action(observation)
    # print("action chosen: %d" % (action))
    observation, reward, done, info = env.step(action)
    # print("reward received: %f" % (reward))
    if reward != 0:
        print("Play: %d, reward: %f" % (play, reward))
        losses.append(ap.get_reward(reward, done))
        if play % 1000 == 0:
            # print(ap.nn.model)
            pickle.dump(ap.nn.model, open('save.p', 'wb'))
            pickle.dump(losses, open('loss.p', 'wb'))
            losses = []
        play += 1
    if done:
        observation = env.reset()
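
As a quick sanity check of the NeuralNet class in isolation, the sketch below (illustrative only; it assumes the class definition and numpy import above are in scope, and the small layer sizes and the pretend return of 1.0 are made up) runs one forward pass, builds the error the same way take_action does, and applies one update:

nn = NeuralNet(input_features=16, output_classes=3, units_per_layer=8, num_of_layers=2)
x = np.random.randn(1, 16)
vals, inputs = nn.feed_forward(x)           # vals has shape [1, 3]
err = np.zeros_like(vals)
best = np.argmax(vals)
err[0][best] = 1.0 - vals[0][best]          # pretend the discounted return Gt was 1.0
nn.update_model(nn.back_prop(err, inputs))  # one gradient step with alpha=0.001
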
Here's a semi-trained agent playing (on the right). Video link