Created May 18, 2017 07:17
Keras Policy Gradient Example
""" | |
Simple policy gradient in Keras | |
""" | |
import gym | |
import numpy as np | |
from keras import layers | |
from keras.models import Model | |
from keras import backend as K | |
from keras import utils as np_utils | |
from keras import optimizers | |
class Agent(object):

    def __init__(self, input_dim, output_dim, hidden_dims=[32, 32]):
        """Gym Playing Agent

        Args:
            input_dim (int): the dimension of state.
                Same as `env.observation_space.shape[0]`
            output_dim (int): the number of discrete actions
                Same as `env.action_space.n`
            hidden_dims (list): hidden dimensions

        Methods:
            private:
                __build_train_fn -> None
                    It creates a train function.
                    It's similar to defining `train_op` in Tensorflow
                __build_network -> None
                    It creates a base model.
                    Its output is the probability of each action
            public:
                get_action(state) -> action
                fit(state, action, reward) -> None
        """
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.__build_network(input_dim, output_dim, hidden_dims)
        self.__build_train_fn()
    def __build_network(self, input_dim, output_dim, hidden_dims=[32, 32]):
        """Create a base network"""
        self.X = layers.Input(shape=(input_dim,))
        net = self.X

        for h_dim in hidden_dims:
            net = layers.Dense(h_dim)(net)
            net = layers.Activation("relu")(net)

        net = layers.Dense(output_dim)(net)
        net = layers.Activation("softmax")(net)
        self.model = Model(inputs=self.X, outputs=net)
    def __build_train_fn(self):
        """Create a train function

        It replaces `model.fit(X, y)` because we need the model's own output
        during training. For example, we need an action placeholder
        called `action_onehot` that stores which action we took at state `s`,
        so that we can update the probability of that same action.

        This function creates
        `self.train_fn([state, action_one_hot, discount_reward])`,
        which trains the model.
        """
        action_prob_placeholder = self.model.output
        action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim),
                                                  name="action_onehot")
        discount_reward_placeholder = K.placeholder(shape=(None,),
                                                    name="discount_reward")
        action_prob = K.sum(action_prob_placeholder * action_onehot_placeholder, axis=1)
        log_action_prob = K.log(action_prob)

        loss = -log_action_prob * discount_reward_placeholder
        loss = K.mean(loss)

        adam = optimizers.Adam()

        updates = adam.get_updates(params=self.model.trainable_weights,
                                   constraints=[],
                                   loss=loss)

        self.train_fn = K.function(inputs=[self.model.input,
                                           action_onehot_placeholder,
                                           discount_reward_placeholder],
                                   outputs=[],
                                   updates=updates)
    def get_action(self, state):
        """Returns an action at given `state`

        Args:
            state (1-D or 2-D Array): It can be either a 1-D array of shape (state_dimension,)
                or a 2-D array of shape (n_samples, state_dimension)

        Returns:
            action: an integer action value ranging from 0 to (n_actions - 1)
        """
        shape = state.shape

        if len(shape) == 1:
            assert shape == (self.input_dim,), "{} != {}".format(shape, self.input_dim)
            state = np.expand_dims(state, axis=0)
        elif len(shape) == 2:
            assert shape[1] == (self.input_dim), "{} != {}".format(shape, self.input_dim)
        else:
            raise TypeError("Wrong state shape is given: {}".format(state.shape))

        action_prob = np.squeeze(self.model.predict(state))
        assert len(action_prob) == self.output_dim, "{} != {}".format(len(action_prob), self.output_dim)
        # Sample an action from the predicted probability distribution.
        return np.random.choice(np.arange(self.output_dim), p=action_prob)
    def fit(self, S, A, R):
        """Train a network

        Args:
            S (2-D Array): `state` array of shape (n_samples, state_dimension)
            A (1-D Array): `action` array of shape (n_samples,)
                It's simply a list of ints that stores which actions the agent chose
            R (1-D Array): `reward` array of shape (n_samples,)
                A reward is given after each action.
        """
        action_onehot = np_utils.to_categorical(A, num_classes=self.output_dim)
        discount_reward = compute_discounted_R(R)

        assert S.shape[1] == self.input_dim, "{} != {}".format(S.shape[1], self.input_dim)
        assert action_onehot.shape[0] == S.shape[0], "{} != {}".format(action_onehot.shape[0], S.shape[0])
        assert action_onehot.shape[1] == self.output_dim, "{} != {}".format(action_onehot.shape[1], self.output_dim)
        assert len(discount_reward.shape) == 1, "{} != 1".format(len(discount_reward.shape))

        self.train_fn([S, action_onehot, discount_reward])
def compute_discounted_R(R, discount_rate=.99):
    """Returns discounted rewards

    Args:
        R (1-D array): a list of `reward` at each time step
        discount_rate (float): Will discount the future value by this rate

    Returns:
        discounted_r (1-D array): same shape as input `R`
            but the values are discounted

    Examples:
        >>> R = [1, 1, 1]
        >>> compute_discounted_R(R, .99)  # before normalization
        [1 + 0.99 + 0.99**2, 1 + 0.99, 1]
    """
    discounted_r = np.zeros_like(R, dtype=np.float32)
    running_add = 0
    for t in reversed(range(len(R))):
        running_add = running_add * discount_rate + R[t]
        discounted_r[t] = running_add

    # Normalize the returns to zero mean and unit variance; this acts as a
    # simple baseline and reduces the variance of the gradient estimate.
    discounted_r = (discounted_r - discounted_r.mean()) / discounted_r.std()

    return discounted_r
def run_episode(env, agent):
    """Returns an episode reward

    (1) Play until the game is done
    (2) The agent will choose an action according to the policy
    (3) When it's done, it will train from the game play

    Args:
        env (gym.env): Gym environment
        agent (Agent): Game Playing Agent

    Returns:
        total_reward (int): total reward earned during the whole episode
    """
    done = False
    S = []
    A = []
    R = []

    s = env.reset()

    total_reward = 0

    while not done:
        a = agent.get_action(s)
        s2, r, done, info = env.step(a)
        total_reward += r

        S.append(s)
        A.append(a)
        R.append(r)

        s = s2

        if done:
            S = np.array(S)
            A = np.array(A)
            R = np.array(R)

            agent.fit(S, A, R)

    return total_reward
def main():
    try:
        env = gym.make("CartPole-v0")
        input_dim = env.observation_space.shape[0]
        output_dim = env.action_space.n
        agent = Agent(input_dim, output_dim, [16, 16])

        for episode in range(2000):
            reward = run_episode(env, agent)
            print(episode, reward)

    finally:
        env.close()


if __name__ == '__main__':
    main()
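After training, it can be useful to roll out the learned policy greedily (taking the argmax instead of sampling). A minimal sketch, not part of the original gist; `evaluate` and `n_episodes` are illustrative names, and the old four-tuple `env.step` API is assumed as above:

def evaluate(env, agent, n_episodes=10):
    """Run the trained policy greedily and return the mean episode reward."""
    rewards = []
    for _ in range(n_episodes):
        s = env.reset()
        done = False
        total = 0
        while not done:
            # Pick the most probable action instead of sampling.
            prob = np.squeeze(agent.model.predict(np.expand_dims(s, axis=0)))
            s, r, done, _ = env.step(int(np.argmax(prob)))
            total += r
        rewards.append(total)
    return np.mean(rewards)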
arjangroen commented Nov 7, 2019:
- The noise can be inherent to the environment, if it is stochastic.
- Actor-critic can reduce this noise because the gradient is calculated using a value network rather than the (high-variance) returns from the environment; see the sketch below.
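A minimal sketch of that idea, not from the gist: `value_model` is an illustrative critic network trained alongside the policy, and `S`, `A`, `R`, and `action_onehot` mirror the arrays in `Agent.fit` above.

returns = compute_discounted_R(R)               # G_t, normalized as in the gist
values = np.squeeze(value_model.predict(S))     # critic's estimate V(s_t)
advantage = returns - values                    # A_t = G_t - V(s_t)
agent.train_fn([S, action_onehot, advantage])   # policy step on advantages
value_model.fit(S, returns, verbose=0)          # regress the critic toward G_t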
Copied your code; when I tried to run it, I received the following error in get_updates():
TypeError: get_updates() got an unexpected keyword argument 'constraints'
Why might this be?
I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run.
- The code as it is here throws a TypeError: get_updates() got an unexpected keyword argument 'constraints'. I simply commented out the constraints=[], at line 93.
- Then I got ValueError: Cannot create a Keras backend function with updates but no outputs during eager execution. To disable eager execution I imported tensorflow and called tf.compat.v1.disable_eager_execution().
- Then it throws IndexError: list index out of range because of outputs=[], (line 99). Just change it to outputs=[self.model.output], and it runs fine for me.
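For reference, a sketch of Agent.__build_train_fn with those three changes applied, assuming TF 2.x with a tf.keras-backed keras (untested against other versions):

import tensorflow as tf
tf.compat.v1.disable_eager_execution()  # K.function with updates needs graph mode

    def __build_train_fn(self):
        action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim),
                                                  name="action_onehot")
        discount_reward_placeholder = K.placeholder(shape=(None,),
                                                    name="discount_reward")

        action_prob = K.sum(self.model.output * action_onehot_placeholder, axis=1)
        loss = K.mean(-K.log(action_prob) * discount_reward_placeholder)

        adam = optimizers.Adam()
        updates = adam.get_updates(params=self.model.trainable_weights,
                                   loss=loss)  # no `constraints` kwarg here

        self.train_fn = K.function(inputs=[self.model.input,
                                           action_onehot_placeholder,
                                           discount_reward_placeholder],
                                   outputs=[self.model.output],  # must be non-empty
                                   updates=updates)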
thank you :D