import json

import numpy as np

from keras.models import Sequential
from keras.layers.core import Dense
from keras.optimizers import sgd
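
# A small fully connected network learns to play "Catch" with Q-learning: a
# fruit falls one row per step, and a three-cell basket along the bottom row
# moves left, right, or stays to catch it.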
class Catch(object):
    def __init__(self, grid_size=10):
        self.grid_size = grid_size
        self.reset()

    def _update_state(self, action):
        """
        Input: action (0 = move left, 1 = stay, 2 = move right)
        Output: updates self.state with the new state
        """
        state = self.state
        if action == 0:  # left
            action = -1
        elif action == 1:  # stay
            action = 0
        else:
            action = 1  # right
        f0, f1, basket = state[0]
        # clamp so the three-cell basket stays on the grid
        new_basket = min(max(1, basket + action), self.grid_size-1)
        f0 += 1  # the fruit falls one row per step
        out = np.asarray([f0, f1, new_basket])
        out = out[np.newaxis]

        assert len(out.shape) == 2
        self.state = out

    def _draw_state(self):
        im_size = (self.grid_size,)*2
        state = self.state[0]
        canvas = np.zeros(im_size)
        canvas[state[0], state[1]] = 1  # draw fruit
        canvas[-1, state[2]-1:state[2] + 2] = 1  # draw basket
        return canvas

    def _get_reward(self):
        fruit_row, fruit_col, basket = self.state[0]
        if fruit_row == self.grid_size-1:
            if abs(fruit_col - basket) <= 1:
                return 1
            else:
                return -1
        else:
            return 0

    def _is_over(self):
        return self.state[0, 0] == self.grid_size-1

    def observe(self):
        canvas = self._draw_state()
        return canvas.reshape((1, -1))

    def act(self, action):
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over

    def reset(self):
        # scalar draws (no size=1) keep the state a plain integer array
        n = np.random.randint(0, self.grid_size-1)  # fruit column
        m = np.random.randint(1, self.grid_size-2)  # basket center
        self.state = np.asarray([0, n, m])[np.newaxis]
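

# Experience replay stores recent transitions and samples random minibatches
# from them, which decorrelates consecutive training updates.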
class ExperienceReplay(object):
    def __init__(self, max_memory=100, discount=.9):
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount

    def remember(self, states, game_over):
        # memory[i] = [[state_t, action_t, reward_t, state_t+1], game_over?]
        self.memory.append([states, game_over])
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def get_batch(self, model, batch_size=10):
        len_memory = len(self.memory)
        num_actions = model.output_shape[-1]
        env_dim = self.memory[0][0][0].shape[1]
        inputs = np.zeros((min(len_memory, batch_size), env_dim))
        targets = np.zeros((inputs.shape[0], num_actions))
        for i, idx in enumerate(np.random.randint(0, len_memory,
                                                  size=inputs.shape[0])):
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
            game_over = self.memory[idx][1]

            inputs[i:i+1] = state_t
            # There should be no target values for actions not taken.
            # Thou shalt not correct actions not taken #deep
            targets[i] = model.predict(state_t)[0]
            # bootstrapped estimate of the next state's value: max_a' Q(s', a')
            Q_sa = np.max(model.predict(state_tp1)[0])
            if game_over:  # no future reward once the episode has ended
                targets[i, action_t] = reward_t
            else:
                # reward_t + gamma * max_a' Q(s', a')
                targets[i, action_t] = reward_t + self.discount * Q_sa
        return inputs, targets


if __name__ == "__main__":
    # parameters
    epsilon = .1  # exploration
    num_actions = 3  # [move_left, stay, move_right]
    epoch = 1000
    max_memory = 500
    hidden_size = 100
    batch_size = 50
    grid_size = 10
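
    # the network maps the flattened grid (grid_size**2 pixels) to one
    # Q-value per action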
    model = Sequential()
    model.add(Dense(hidden_size, input_shape=(grid_size**2,), activation='relu'))
    model.add(Dense(hidden_size, activation='relu'))
    model.add(Dense(num_actions))
    model.compile(sgd(lr=.2), "mse")

    # To continue training from a previous model, just uncomment the line below
    # model.load_weights("model.h5")

    # Define environment/game
    env = Catch(grid_size)

    # Initialize experience replay object
    exp_replay = ExperienceReplay(max_memory=max_memory)

    # Train
    win_cnt = 0
    for e in range(epoch):
        loss = 0.
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        while not game_over:
            input_tm1 = input_t
            # get next action (epsilon-greedy)
            if np.random.rand() <= epsilon:
                action = np.random.randint(0, num_actions)
            else:
                q = model.predict(input_tm1)
                action = np.argmax(q[0])

            # apply action, get rewards and new state
            input_t, reward, game_over = env.act(action)
            if reward == 1:
                win_cnt += 1

            # store experience
            exp_replay.remember([input_tm1, action, reward, input_t], game_over)

            # adapt model
            inputs, targets = exp_replay.get_batch(model, batch_size=batch_size)

            # train_on_batch returns the scalar loss here; very old Keras
            # versions returned a list, which needed indexing with [0]
            loss += model.train_on_batch(inputs, targets)
        print("Epoch {:03d}/{:03d} | Loss {:.4f} | Win count {}".format(e, epoch - 1, loss, win_cnt))

    # Save trained model weights and architecture; this will be used by the visualization code
    model.save_weights("model.h5", overwrite=True)
    with open("model.json", "w") as outfile:
        json.dump(model.to_json(), outfile)

Hi Eder,
Thanks for the really useful Keras example. I have a question about your experience replay implementation.
The loss is calculated between the network's output on the experience replay samples (let's call it OER) and the computed targets. Now, the action chosen at OER is exactly the one that was stored in the experience. This is implemented indirectly in line 103,

targets[i, action_t] = reward_t + self.discount * Q_sa

by inserting the target in the appropriate column. When you calculate the MSE loss between the targets tensor and the network output (each of shape `inputs.shape[0] * num_actions`), wouldn't all the values in the network output other than the one corresponding to the non-zero target add to the loss, since each target row has only one non-zero value? Really hope I'm not misreading your code or being confusing.
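
P.S. For concreteness, a tiny numeric sketch of the scenario I have in mind (hypothetical values: one sample, three actions, a target only in the taken action's column):

import numpy as np
pred = np.array([[0.2, -0.1, 0.4]])   # network output Q(s, .)
target = np.zeros((1, 3))
target[0, 1] = 0.9                    # only the taken action gets a target
print(((pred - target) ** 2).mean())  # the 0.2 and 0.4 entries add 0.04 + 0.16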