""" | |
Simple policy gradient in Keras | |
""" | |
import gym | |
import numpy as np | |
from keras import layers | |
from keras.models import Model | |
from keras import backend as K | |
from keras import utils as np_utils | |
from keras import optimizers | |
class Agent(object):

    def __init__(self, input_dim, output_dim, hidden_dims=[32, 32]):
        """Gym Playing Agent

        Args:
            input_dim (int): the dimension of the state.
                Same as `env.observation_space.shape[0]`
            output_dim (int): the number of discrete actions.
                Same as `env.action_space.n`
            hidden_dims (list): hidden layer dimensions

        Methods:
            private:
                __build_train_fn -> None
                    Creates a train function,
                    similar to defining `train_op` in Tensorflow
                __build_network -> None
                    Creates the base model
                    whose output is the probability of each action
            public:
                get_action(state) -> action
                fit(state, action, reward) -> None
        """
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.__build_network(input_dim, output_dim, hidden_dims)
        self.__build_train_fn()

    def __build_network(self, input_dim, output_dim, hidden_dims=[32, 32]):
        """Create the base network"""
        self.X = layers.Input(shape=(input_dim,))
        net = self.X

        for h_dim in hidden_dims:
            net = layers.Dense(h_dim)(net)
            net = layers.Activation("relu")(net)

        net = layers.Dense(output_dim)(net)
        net = layers.Activation("softmax")(net)

        self.model = Model(inputs=self.X, outputs=net)
    def __build_train_fn(self):
        """Create a train function

        It replaces `model.fit(X, y)` because we need the model's own output
        inside the loss. For example, we need a placeholder `action_onehot`
        that stores which action we took at state `s`, so that only the
        probability of the chosen action is updated.

        This function creates
        `self.train_fn([state, action_one_hot, discount_reward])`,
        which trains the model.
        """
        action_prob_placeholder = self.model.output
        action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim),
                                                  name="action_onehot")
        discount_reward_placeholder = K.placeholder(shape=(None,),
                                                    name="discount_reward")

        action_prob = K.sum(action_prob_placeholder * action_onehot_placeholder, axis=1)
        log_action_prob = K.log(action_prob)

        loss = -log_action_prob * discount_reward_placeholder
        loss = K.mean(loss)

        adam = optimizers.Adam()

        updates = adam.get_updates(params=self.model.trainable_weights,
                                   constraints=[],
                                   loss=loss)

        self.train_fn = K.function(inputs=[self.model.input,
                                           action_onehot_placeholder,
                                           discount_reward_placeholder],
                                   outputs=[],
                                   updates=updates)
    def get_action(self, state):
        """Returns an action at the given `state`

        Args:
            state (1-D or 2-D Array): either a 1-D array of shape (state_dimension,)
                or a 2-D array of shape (n_samples, state_dimension)

        Returns:
            action: an integer action value ranging from 0 to (n_actions - 1)
        """
        shape = state.shape

        if len(shape) == 1:
            assert shape == (self.input_dim,), "{} != {}".format(shape, self.input_dim)
            state = np.expand_dims(state, axis=0)

        elif len(shape) == 2:
            assert shape[1] == self.input_dim, "{} != {}".format(shape, self.input_dim)

        else:
            raise TypeError("Wrong state shape is given: {}".format(state.shape))

        action_prob = np.squeeze(self.model.predict(state))
        assert len(action_prob) == self.output_dim, "{} != {}".format(len(action_prob), self.output_dim)
        return np.random.choice(np.arange(self.output_dim), p=action_prob)
    def fit(self, S, A, R):
        """Train the network on one episode of experience

        Args:
            S (2-D Array): `state` array of shape (n_samples, state_dimension)
            A (1-D Array): `action` array of shape (n_samples,);
                a list of ints recording which action the agent chose at each step
            R (1-D Array): `reward` array of shape (n_samples,);
                a reward is given after each action
        """
        action_onehot = np_utils.to_categorical(A, num_classes=self.output_dim)
        discount_reward = compute_discounted_R(R)

        assert S.shape[1] == self.input_dim, "{} != {}".format(S.shape[1], self.input_dim)
        assert action_onehot.shape[0] == S.shape[0], "{} != {}".format(action_onehot.shape[0], S.shape[0])
        assert action_onehot.shape[1] == self.output_dim, "{} != {}".format(action_onehot.shape[1], self.output_dim)
        assert len(discount_reward.shape) == 1, "{} != 1".format(len(discount_reward.shape))

        self.train_fn([S, action_onehot, discount_reward])
def compute_discounted_R(R, discount_rate=.99):
    """Returns discounted rewards

    Args:
        R (1-D array): a list of `reward` values, one per time step
        discount_rate (float): discounts future rewards by this rate

    Returns:
        discounted_r (1-D array): same shape as the input `R`,
            but with the values discounted

    Examples:
        >>> R = [1, 1, 1]
        >>> compute_discounted_R(R, .99)  # before normalization
        [1 + 0.99 + 0.99**2, 1 + 0.99, 1]
    """
    discounted_r = np.zeros_like(R, dtype=np.float32)
    running_add = 0
    for t in reversed(range(len(R))):
        running_add = running_add * discount_rate + R[t]
        discounted_r[t] = running_add

    discounted_r -= discounted_r.mean() / discounted_r.std()

    return discounted_r
def run_episode(env, agent):
    """Returns an episode reward

    (1) Play until the game is done
    (2) The agent chooses an action according to the policy
    (3) When the episode is done, train on the collected game play

    Args:
        env (gym.env): Gym environment
        agent (Agent): Game Playing Agent

    Returns:
        total_reward (int): total reward earned during the whole episode
    """
    done = False
    S = []
    A = []
    R = []

    s = env.reset()

    total_reward = 0

    while not done:
        a = agent.get_action(s)
        s2, r, done, info = env.step(a)
        total_reward += r

        S.append(s)
        A.append(a)
        R.append(r)

        s = s2

        if done:
            S = np.array(S)
            A = np.array(A)
            R = np.array(R)

            agent.fit(S, A, R)

    return total_reward
def main():
    try:
        env = gym.make("CartPole-v0")
        input_dim = env.observation_space.shape[0]
        output_dim = env.action_space.n
        agent = Agent(input_dim, output_dim, [16, 16])

        for episode in range(2000):
            reward = run_episode(env, agent)
            print(episode, reward)

    finally:
        env.close()


if __name__ == '__main__':
    main()
Nice code!

There is a very slight bug in `discounted_r -= discounted_r.mean() / discounted_r.std()`, which doesn't standardize properly. To standardize, subtract the mean first and then divide by the standard deviation:

`discounted_r = (discounted_r - discounted_r.mean()) / discounted_r.std()`
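As a quick sanity check of that fix, here is a minimal standalone sketch (NumPy only; `compute_discounted_R_fixed` is just a hypothetical copy of the gist's function with the corrected line):

```python
import numpy as np

def compute_discounted_R_fixed(R, discount_rate=.99):
    """Same as the gist's compute_discounted_R, but standardized correctly."""
    discounted_r = np.zeros_like(R, dtype=np.float32)
    running_add = 0
    for t in reversed(range(len(R))):
        running_add = running_add * discount_rate + R[t]
        discounted_r[t] = running_add
    # subtract the mean first, then divide by the standard deviation
    discounted_r = (discounted_r - discounted_r.mean()) / discounted_r.std()
    return discounted_r

returns = compute_discounted_R_fixed([1.0] * 10)
print(returns.mean(), returns.std())  # approximately 0.0 and 1.0
```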
There might be a problem with this implementation: it uses just one trajectory to estimate the gradient of the loss function. In a typical policy gradient algorithm, we should use multiple trajectories (each with multiple time steps) to estimate that gradient.
@AchillesJJ, please clarify. It seems like each episode is a trajectory with multiple time steps, and when the episode is done, the weights are updated according to the gradient of the loss function. How is it possible to have multiple trajectories within a single episode for a policy gradient method? Perhaps with DQN or variants of Actor-Critic, where a target network and different policies are used, you can have multiple trajectories and estimate the gradient of the loss function from, say, a target network approximating a value function, and use that to update the network approximating the policy.
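If one did want to estimate the gradient from several trajectories at once, a rough sketch with this gist's pieces might look like the following (it reuses the gist's `compute_discounted_R`, `np_utils`, and `Agent` internals, and assumes the old gym step API used above; `run_batch` and `n_trajectories` are made-up names):

```python
import numpy as np

def run_batch(env, agent, n_trajectories=8):
    """Collect several complete episodes, then do a single gradient update."""
    S, A, G = [], [], []
    episode_rewards = []
    for _ in range(n_trajectories):
        states, actions, rewards = [], [], []
        s, done = env.reset(), False
        while not done:
            a = agent.get_action(s)
            s2, r, done, _ = env.step(a)  # old gym API, as in the gist
            states.append(s)
            actions.append(a)
            rewards.append(r)
            s = s2
        episode_rewards.append(sum(rewards))
        S.extend(states)
        A.extend(actions)
        # discount (and normalize) each trajectory separately before concatenating
        G.extend(compute_discounted_R(np.array(rewards)))
    action_onehot = np_utils.to_categorical(A, num_classes=agent.output_dim)
    # one update over all collected trajectories
    agent.train_fn([np.array(S), action_onehot, np.array(G)])
    return np.mean(episode_rewards)
```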
It's not the fastest policy gradient implementation, but it works. It also makes me wonder how to make it less noisy.
Here is a graph from another implementation: https://github.com/nyck33/reinforcement-learning/blob/master/2-cartpole/1-dqn/cartpole_dqn.py
When you compute the loss for a given timestep, you don't sum over the previous timesteps of that episode/trajectory. Why?
I am new to policy gradients, so I may be wrong. I am referring to the slide "Policy Gradient: Use Temporal Structure" from http://rll.berkeley.edu/deeprlcourse/docs/lec2.pdf for the above comment.
Thanks!
I think the implementation is correct:

- Given the Markov property, the probability of a state/action does not depend on previous states/actions. I think incorporating past rewards into the loss function would violate that principle.
- The value of a state-action is usually the expected discounted future reward.

Not sure, though.
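Concretely, with the temporal-structure form of the policy gradient, the weight on log pi(a_t | s_t) is the reward-to-go from step t onward, which is what `compute_discounted_R` computes before its normalization step. A tiny standalone check (NumPy only; `reward_to_go` is just an illustrative name):

```python
import numpy as np

def reward_to_go(R, gamma=0.99):
    """G_t = r_t + gamma * r_{t+1} + gamma**2 * r_{t+2} + ...  (future rewards only)"""
    G = np.zeros(len(R))
    running = 0.0
    for t in reversed(range(len(R))):
        running = R[t] + gamma * running
        G[t] = running
    return G

print(reward_to_go([1.0, 1.0, 1.0]))
# [1 + 0.99 + 0.99**2, 1 + 0.99, 1]  ->  approximately [2.9701, 1.99, 1.0]
```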
In your link (https://github.com/nyck33/reinforcement-learning/blob/master/2-cartpole/1-dqn/cartpole_dqn.py) it isn't policy gradients being used; that file is a DQN implementation. As for the noise:

- The noise can be inherent to the environment, if it is stochastic.
- Actor-critic can reduce the noise because the gradient is computed using a value network rather than the (high-variance) returns from the environment.
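For intuition only, here is a rough sketch of the baseline idea behind that second point (not a full actor-critic; it assumes the gist's `Agent`, `compute_discounted_R`, and `np_utils` are in scope, and `build_value_model` / `fit_with_baseline` are made-up helpers):

```python
from keras import layers, optimizers
from keras.models import Model

def build_value_model(input_dim, hidden=32):
    """A small state-value network V(s), trained by regression on the discounted returns."""
    x = layers.Input(shape=(input_dim,))
    h = layers.Dense(hidden, activation="relu")(x)
    v = layers.Dense(1)(h)
    model = Model(inputs=x, outputs=v)
    model.compile(loss="mse", optimizer=optimizers.Adam())
    return model

def fit_with_baseline(agent, value_model, S, A, R):
    """Weight log-probabilities by the advantage (return minus V(s)) instead of the raw return."""
    returns = compute_discounted_R(R)             # gist's discounted (and normalized) returns
    baseline = value_model.predict(S).squeeze()   # critic's estimate of V(s)
    advantage = returns - baseline                # lower-variance weight for the gradient
    value_model.fit(S, returns, verbose=0)        # update the critic toward the returns
    action_onehot = np_utils.to_categorical(A, num_classes=agent.output_dim)
    agent.train_fn([S, action_onehot, advantage])  # update the actor with the advantage
```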
I copied your code, and when I tried to run it I received the following error in `get_updates()`:

`TypeError: get_updates() got an unexpected keyword argument 'constraints'`

Why might this be?
I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run.

- The code as it is here throws a `TypeError: get_updates() got an unexpected keyword argument 'constraints'`. I simply commented out the `constraints=[],` at line 93.
- Then I got `ValueError: Cannot create a Keras backend function with updates but no outputs during eager execution.` To disable eager execution I imported tensorflow and called `tf.compat.v1.disable_eager_execution()`.
- Then it throws `IndexError: list index out of range` because of `outputs=[],` (line 99). Just change it to `outputs=[self.model.output],` and it runs fine for me.
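Putting those three fixes together, a version of the train-function builder for tf.keras / TF 2.x might look roughly like this (a sketch only, not verified against every keras/tf version; it assumes eager execution is disabled before the model is built, and `build_train_fn` is a standalone stand-in for `Agent.__build_train_fn`):

```python
import tensorflow as tf
tf.compat.v1.disable_eager_execution()  # K.function with updates requires graph mode

from tensorflow.keras import backend as K
from tensorflow.keras import optimizers

def build_train_fn(model, output_dim):
    """Same idea as Agent.__build_train_fn, adapted to the newer get_updates signature."""
    action_onehot = K.placeholder(shape=(None, output_dim), name="action_onehot")
    discount_reward = K.placeholder(shape=(None,), name="discount_reward")

    action_prob = K.sum(model.output * action_onehot, axis=1)
    loss = K.mean(-K.log(action_prob) * discount_reward)

    adam = optimizers.Adam()
    # newer Keras versions have no `constraints` keyword
    updates = adam.get_updates(loss=loss, params=model.trainable_weights)

    # an empty outputs list raises IndexError here, so return the model output instead
    return K.function(inputs=[model.input, action_onehot, discount_reward],
                      outputs=[model.output],
                      updates=updates)
```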
thank you :D