qmaze tutorial
experience.py
import numpy as np


class Experience(object):
    def __init__(self, model, max_memory=100, discount=0.95):
        self.model = model
        self.max_memory = max_memory
        self.discount = discount
        self.memory = list()
        self.num_actions = model.output_shape[-1]

    def remember(self, episode):
        # episode = [envstate, action, reward, envstate_next, game_over]
        # memory[i] = episode
        # envstate == flattened 1d maze cells info, including rat cell (see method: observe)
        self.memory.append(episode)
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def predict(self, envstate):
        return self.model.predict(envstate)[0]

    def get_data(self, data_size=10):
        env_size = self.memory[0][0].shape[1]  # envstate 1d size (1st element of episode)
        mem_size = len(self.memory)
        data_size = min(mem_size, data_size)
        inputs = np.zeros((data_size, env_size))
        targets = np.zeros((data_size, self.num_actions))
        for i, j in enumerate(np.random.choice(range(mem_size), data_size, replace=False)):
            envstate, action, reward, envstate_next, game_over = self.memory[j]
            inputs[i] = envstate
            # There should be no target values for actions not taken.
            targets[i] = self.predict(envstate)
            # Q_sa = derived policy = max quality env/action = max_a' Q(s', a')
            Q_sa = np.max(self.predict(envstate_next))
            if game_over:
                targets[i, action] = reward
            else:
                # reward + gamma * max_a' Q(s', a')
                targets[i, action] = reward + self.discount * Q_sa
        return inputs, targets
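For orientation, here is a minimal sketch of how this replay buffer is typically driven. The DummyModel class below is hypothetical (not part of the gist) and only mimics the two members Experience actually touches, output_shape and predict:

import numpy as np
from experience import Experience

class DummyModel(object):
    """Hypothetical stand-in exposing only the members Experience uses."""
    output_shape = (None, 4)              # last dim = number of actions

    def predict(self, envstate):
        # one row of Q-values per input row (all zeros here)
        return np.zeros((envstate.shape[0], 4))

experience = Experience(DummyModel(), max_memory=100, discount=0.95)
state = np.random.rand(1, 49)             # a flattened 7x7 maze canvas
next_state = np.random.rand(1, 49)
# one transition: [envstate, action, reward, envstate_next, game_over]
experience.remember([state, 2, -0.04, next_state, False])
inputs, targets = experience.get_data(data_size=1)
print(inputs.shape, targets.shape)        # (1, 49) (1, 4)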
Main training script
from __future__ import print_function
import os, sys, time, datetime, json, random
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.optimizers import SGD, Adam, RMSprop
from keras.models import model_from_json
from keras.layers.advanced_activations import PReLU
import matplotlib.pyplot as plt
from qmaze import Qmaze
from experience import Experience

# plt.interactive(False)  # for pycharm

# Exploration factor
epsilon = 0.2


def show(qmaze):
    plt.grid('on')
    nrows, ncols = qmaze.maze.shape
    ax = plt.gca()
    ax.set_xticks(np.arange(0.5, nrows, 1))
    ax.set_yticks(np.arange(0.5, ncols, 1))
    ax.set_xticklabels([])
    ax.set_yticklabels([])
    canvas = np.copy(qmaze.maze)
    for row, col in qmaze.visited:
        canvas[row, col] = 0.6
    rat_row, rat_col, _ = qmaze.state
    canvas[rat_row, rat_col] = 0.3  # rat cell
    # canvas[nrows - 1, ncols - 1] = 0.9  # cheese cell
    img = plt.imshow(canvas, interpolation='none', cmap='gray')
    plt.show()  # for pycharm
    return img


def play_game(model, qmaze, rat_cell, trace=False):
    qmaze.reset(rat_cell)
    envstate = qmaze.observe()
    while True:
        prev_envstate = envstate
        # get next action
        q = model.predict(prev_envstate)
        action = np.argmax(q[0])

        # apply action, get reward and new state
        envstate, reward, game_status = qmaze.act(action)
        if trace:
            show(qmaze)
        if game_status == 'win':
            return True
        elif game_status == 'lose':
            return False


def completion_check(model, qmaze):
    for cell in qmaze.free_cells:
        if not qmaze.valid_actions(cell):
            return False
        if not play_game(model, qmaze, cell):
            return False
    return True


def qtrain(model, qmaze, **opt):
    global epsilon
    n_epoch = opt.get('n_epoch', 15000)
    max_memory = opt.get('max_memory', 1000)
    data_size = opt.get('data_size', 50)
    weights_file = opt.get('weights_file', "")
    name = opt.get('name', 'model')
    start_time = datetime.datetime.now()

    # If you want to continue training from a previous model,
    # just supply the h5 file name to the weights_file option
    if weights_file:
        print("loading weights from file: %s" % (weights_file,))
        model.load_weights(weights_file)

    # Construct environment/game from numpy array: maze (see above)
    # Initialize experience replay object
    experience = Experience(model, max_memory=max_memory)

    win_history = []  # history of win/lose games
    hsize = qmaze.maze.size // 2  # history window size
    win_rate = 0.0
    epoch = None

    for epoch in range(n_epoch):
        loss = 0.0
        rat_cell = random.choice(qmaze.free_cells)
        qmaze.reset(rat_cell)
        game_over = False

        # get initial envstate (1d flattened canvas)
        envstate = qmaze.observe()

        n_episodes = 0
        while not game_over:
            valid_actions = qmaze.valid_actions()
            if not valid_actions:
                break
            prev_envstate = envstate

            # Get next action
            if np.random.rand() < epsilon:
                action = random.choice(valid_actions)
            else:
                action = np.argmax(experience.predict(prev_envstate))

            # Apply action, get reward and new envstate
            envstate, reward, game_status = qmaze.act(action)
            if game_status == 'win':
                win_history.append(1)
                game_over = True
            elif game_status == 'lose':
                win_history.append(0)
                game_over = True
            else:
                game_over = False

            # Store episode (experience)
            episode = [prev_envstate, action, reward, envstate, game_over]
            experience.remember(episode)
            n_episodes += 1

            # Train neural network model on a replayed batch
            inputs, targets = experience.get_data(data_size=data_size)
            model.fit(
                inputs,
                targets,
                epochs=8,
                batch_size=16,
                verbose=0,
            )
            loss = model.evaluate(inputs, targets, verbose=0)

        if len(win_history) > hsize:
            win_rate = sum(win_history[-hsize:]) / hsize

        dt = datetime.datetime.now() - start_time
        t = format_time(dt.total_seconds())
        template = "Epoch: {:03d}/{:d} | Loss: {:.4f} | Episodes: {:d} | Win count: {:d} | Win rate: {:.3f} | time: {}"
        print(template.format(epoch, n_epoch - 1, loss, n_episodes, sum(win_history), win_rate, t))

        # we simply check if training has exhausted all free cells and if in all
        # cases the agent won
        if win_rate > 0.9:
            epsilon = 0.05
        if sum(win_history[-hsize:]) == hsize and completion_check(model, qmaze):
            print("Reached 100%% win rate at epoch: %d" % (epoch,))
            break

    # Save trained model weights and architecture; these are used by the visualization code
    h5file = name + ".h5"
    json_file = name + ".json"
    model.save_weights(h5file, overwrite=True)
    with open(json_file, "w") as outfile:
        json.dump(model.to_json(), outfile)
    end_time = datetime.datetime.now()
    dt = end_time - start_time
    seconds = dt.total_seconds()
    t = format_time(seconds)
    print('files: %s, %s' % (h5file, json_file))
    print("n_epoch: %d, max_mem: %d, data: %d, time: %s" % (epoch, max_memory, data_size, t))
    return seconds


# This is a small utility for printing readable time strings:
def format_time(seconds):
    if seconds < 400:
        s = float(seconds)
        return "%.1f seconds" % (s,)
    elif seconds < 4000:
        m = seconds / 60.0
        return "%.2f minutes" % (m,)
    else:
        h = seconds / 3600.0
        return "%.2f hours" % (h,)


def build_model(maze, num_actions, lr=0.001):
    model = Sequential()
    model.add(Dense(maze.size, input_shape=(maze.size,)))
    model.add(PReLU())
    model.add(Dense(maze.size))
    model.add(PReLU())
    model.add(Dense(num_actions))
    model.compile(optimizer='adam', loss='mse')
    return model


if __name__ == '__main__':
    # maze = np.array([
    #     [1., 0., 1., 1., 1., 1., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 0., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 0., 1., 1., 1., 1.],
    #     [0., 0., 1., 0., 0., 1., 0., 1., 1., 1.],
    #     [1., 1., 0., 1., 0., 1., 0., 0., 0., 1.],
    #     [1., 1., 0., 1., 0., 1., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 1., 0., 0., 0., 0.],
    #     [1., 0., 0., 0., 0., 0., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 1., 1., 0., 1., 1.]
    # ])
    maze = np.array([
        [1., 0., 1., 1., 1., 1., 1.],
        [1., 1., 1., 0., 0., 1., 0.],
        [0., 0., 0., 1., 1., 1., 0.],
        [1., 1., 1., 1., 0., 0., 1.],
        [1., 0., 0., 0., 1., 1., 1.],
        [1., 0., 1., 1., 1., 1., 1.],
        [1., 1., 1., 0., 1., 1., 1.]
    ])
    qmaze = Qmaze(maze, (0, 0))
    model = build_model(maze, qmaze.num_actions)

    # with open("model.json") as f:
    #     loaded_model_json = json.load(f)
    #
    # model = model_from_json(loaded_model_json)
    # model.load_weights("model.h5")
    # model.compile(optimizer='adam', loss='mse')

    # qtrain reads the 'n_epoch' option, so pass n_epoch (not epochs)
    qtrain(model, qmaze, n_epoch=1000, max_memory=8 * maze.size, data_size=32)
    print(model.summary())
    show(qmaze)
    play_game(model, qmaze, (0, 0))
    show(qmaze)
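As an aside, the training loop above combines two standard ingredients: epsilon-greedy action selection and the one-step Q-learning target reward + discount * max_a' Q(s', a') that Experience.get_data builds. Below is a self-contained sketch of both; the helper names choose_action and q_target are hypothetical, not part of the gist:

import random
import numpy as np

def choose_action(q_values, valid_actions, epsilon=0.2):
    # Epsilon-greedy: explore with probability epsilon, otherwise exploit.
    if np.random.rand() < epsilon:
        return random.choice(valid_actions)
    return int(np.argmax(q_values))

def q_target(reward, q_next, discount=0.95, game_over=False):
    # One-step Q-learning target: r if terminal, else r + gamma * max_a' Q(s', a').
    if game_over:
        return reward
    return reward + discount * float(np.max(q_next))

# Worked example: an ordinary move costs -0.04 and the best next-state
# Q-value is 0.8, so the target is -0.04 + 0.95 * 0.8 = 0.72.
print(q_target(-0.04, np.array([0.1, 0.8, 0.3, 0.0])))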
qmaze.py
import numpy as np

visited_mark = 0.8  # Cells visited by the rat will be painted by gray 0.8
rat_mark = 0.5      # The current rat cell will be painted by gray 0.5

LEFT = 0
UP = 1
RIGHT = 2
DOWN = 3

# Actions dictionary
actions_dict = {
    LEFT: 'left',
    UP: 'up',
    RIGHT: 'right',
    DOWN: 'down',
}

actions_movement = {
    LEFT: (0, -1),
    UP: (-1, 0),
    RIGHT: (0, 1),
    DOWN: (1, 0),
}


class Qmaze(object):
    def __init__(self, maze, rat=(0, 0)):
        self._maze = np.array(maze)
        self.num_actions = len(actions_dict)
        self.row_size, self.col_size = self._maze.shape
        self.target = (self.row_size - 1, self.col_size - 1)  # target cell where the "cheese" is
        if self._maze[self.target] == 0.0:
            raise Exception("Invalid maze: target cell cannot be blocked!")
        self.free_cells = [(r, c) for r in range(self.row_size) for c in range(self.col_size)
                           if self._maze[r, c] == 1.0]
        self.free_cells.remove(self.target)
        if rat not in self.free_cells:
            raise Exception("Invalid Rat Location: must sit on a free cell")
        self.reset(rat)

    def reset(self, rat):
        self.rat = rat
        self.maze = np.copy(self._maze)
        row, col = rat
        self.maze[row, col] = rat_mark
        self.state = (row, col, 'start')
        self.min_reward = -0.5 * self.maze.size
        self.total_reward = 0
        self.visited = set()

    def update_state(self, action):
        nrow, ncol, nmode = rat_row, rat_col, mode = self.state

        if self.maze[rat_row, rat_col] > 0.0:
            self.visited.add((rat_row, rat_col))  # mark visited cell

        valid_actions = self.valid_actions()

        if not valid_actions:
            nmode = 'blocked'
        elif action in valid_actions:
            nmode = 'valid'
            if action == LEFT:
                ncol -= 1
            elif action == UP:
                nrow -= 1
            elif action == RIGHT:
                ncol += 1
            elif action == DOWN:
                nrow += 1
        else:  # invalid action, no change in rat position
            nmode = 'invalid'

        # new state
        self.state = (nrow, ncol, nmode)

    def get_reward(self):
        rat_row, rat_col, mode = self.state
        nrows, ncols = self.maze.shape
        if rat_row == nrows - 1 and rat_col == ncols - 1:
            return 1.0
        if mode == 'blocked':
            return self.min_reward - 1
        if (rat_row, rat_col) in self.visited:
            return -0.25
        if mode == 'invalid':
            return -0.75
        if mode == 'valid':
            return -0.04

    def act(self, action):
        self.update_state(action)
        reward = self.get_reward()
        self.total_reward += reward
        status = self.game_status()
        envstate = self.observe()
        return envstate, reward, status

    def observe(self):
        canvas = self.draw_env()
        envstate = canvas.reshape((1, -1))
        return envstate

    def draw_env(self):
        canvas = np.copy(self.maze)
        nrows, ncols = self.maze.shape
        # clear all visual marks
        for r in range(nrows):
            for c in range(ncols):
                if canvas[r, c] > 0.0:
                    canvas[r, c] = 1.0
        # draw the rat
        row, col, valid = self.state
        canvas[row, col] = rat_mark
        return canvas

    def game_status(self):
        if self.total_reward < self.min_reward:
            return 'lose'
        rat_row, rat_col, mode = self.state
        if rat_row == self.row_size - 1 and rat_col == self.col_size - 1:
            return 'win'
        return 'not_over'

    def valid_actions(self, cell=None):
        if cell is None:
            row, col, mode = self.state
        else:
            row, col = cell
        actions = []
        for action, (row_increase, col_increase) in actions_movement.items():
            if self.is_valid_position(row + row_increase, col + col_increase):
                actions.append(action)
        return actions

    def is_valid_position(self, row, col):
        if 0 <= row < self.row_size and 0 <= col < self.col_size:
            if self.maze[row, col] != 0.0:
                return True
        return False
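To exercise the environment in isolation, here is a small usage sketch of Qmaze on a hypothetical 3x3 maze (illustration only, not part of the gist):

import numpy as np
from qmaze import Qmaze, RIGHT

# Hypothetical 3x3 maze for illustration: 1.0 = free cell, 0.0 = blocked cell.
maze = np.array([
    [1., 1., 1.],
    [1., 0., 1.],
    [1., 1., 1.],
])

qmaze = Qmaze(maze, rat=(0, 0))
print(qmaze.valid_actions())      # [2, 3] -> RIGHT and DOWN from the top-left corner
envstate, reward, status = qmaze.act(RIGHT)
print(reward, status)             # -0.04 not_over  (cost of an ordinary valid move)
print(envstate.shape)             # (1, 9) -- flattened canvas with the rat cell marked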