import json

import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import SGD
        

class Catch(object):
    """A simple catch game: a fruit drops one row per time step down a
    grid_size x grid_size grid, and the agent slides a three-cell basket
    along the bottom row, scoring +1 for a catch and -1 for a miss."""

    def __init__(self, grid_size=10):
        self.grid_size = grid_size
        self.reset()

    def _update_state(self, action):
        """
        Input: action
        Output: new state, stored in self.state (the reward is computed
        separately by _get_reward)
        """
        state = self.state
        if action == 0:  # left
            action = -1
        elif action == 1:  # stay
            action = 0
        else:  # right
            action = 1
        f0, f1, basket = state[0]  # fruit row, fruit column, basket column
        new_basket = min(max(1, basket + action), self.grid_size-1)  # keep the basket on the grid
        f0 += 1  # the fruit falls one row
        out = np.asarray([f0, f1, new_basket])
        out = out[np.newaxis]

        assert len(out.shape) == 2
        self.state = out

    def _draw_state(self):
        im_size = (self.grid_size,)*2
        state = self.state[0]
        canvas = np.zeros(im_size)
        canvas[state[0], state[1]] = 1  # draw fruit
        canvas[-1, state[2]-1:state[2] + 2] = 1  # draw basket
        return canvas

    def _get_reward(self):
        fruit_row, fruit_col, basket = self.state[0]
        if fruit_row == self.grid_size-1:
            if abs(fruit_col - basket) <= 1:
                return 1
            else:
                return -1
        else:
            return 0

    def _is_over(self):
        return self.state[0, 0] == self.grid_size-1

    def observe(self):
        canvas = self._draw_state()
        return canvas.reshape((1, -1))

    def act(self, action):
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over

    def reset(self):
        n = np.random.randint(0, self.grid_size-1)
        m = np.random.randint(1, self.grid_size-2)
        self.state = np.asarray([0, n, m])[np.newaxis]


class ExperienceReplay(object):
    """Stores past transitions [s_t, a_t, r_t, s_t+1] and samples random
    minibatches from them, so each training step sees decorrelated
    experience rather than only the most recent move."""

    def __init__(self, max_memory=100, discount=.9):
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount

    def remember(self, states, game_over):
        # memory[i] = [[state_t, action_t, reward_t, state_t+1], game_over?]
        self.memory.append([states, game_over])
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def get_batch(self, model, batch_size=10):
        len_memory = len(self.memory)
        num_actions = model.output_shape[-1]
        env_dim = self.memory[0][0][0].shape[1]
        inputs = np.zeros((min(len_memory, batch_size), env_dim))
        targets = np.zeros((inputs.shape[0], num_actions))
        for i, idx in enumerate(np.random.randint(0, len_memory,
                                                  size=inputs.shape[0])):
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
            game_over = self.memory[idx][1]

            inputs[i:i+1] = state_t
            # There should be no target values for actions not taken.
            # Thou shalt not correct actions not taken #deep
            targets[i] = model.predict(state_t)[0]
            Q_sa = np.max(model.predict(state_tp1)[0])
            if game_over:  # if game_over is True
                targets[i, action_t] = reward_t
            else:
                # reward_t + gamma * max_a' Q(s', a')
                targets[i, action_t] = reward_t + self.discount * Q_sa
        return inputs, targets
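
# A worked instance of the target rule above, assuming the default
# discount of 0.9: a sampled transition with reward_t = 0 and
# max_a' Q(s', a') = 0.5 gets the target 0 + 0.9 * 0.5 = 0.45 for the
# action taken, while every other action keeps the network's own
# prediction and so contributes zero error.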
        

if __name__ == "__main__":
    # parameters
    epsilon = .1  # exploration
    num_actions = 3  # [move_left, stay, move_right]
    epoch = 1000
    max_memory = 500
    hidden_size = 100
    batch_size = 50
    grid_size = 10
        
    model = Sequential()
    model.add(Dense(hidden_size, input_shape=(grid_size**2,), activation='relu'))
    model.add(Dense(hidden_size, activation='relu'))
    model.add(Dense(num_actions))
    model.compile(optimizer=SGD(learning_rate=.2), loss="mse")
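
    # The network maps the flattened grid (grid_size**2 = 100 inputs) to one
    # Q-value per action; the greedy policy below simply takes the argmax of
    # these three outputs.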
        
    # If you want to continue training from a previous model, just uncomment the line below
    # model.load_weights("model.h5")

    # Define environment/game
    env = Catch(grid_size)

    # Initialize experience replay object
    exp_replay = ExperienceReplay(max_memory=max_memory)
        
    # Train
    win_cnt = 0
    for e in range(epoch):
        loss = 0.
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        while not game_over:
            input_tm1 = input_t
            # get next action
            if np.random.rand() <= epsilon:
                action = np.random.randint(0, num_actions)
            else:
                q = model.predict(input_tm1)
                action = np.argmax(q[0])
        
            # apply action, get rewards and new state
            input_t, reward, game_over = env.act(action)
            if reward == 1:
                win_cnt += 1

            # store experience
            exp_replay.remember([input_tm1, action, reward, input_t], game_over)

            # adapt model
            inputs, targets = exp_replay.get_batch(model, batch_size=batch_size)

            loss += float(model.train_on_batch(inputs, targets))
        print("Epoch {:03d}/{:03d} | Loss {:.4f} | Win count {}".format(e, epoch - 1, loss, win_cnt))
        
    # Save trained model weights and architecture, this will be used by the visualization code
    model.save_weights("model.h5", overwrite=True)
    with open("model.json", "w") as outfile:
        json.dump(model.to_json(), outfile)
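
    # Optional sanity check, a minimal sketch reusing the objects above:
    # play a few greedy games (epsilon = 0) with the trained model and
    # count the catches.
    eval_games = 10
    eval_wins = 0
    for _ in range(eval_games):
        env.reset()
        game_over = False
        input_t = env.observe()
        while not game_over:
            q = model.predict(input_t)
            action = np.argmax(q[0])
            input_t, reward, game_over = env.act(action)
            if reward == 1:
                eval_wins += 1
    print("Greedy evaluation: {}/{} wins".format(eval_wins, eval_games))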