""" | |
Simple policy gradient in Keras | |
""" | |
import gym | |
import numpy as np | |
from keras import layers | |
from keras.models import Model | |
from keras import backend as K | |
from keras import utils as np_utils | |
from keras import optimizers | |
class Agent(object): | |
def __init__(self, input_dim, output_dim, hidden_dims=[32, 32]): | |
"""Gym Playing Agent | |
Args: | |
input_dim (int): the dimension of state. | |
Same as `env.observation_space.shape[0]` | |
output_dim (int): the number of discrete actions | |
Same as `env.action_space.n` | |
hidden_dims (list): hidden dimensions | |
Methods: | |
private: | |
__build_train_fn -> None | |
It creates a train function | |
It's similar to defining `train_op` in Tensorflow | |
__build_network -> None | |
It create a base model | |
Its output is each action probability | |
public: | |
get_action(state) -> action | |
fit(state, action, reward) -> None | |
""" | |
self.input_dim = input_dim | |
self.output_dim = output_dim | |
self.__build_network(input_dim, output_dim, hidden_dims) | |
self.__build_train_fn() | |
def __build_network(self, input_dim, output_dim, hidden_dims=[32, 32]): | |
"""Create a base network""" | |
self.X = layers.Input(shape=(input_dim,)) | |
net = self.X | |
for h_dim in hidden_dims: | |
net = layers.Dense(h_dim)(net) | |
net = layers.Activation("relu")(net) | |
net = layers.Dense(output_dim)(net) | |
net = layers.Activation("softmax")(net) | |
self.model = Model(inputs=self.X, outputs=net) | |
def __build_train_fn(self): | |
"""Create a train function | |
It replaces `model.fit(X, y)` because we use the output of model and use it for training. | |
For example, we need action placeholder | |
called `action_one_hot` that stores, which action we took at state `s`. | |
Hence, we can update the same action. | |
This function will create | |
`self.train_fn([state, action_one_hot, discount_reward])` | |
which would train the model. | |
""" | |
action_prob_placeholder = self.model.output | |
action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim), | |
name="action_onehot") | |
discount_reward_placeholder = K.placeholder(shape=(None,), | |
name="discount_reward") | |
action_prob = K.sum(action_prob_placeholder * action_onehot_placeholder, axis=1) | |
log_action_prob = K.log(action_prob) | |
loss = - log_action_prob * discount_reward_placeholder | |
loss = K.mean(loss) | |
adam = optimizers.Adam() | |
updates = adam.get_updates(params=self.model.trainable_weights, | |
constraints=[], | |
loss=loss) | |
self.train_fn = K.function(inputs=[self.model.input, | |
action_onehot_placeholder, | |
discount_reward_placeholder], | |
outputs=[], | |
updates=updates) | |
def get_action(self, state): | |
"""Returns an action at given `state` | |
Args: | |
state (1-D or 2-D Array): It can be either 1-D array of shape (state_dimension, ) | |
or 2-D array shape of (n_samples, state_dimension) | |
Returns: | |
action: an integer action value ranging from 0 to (n_actions - 1) | |
""" | |
shape = state.shape | |
if len(shape) == 1: | |
assert shape == (self.input_dim,), "{} != {}".format(shape, self.input_dim) | |
state = np.expand_dims(state, axis=0) | |
elif len(shape) == 2: | |
assert shape[1] == (self.input_dim), "{} != {}".format(shape, self.input_dim) | |
else: | |
raise TypeError("Wrong state shape is given: {}".format(state.shape)) | |
action_prob = np.squeeze(self.model.predict(state)) | |
assert len(action_prob) == self.output_dim, "{} != {}".format(len(action_prob), self.output_dim) | |
return np.random.choice(np.arange(self.output_dim), p=action_prob) | |
def fit(self, S, A, R): | |
"""Train a network | |
Args: | |
S (2-D Array): `state` array of shape (n_samples, state_dimension) | |
A (1-D Array): `action` array of shape (n_samples,) | |
It's simply a list of int that stores which actions the agent chose | |
R (1-D Array): `reward` array of shape (n_samples,) | |
A reward is given after each action. | |
""" | |
action_onehot = np_utils.to_categorical(A, num_classes=self.output_dim) | |
discount_reward = compute_discounted_R(R) | |
assert S.shape[1] == self.input_dim, "{} != {}".format(S.shape[1], self.input_dim) | |
assert action_onehot.shape[0] == S.shape[0], "{} != {}".format(action_onehot.shape[0], S.shape[0]) | |
assert action_onehot.shape[1] == self.output_dim, "{} != {}".format(action_onehot.shape[1], self.output_dim) | |
assert len(discount_reward.shape) == 1, "{} != 1".format(len(discount_reward.shape)) | |
self.train_fn([S, action_onehot, discount_reward]) | |
def compute_discounted_R(R, discount_rate=.99): | |
"""Returns discounted rewards | |
Args: | |
R (1-D array): a list of `reward` at each time step | |
discount_rate (float): Will discount the future value by this rate | |
Returns: | |
discounted_r (1-D array): same shape as input `R` | |
but the values are discounted | |
Examples: | |
>>> R = [1, 1, 1] | |
>>> compute_discounted_R(R, .99) # before normalization | |
[1 + 0.99 + 0.99**2, 1 + 0.99, 1] | |
""" | |
discounted_r = np.zeros_like(R, dtype=np.float32) | |
running_add = 0 | |
for t in reversed(range(len(R))): | |
running_add = running_add * discount_rate + R[t] | |
discounted_r[t] = running_add | |
discounted_r -= discounted_r.mean() / discounted_r.std() | |
return discounted_r | |
def run_episode(env, agent): | |
"""Returns an episode reward | |
(1) Play until the game is done | |
(2) The agent will choose an action according to the policy | |
(3) When it's done, it will train from the game play | |
Args: | |
env (gym.env): Gym environment | |
agent (Agent): Game Playing Agent | |
Returns: | |
total_reward (int): total reward earned during the whole episode | |
""" | |
done = False | |
S = [] | |
A = [] | |
R = [] | |
s = env.reset() | |
total_reward = 0 | |
while not done: | |
a = agent.get_action(s) | |
s2, r, done, info = env.step(a) | |
total_reward += r | |
S.append(s) | |
A.append(a) | |
R.append(r) | |
s = s2 | |
if done: | |
S = np.array(S) | |
A = np.array(A) | |
R = np.array(R) | |
agent.fit(S, A, R) | |
return total_reward | |
def main(): | |
try: | |
env = gym.make("CartPole-v0") | |
input_dim = env.observation_space.shape[0] | |
output_dim = env.action_space.n | |
agent = Agent(input_dim, output_dim, [16, 16]) | |
for episode in range(2000): | |
reward = run_episode(env, agent) | |
print(episode, reward) | |
finally: | |
env.close() | |
if __name__ == '__main__': | |
main() |
When you compute the loss for a given timestep, you don't sum over the previous timesteps of that episode/trajectory. Why?
I am new to policy gradients, so I may be wrong. I am referring to the slide "Policy Gradient: Use Temporal Structure" from http://rll.berkeley.edu/deeprlcourse/docs/lec2.pdf for the comment above.
Thanks!
I think the implementation is correct:
- Given the Markov property, the probability of a state/action does not depend on previous states/actions. I think incorporating past rewards into the loss function would violate that principle.
- The value of a state-action pair is usually the expected discounted future reward.
Not sure though.
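One way to see this concretely (my own numeric check, not part of the gist): the weight the loss places on log pi(a_t | s_t) is discount_reward[t], and compute_discounted_R builds that value only from rewards at step t and later, which matches the "use temporal structure" form of the gradient in that slide.

# Quick sanity check (hypothetical, not from the gist): the weight on the
# log-probability at step t contains only rewards from step t onward.
import numpy as np

R = np.array([1.0, 1.0, 1.0])
gamma = 0.99

returns = np.zeros_like(R)
running = 0.0
for t in reversed(range(len(R))):
    running = running * gamma + R[t]
    returns[t] = running

print(returns)  # approx [2.9701, 1.99, 1.0]: the last step's log-prob is weighted only by its own reward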
It's not the fastest policy gradient implementation, but it works. It also makes me wonder how to make it less noisy.
Here is a graph from another implementation: https://github.com/nyck33/reinforcement-learning/blob/master/2-cartpole/1-dqn/cartpole_dqn.py
The code in your link "https://github.com/nyck33/reinforcement-learning/blob/master/2-cartpole/1-dqn/cartpole_dqn.py" is not using policy gradients.
- The noise can be inherent to the environment, if it is stochastic.
- Actor-critic can reduce the noise because the gradient is calculated using a value network rather than the (high-variance) returns of the environment; a rough sketch of the baseline idea follows below.
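Not part of the gist, but as a rough illustration of that variance-reduction direction: the simplest step is to subtract a baseline (a learned value estimate) from the discounted return before it is fed to train_fn. The value_model below is an assumed, separately compiled Keras regression network, and the helper name fit_with_baseline is hypothetical; only np, np_utils, and agent.train_fn come from the gist.

# Rough sketch of the baseline idea (not the gist's method): weight each
# log-probability by (return - V(s)) instead of the raw return.
def fit_with_baseline(agent, value_model, S, A, R, gamma=0.99):
    # Unnormalized discounted returns G_t
    G = np.zeros(len(R), dtype=np.float32)
    running = 0.0
    for t in reversed(range(len(R))):
        running = running * gamma + R[t]
        G[t] = running

    V = value_model.predict(S).squeeze()   # baseline estimate V(s_t)
    advantage = G - V                      # lower-variance weight for the log-prob
    action_onehot = np_utils.to_categorical(A, num_classes=agent.output_dim)
    agent.train_fn([S, action_onehot, advantage])
    value_model.fit(S, G, verbose=0)       # regress the baseline toward the returns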
I copied your code, but when I tried to run it I received the following error in get_updates():
TypeError: get_updates() got an unexpected keyword argument 'constraints'
Why might this be?
I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run.
- The code as it is here throws a TypeError: get_updates() got an unexpected keyword argument 'constraints'. I simply commented out the constraints=[] at line 93.
- Then I got ValueError: Cannot create a Keras backend function with updates but no outputs during eager execution. To disable eager execution I imported tensorflow and called tf.compat.v1.disable_eager_execution().
- Then it throws IndexError: list index out of range because of outputs=[] (line 99). Just change it to outputs=[self.model.output], and it runs fine for me.
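Putting those three changes together, the patched __build_train_fn would look roughly like this (my paraphrase of the fixes above, assuming a tf.keras-era setup; a sketch, not the original gist code):

# Sketch only: __build_train_fn with the three fixes above applied
# (no `constraints` kwarg, eager execution disabled, non-empty outputs).
import tensorflow as tf
tf.compat.v1.disable_eager_execution()  # K.function with `updates` needs graph mode

def __build_train_fn(self):
    action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim),
                                              name="action_onehot")
    discount_reward_placeholder = K.placeholder(shape=(None,),
                                                name="discount_reward")

    action_prob = K.sum(self.model.output * action_onehot_placeholder, axis=1)
    loss = K.mean(-K.log(action_prob) * discount_reward_placeholder)

    adam = optimizers.Adam()
    updates = adam.get_updates(params=self.model.trainable_weights,
                               loss=loss)                    # no `constraints=[]`

    self.train_fn = K.function(inputs=[self.model.input,
                                       action_onehot_placeholder,
                                       discount_reward_placeholder],
                               outputs=[self.model.output],  # must not be empty here
                               updates=updates)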
thank you :D