qmaze tutorial
import numpy as np


class Experience(object):
    def __init__(self, model, max_memory=100, discount=0.95):
        self.model = model
        self.max_memory = max_memory
        self.discount = discount
        self.memory = list()
        self.num_actions = model.output_shape[-1]

    def remember(self, episode):
        # episode = [envstate, action, reward, envstate_next, game_over]
        # memory[i] = episode
        # envstate == flattened 1d maze cells info, including rat cell (see method: observe)
        self.memory.append(episode)
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def predict(self, envstate):
        return self.model.predict(envstate)[0]

    def get_data(self, data_size=10):
        env_size = self.memory[0][0].shape[1]  # envstate 1d size (1st element of episode)
        mem_size = len(self.memory)
        data_size = min(mem_size, data_size)
        inputs = np.zeros((data_size, env_size))
        targets = np.zeros((data_size, self.num_actions))
        for i, j in enumerate(np.random.choice(range(mem_size), data_size, replace=False)):
            envstate, action, reward, envstate_next, game_over = self.memory[j]
            inputs[i] = envstate
            # There should be no target values for actions not taken.
            targets[i] = self.predict(envstate)
            # Q_sa = derived policy = max quality env/action = max_a' Q(s', a')
            Q_sa = np.max(self.predict(envstate_next))
            if game_over:
                targets[i, action] = reward
            else:
                # reward + gamma * max_a' Q(s', a')
                targets[i, action] = reward + self.discount * Q_sa
        return inputs, targets
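

# --- Illustration (not part of the original tutorial) ---
# A minimal sketch of how get_data() builds Bellman targets. It uses a
# hypothetical _StubModel whose predict() returns fixed Q-values so the
# arithmetic is easy to follow: for a non-terminal transition the target for
# the taken action becomes reward + discount * max_a' Q(s', a'). Kept behind a
# __main__ guard so it does not run when this file is imported.
if __name__ == '__main__':
    class _StubModel(object):
        output_shape = (None, 4)  # 4 actions

        def predict(self, envstate):
            # Pretend every state has Q-values [0.1, 0.2, 0.3, 0.4]
            return np.array([[0.1, 0.2, 0.3, 0.4]])

    exp = Experience(_StubModel(), max_memory=10, discount=0.95)
    s = np.zeros((1, 16))       # flattened 4x4 maze
    s_next = np.ones((1, 16))
    # action 2 taken, reward -0.04, episode not over
    exp.remember([s, 2, -0.04, s_next, False])
    inputs, targets = exp.get_data(data_size=1)
    # Expected target for action 2: -0.04 + 0.95 * 0.4 = 0.34
    print(targets)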
from __future__ import print_function
import os, sys, time, datetime, json, random
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.optimizers import SGD, Adam, RMSprop
from keras.models import model_from_json
from keras.layers.advanced_activations import PReLU
import matplotlib.pyplot as plt
from qmaze import Qmaze
from experience import Experience
# plt.interactive(False) # for pycharm
# Exploration factor
epsilon = 0.2
def show(qmaze):
    plt.grid('on')
    nrows, ncols = qmaze.maze.shape
    ax = plt.gca()
    ax.set_xticks(np.arange(0.5, ncols, 1))
    ax.set_yticks(np.arange(0.5, nrows, 1))
    ax.set_xticklabels([])
    ax.set_yticklabels([])
    canvas = np.copy(qmaze.maze)
    for row, col in qmaze.visited:
        canvas[row, col] = 0.6
    rat_row, rat_col, _ = qmaze.state
    canvas[rat_row, rat_col] = 0.3  # rat cell
    # canvas[nrows - 1, ncols - 1] = 0.9  # cheese cell
    img = plt.imshow(canvas, interpolation='none', cmap='gray')
    plt.show()  # for pycharm
    return img
def play_game(model, qmaze, rat_cell, trace=False):
    qmaze.reset(rat_cell)
    envstate = qmaze.observe()
    while True:
        prev_envstate = envstate
        # get next action
        q = model.predict(prev_envstate)
        action = np.argmax(q[0])
        # apply action, get reward and new state
        envstate, reward, game_status = qmaze.act(action)
        if trace:
            show(qmaze)
        if game_status == 'win':
            return True
        elif game_status == 'lose':
            return False
def completion_check(model, qmaze):
    for cell in qmaze.free_cells:
        if not qmaze.valid_actions(cell):
            return False
        if not play_game(model, qmaze, cell):
            return False
    return True
def qtrain(model, qmaze, **opt):
    global epsilon
    n_epoch = opt.get('n_epoch', 15000)
    max_memory = opt.get('max_memory', 1000)
    data_size = opt.get('data_size', 50)
    weights_file = opt.get('weights_file', "")
    name = opt.get('name', 'model')
    start_time = datetime.datetime.now()

    # If you want to continue training from a previous model,
    # just supply the h5 file name to the weights_file option
    if weights_file:
        print("loading weights from file: %s" % (weights_file,))
        model.load_weights(weights_file)

    # Construct environment/game from numpy array: maze (see above)
    # Initialize experience replay object
    experience = Experience(model, max_memory=max_memory)

    win_history = []  # history of win/lose games
    hsize = qmaze.maze.size // 2  # history window size
    win_rate = 0.0
    epoch = None
    for epoch in range(n_epoch):
        loss = 0.0
        rat_cell = random.choice(qmaze.free_cells)
        qmaze.reset(rat_cell)
        game_over = False

        # get initial envstate (1d flattened canvas)
        envstate = qmaze.observe()

        n_episodes = 0
        while not game_over:
            valid_actions = qmaze.valid_actions()
            if not valid_actions:
                break
            prev_envstate = envstate
            # Get next action
            if np.random.rand() < epsilon:
                action = random.choice(valid_actions)
            else:
                action = np.argmax(experience.predict(prev_envstate))

            # Apply action, get reward and new envstate
            envstate, reward, game_status = qmaze.act(action)
            if game_status == 'win':
                win_history.append(1)
                game_over = True
            elif game_status == 'lose':
                win_history.append(0)
                game_over = True
            else:
                game_over = False

            # Store episode (experience)
            episode = [prev_envstate, action, reward, envstate, game_over]
            experience.remember(episode)
            n_episodes += 1

            # Train neural network model
            inputs, targets = experience.get_data(data_size=data_size)
            h = model.fit(
                inputs,
                targets,
                epochs=8,
                batch_size=16,
                verbose=0,
            )
            loss = model.evaluate(inputs, targets, verbose=0)

        if len(win_history) > hsize:
            win_rate = sum(win_history[-hsize:]) / hsize

        dt = datetime.datetime.now() - start_time
        t = format_time(dt.total_seconds())
        template = "Epoch: {:03d}/{:d} | Loss: {:.4f} | Episodes: {:d} | Win count: {:d} | Win rate: {:.3f} | time: {}"
        print(template.format(epoch, n_epoch - 1, loss, n_episodes, sum(win_history), win_rate, t))
        # we simply check whether training has exhausted all free cells and
        # the agent won in every case
        if win_rate > 0.9:
            epsilon = 0.05
        if sum(win_history[-hsize:]) == hsize and completion_check(model, qmaze):
            print("Reached 100%% win rate at epoch: %d" % (epoch,))
            break

    # Save trained model weights and architecture; this will be used by the visualization code
    h5file = name + ".h5"
    json_file = name + ".json"
    model.save_weights(h5file, overwrite=True)
    with open(json_file, "w") as outfile:
        json.dump(model.to_json(), outfile)
    end_time = datetime.datetime.now()
    dt = end_time - start_time
    seconds = dt.total_seconds()
    t = format_time(seconds)
    print('files: %s, %s' % (h5file, json_file))
    print("n_epoch: %d, max_mem: %d, data: %d, time: %s" % (epoch, max_memory, data_size, t))
    return seconds
# This is a small utility for printing readable time strings:
def format_time(seconds):
    if seconds < 400:
        s = float(seconds)
        return "%.1f seconds" % (s,)
    elif seconds < 4000:
        m = seconds / 60.0
        return "%.2f minutes" % (m,)
    else:
        h = seconds / 3600.0
        return "%.2f hours" % (h,)
def build_model(maze, num_actions, lr=0.001):
    model = Sequential()
    model.add(Dense(maze.size, input_shape=(maze.size,)))
    model.add(PReLU())
    model.add(Dense(maze.size))
    model.add(PReLU())
    model.add(Dense(num_actions))
    model.compile(optimizer='adam', loss='mse')
    return model
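

# --- Note (not part of the original tutorial) ---
# The network maps a flattened maze of shape (1, maze.size) to one Q-value per
# action. For the 7x7 maze built in the __main__ block below, a sanity check
# could look like this (kept in a comment so it does not run on import; `qmaze`
# and `model` are the objects created below):
#
#     envstate = qmaze.observe()       # shape (1, 49)
#     q = model.predict(envstate)      # shape (1, 4): one value per action
#     action = np.argmax(q[0])         # greedy action for this state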
if __name__ == '__main__':
    # maze = np.array([
    #     [1., 0., 1., 1., 1., 1., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 0., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 0., 1., 1., 1., 1.],
    #     [0., 0., 1., 0., 0., 1., 0., 1., 1., 1.],
    #     [1., 1., 0., 1., 0., 1., 0., 0., 0., 1.],
    #     [1., 1., 0., 1., 0., 1., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 1., 0., 0., 0., 0.],
    #     [1., 0., 0., 0., 0., 0., 1., 1., 1., 1.],
    #     [1., 1., 1., 1., 1., 1., 1., 0., 1., 1.]
    # ])
    maze = np.array([
        [1., 0., 1., 1., 1., 1., 1.],
        [1., 1., 1., 0., 0., 1., 0.],
        [0., 0., 0., 1., 1., 1., 0.],
        [1., 1., 1., 1., 0., 0., 1.],
        [1., 0., 0., 0., 1., 1., 1.],
        [1., 0., 1., 1., 1., 1., 1.],
        [1., 1., 1., 0., 1., 1., 1.]
    ])
    qmaze = Qmaze(maze, (0, 0))
    model = build_model(maze, qmaze.num_actions)

    # with open("model.json") as f:
    #     loaded_model_json = json.load(f)
    #
    # model = model_from_json(loaded_model_json)
    # model.load_weights("model.h5")
    # model.compile(optimizer='adam', loss='mse')

    # qtrain reads the epoch count from the 'n_epoch' option (not 'epochs')
    qtrain(model, qmaze, n_epoch=1000, max_memory=8 * maze.size, data_size=32)
    print(model.summary())
    show(qmaze)
    play_game(model, qmaze, (0, 0))
    show(qmaze)
import numpy as np
visited_mark = 0.8  # Cells visited by the rat will be painted gray 0.8
rat_mark = 0.5  # The current rat cell will be painted gray 0.5
LEFT = 0
UP = 1
RIGHT = 2
DOWN = 3
# Actions dictionary
actions_dict = {
LEFT: 'left',
UP: 'up',
RIGHT: 'right',
DOWN: 'down',
}
actions_movement = {
LEFT: (0, -1),
UP: (-1, 0),
RIGHT: (0, 1),
DOWN: (1, 0),
}
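
# --- Note (not part of the original tutorial) ---
# Each action index maps to a (row, col) delta; moving is just adding the
# delta to the current cell. For example, from cell (2, 3):
#
#     d_row, d_col = actions_movement[UP]   # (-1, 0)
#     new_cell = (2 + d_row, 3 + d_col)     # (1, 3)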
class Qmaze(object):
    def __init__(self, maze, rat=(0, 0)):
        self._maze = np.array(maze)
        self.num_actions = len(actions_dict)
        self.row_size, self.col_size = self._maze.shape
        self.target = (self.row_size - 1, self.col_size - 1)  # target cell where the "cheese" is
        if self._maze[self.target] == 0.0:
            raise Exception("Invalid maze: target cell cannot be blocked!")
        self.free_cells = [(r, c) for r in range(self.row_size) for c in range(self.col_size)
                           if self._maze[r, c] == 1.0]
        self.free_cells.remove(self.target)
        if rat not in self.free_cells:
            raise Exception("Invalid Rat Location: must sit on a free cell")
        self.reset(rat)

    def reset(self, rat):
        self.rat = rat
        self.maze = np.copy(self._maze)
        row, col = rat
        self.maze[row, col] = rat_mark
        self.state = (row, col, 'start')
        self.min_reward = -0.5 * self.maze.size
        self.total_reward = 0
        self.visited = set()
    def update_state(self, action):
        nrow, ncol, nmode = rat_row, rat_col, mode = self.state
        if self.maze[rat_row, rat_col] > 0.0:
            self.visited.add((rat_row, rat_col))  # mark visited cell
        valid_actions = self.valid_actions()
        if not valid_actions:
            nmode = 'blocked'
        elif action in valid_actions:
            nmode = 'valid'
            if action == LEFT:
                ncol -= 1
            elif action == UP:
                nrow -= 1
            elif action == RIGHT:
                ncol += 1
            elif action == DOWN:
                nrow += 1
        else:  # invalid action, no change in rat position
            nmode = 'invalid'
        # new state
        self.state = (nrow, ncol, nmode)
    def get_reward(self):
        rat_row, rat_col, mode = self.state
        nrows, ncols = self.maze.shape
        if rat_row == nrows - 1 and rat_col == ncols - 1:
            return 1.0
        if mode == 'blocked':
            return self.min_reward - 1
        if (rat_row, rat_col) in self.visited:
            return -0.25
        if mode == 'invalid':
            return -0.75
        if mode == 'valid':
            return -0.04

    def act(self, action):
        self.update_state(action)
        reward = self.get_reward()
        self.total_reward += reward
        status = self.game_status()
        envstate = self.observe()
        return envstate, reward, status

    def observe(self):
        canvas = self.draw_env()
        envstate = canvas.reshape((1, -1))
        return envstate
    def draw_env(self):
        canvas = np.copy(self.maze)
        nrows, ncols = self.maze.shape
        # clear all visual marks
        for r in range(nrows):
            for c in range(ncols):
                if canvas[r, c] > 0.0:
                    canvas[r, c] = 1.0
        # draw the rat
        row, col, valid = self.state
        canvas[row, col] = rat_mark
        return canvas

    def game_status(self):
        if self.total_reward < self.min_reward:
            return 'lose'
        rat_row, rat_col, mode = self.state
        if rat_row == self.row_size - 1 and rat_col == self.col_size - 1:
            return 'win'
        return 'not_over'
    def valid_actions(self, cell=None):
        if cell is None:
            row, col, mode = self.state
        else:
            row, col = cell
        actions = []
        for action, (row_increase, col_increase) in actions_movement.items():
            if self.is_valid_position(row + row_increase, col + col_increase):
                actions.append(action)
        return actions

    def is_valid_position(self, row, col):
        if 0 <= row < self.row_size and 0 <= col < self.col_size:
            if self.maze[row, col] != 0.0:
                return True
        return False
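

# --- Illustration (not part of the original tutorial) ---
# A minimal manual walkthrough of the environment above, kept behind a
# __main__ guard so it does not run when qmaze.py is imported. The 4x4 maze
# and the action sequence below are made up for this example.
if __name__ == '__main__':
    demo_maze = np.array([
        [1., 1., 1., 1.],
        [0., 0., 1., 0.],
        [1., 1., 1., 1.],
        [1., 0., 0., 1.],
    ])
    env = Qmaze(demo_maze, rat=(0, 0))
    print(env.valid_actions())            # from (0, 0) only RIGHT is possible
    for step in (RIGHT, RIGHT, DOWN, DOWN, RIGHT, DOWN):
        envstate, reward, status = env.act(step)
        print(env.state, reward, status)  # envstate has shape (1, 16); last step wins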