DQN CartPole with NumPy only
import gym
import numpy as np
from collections import deque
from gym import wrappers

# Create the Cart-Pole game environment
env = gym.make('CartPole-v0')
env = wrappers.Monitor(env, '/tmp/cartpole-experiment-1')


def relu(x):
    return np.maximum(0, x)


def relu_grad(x):
    # Gradient of ReLU with respect to its input:
    # 1 where the pre-activation is positive, 0 elsewhere.
    grad = np.zeros_like(x)
    grad[x > 0] = 1
    return grad


def softmax(x):
    if x.ndim == 2:
        x = x.T
        x = x - np.max(x, axis=0)
        y = np.exp(x) / np.sum(np.exp(x), axis=0)
        return y.T

    x = x - np.max(x)  # guard against overflow
    return np.exp(x) / np.sum(np.exp(x))


def mean_squared_error(y, t):
    return 0.5 * np.sum((y - t) ** 2)
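# Quick sanity check (illustrative numbers, not part of the training code):
# for a prediction y = [0.3, 0.7] and a target t = [0.0, 1.0],
# mean_squared_error(y, t) = 0.5 * ((0.3 - 0.0)**2 + (0.7 - 1.0)**2) = 0.09.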
class SGD:
    """Stochastic gradient descent."""

    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, params, grads):
        for key in params.keys():
            params[key] -= self.lr * grads[key]


class Adam:
    """Adam (http://arxiv.org/abs/1412.6980v8)"""

    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.iter = 0
        self.m = None
        self.v = None

    def update(self, params, grads):
        if self.m is None:
            self.m, self.v = {}, {}
            for key, val in params.items():
                self.m[key] = np.zeros_like(val)
                self.v[key] = np.zeros_like(val)

        self.iter += 1
        lr_t = self.lr * np.sqrt(1.0 - self.beta2 ** self.iter) / (1.0 - self.beta1 ** self.iter)

        for key in params.keys():
            self.m[key] += (1 - self.beta1) * (grads[key] - self.m[key])
            self.v[key] += (1 - self.beta2) * (grads[key] ** 2 - self.v[key])
            params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)
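# Note on the Adam update above: the bias correction is folded into lr_t and
# the moment estimates are updated in incremental form.  Up to the placement
# of the epsilon term, this is equivalent to the textbook update
#     m <- beta1 * m + (1 - beta1) * g
#     v <- beta2 * v + (1 - beta2) * g**2
#     m_hat = m / (1 - beta1**t),  v_hat = v / (1 - beta2**t)
#     param <- param - lr * m_hat / (sqrt(v_hat) + eps)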
class QNetwork:
    def __init__(self, input_size, hidden_size, output_size, weight_init_std=0.01):
        # Initialize the weights (biases are left out in this gist).
        self.params = {}
        self.params['W1'] = weight_init_std * np.random.randn(input_size, hidden_size)
        #self.params['b1'] = np.zeros(hidden_size)
        self.params['W2'] = weight_init_std * np.random.randn(hidden_size, output_size)
        #self.params['b2'] = np.zeros(output_size)

    def predict(self, x):
        W1, W2 = self.params['W1'], self.params['W2']
        #b1, b2 = self.params['b1'], self.params['b2']

        a1 = np.dot(x, W1)  # + b1
        z1 = relu(a1)
        a2 = np.dot(z1, W2)  # + b2
        y = softmax(a2)

        return y

    def fit(self, x, t):
        W1, W2 = self.params['W1'], self.params['W2']
        #b1, b2 = self.params['b1'], self.params['b2']
        grads = {}
        batch_num = x.shape[0]

        # forward
        a1 = np.dot(x, W1)  # + b1
        z1 = relu(a1)
        a2 = np.dot(z1, W2)  # + b2
        y = softmax(a2)

        # backward
        dy = (y - t) / batch_num
        grads['W2'] = np.dot(z1.T, dy)
        #grads['b2'] = np.sum(dy, axis=0)
        da1 = np.dot(dy, W2.T)
        dz1 = relu_grad(a1) * da1
        grads['W1'] = np.dot(x.T, dz1)
        #grads['b1'] = np.sum(dz1, axis=0)

        # update (learning_rate is the module-level hyperparameter defined below;
        # a fresh Adam instance is created per call, so its moment estimates do
        # not persist across updates)
        opt = Adam(lr=learning_rate)
        opt.update(self.params, grads)

    def loss(self, x, t):
        y = self.predict(x)
        return mean_squared_error(y, t)
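# Note: dy = (y - t) / batch_num in fit() is the exact gradient of softmax
# combined with a cross-entropy loss; the mean_squared_error in loss() is only
# used for the progress printout in the training loop below.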
class Memory():
    def __init__(self, max_size=1000):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        idx = np.random.choice(np.arange(len(self.buffer)),
                               size=batch_size,
                               replace=False)
        return [self.buffer[ii] for ii in idx]
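# Experiences are stored as (state, action, reward, next_state) tuples and
# sampled uniformly without replacement, so sample(batch_size) needs at least
# batch_size stored experiences; the pretraining loop below guarantees this
# before the first replay update.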
train_episodes = 1000         # max number of episodes to learn from
max_steps = 200               # max steps in an episode
gamma = 0.99                  # future reward discount

# Exploration parameters
explore_start = 1.0           # exploration probability at start
explore_stop = 0.01           # minimum exploration probability
decay_rate = 0.0001           # exponential decay rate for exploration prob
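# The exploration probability used in the training loop decays exponentially
# from explore_start towards explore_stop:
#     explore_p = explore_stop + (explore_start - explore_stop) * exp(-decay_rate * step)
# For example (illustrative numbers), after 10,000 training steps
# explore_p ~= 0.01 + 0.99 * exp(-1) ~= 0.37.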
# Network parameters
hidden_size = 16              # number of units in each Q-network hidden layer
learning_rate = 1e-4          # Q-network learning rate

# Memory parameters
memory_size = 10000           # memory capacity
batch_size = 32               # experience mini-batch size
pretrain_length = batch_size  # number of experiences to pretrain the memory

# CartPole-v0 observations are 4-dimensional and there are 2 discrete actions.
mainQN = QNetwork(input_size=4, hidden_size=hidden_size, output_size=2, weight_init_std=1.0)
###################################
## Populate the experience memory
###################################
# Initialize the simulation
env.reset()
# Take one random step to get the pole and cart moving
state, reward, done, _ = env.step(env.action_space.sample())
state = np.reshape(state, [1, 4])

memory = Memory(max_size=memory_size)

# Make a bunch of random actions and store the experiences
for ii in range(pretrain_length):
    # Uncomment the line below to watch the simulation
    # env.render()

    # Make a random action
    action = env.action_space.sample()
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1, 4])

    if done:
        # The simulation failed, so there is no next state; store an all-zero
        # state as a terminal marker.
        next_state = np.zeros(state.shape)

        # Add experience to memory
        memory.add((state, action, reward, next_state))

        # Start a new episode
        env.reset()
        # Take one random step to get the pole and cart moving
        state, reward, done, _ = env.step(env.action_space.sample())
        state = np.reshape(state, [1, 4])
    else:
        # Add experience to memory
        memory.add((state, action, reward, next_state))
        state = next_state
#############
## Training
#############
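# Each step inside an episode does three things: (1) replay a minibatch of
# stored experiences and take one gradient step towards the Q-learning target
#     target = reward + gamma * max_a' Q(next_state, a')
# (an all-zero next_state marks a terminal transition and gets target = reward),
# (2) pick an action epsilon-greedily using explore_p, and (3) step the
# environment and store the new experience.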
step = 0
for ep in range(1, train_episodes):
    total_reward = 0
    t = 0
    while t < max_steps:
        # Replay
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        minibatch = memory.sample(batch_size)
        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(minibatch):
            inputs[i:i+1] = state_b
            target = reward_b
            if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
                # Non-terminal transition: bootstrap from the next state.
                target_Q = mainQN.predict(next_state_b)[0]
                target = reward_b + gamma * np.amax(target_Q)
            targets[i] = mainQN.predict(state_b)
            targets[i][action_b] = target
        mainQN.fit(inputs, targets)

        step += 1

        # Explore or exploit
        explore_p = explore_stop + (explore_start - explore_stop) * np.exp(-decay_rate * step)
        if explore_p > np.random.rand():
            # Make a random action
            action = env.action_space.sample()
        else:
            # Get action from Q-network
            Qs = mainQN.predict(state)[0]
            action = np.argmax(Qs)

        # Take action, get new state and reward
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 4])
        total_reward += reward

        if done:
            # The episode ended, so there is no next state.
            next_state = np.zeros(state.shape)
            t = max_steps

            print('Episode: {}'.format(ep),
                  'Total reward: {}'.format(total_reward),
                  'Loss: {:.6f}'.format(mainQN.loss(inputs, targets)),
                  'Explore P: {:.4f}'.format(explore_p))

            # Add experience to memory
            memory.add((state, action, reward, next_state))

            # Start a new episode
            env.reset()
            # Take one random step to get the pole and cart moving
            state, reward, done, _ = env.step(env.action_space.sample())
            state = np.reshape(state, [1, 4])
        else:
            # Add experience to memory
            memory.add((state, action, reward, next_state))
            state = next_state
            t += 1
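# Optional cleanup (assumes the old gym.wrappers.Monitor API): explicitly
# closing the environment finalizes the monitor's recordings so the results
# under /tmp/cartpole-experiment-1 can be uploaded for evaluation.
env.close()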
OpenAI Gym evaluation: https://gym.openai.com/evaluations/eval_iNrsSMkNSxW1wGF0b1lspg