TD Learning algorithm used to solve the CartPole OpenAI Gym environment
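In TD terms, the script below learns Q-values for CartPole with an n-step target. A sketch of that target, writing n for LOOK_AHEAD and gamma for GAMMA:

    y_t = \sum_{i=0}^{n-1} \gamma^i r_{t+i} + \gamma^n \max_a Q(s_{t+n}, a)

The bootstrap term is dropped whenever the episode terminates inside the look-ahead window (the dones flag built in the training loop below).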
import gym
import numpy as np
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import Adamax

# TD Learning
MEMORY = 1                     # Number of prior states to consider when training our agent
GAMMA = 0.99                   # Discount factor applied to subsequent rewards of a state
HIDDEN_LAYERS = [64]           # Hidden layers of the neural network
NUM_EPOCHS = 50                # Number of epochs
NUM_SESSIONS = 100             # Number of sessions in each epoch
BATCH_SIZE = 1024              # Size of each training batch drawn from experience replay
EPISODES_STORED = 10000        # Number of transitions stored in experience replay
SKIP_FIRST = 0                 # Number of epochs before the trained agent starts making decisions
LOOK_AHEAD = 2                 # Number of subsequent rewards to consider when scoring a state
GOAL_CONSECUTIVE_TRIALS = 100  # Number of trials used to compute the running average
GOAL = 195                     # Goal score
PRINT_EVERY = 1                # How often (in epochs) to print results
SEED = 0                       # Random seed for numpy and the gym environment
UPLOAD = True                  # Whether to upload results to the OpenAI Gym scoreboard

env = gym.make("CartPole-v0")
env.seed(SEED)
if UPLOAD:
    from key import api_key  # key.py is expected to define api_key
    env = gym.wrappers.Monitor(env, directory="videos", force=True)
np.random.seed(SEED)
np.set_printoptions(precision=4)

class ExperienceReplay:
    """ Stores episodes used for training agent """
    def __init__(self, max_episodes=10000):
        self.max_episodes = max_episodes
        self.episodes = []

    def add(self, episodes):
        self.episodes.extend(episodes)
        self.episodes = self.episodes[-self.max_episodes:]

    def get_batch(self, batch_size):
        return [self.episodes[idx] for idx in np.random.randint(0, high=len(self.episodes), size=batch_size)]

def init_agent(env, hidden_layers=(256,), memory=1, activation="tanh", dropout_amt=0):
    """ Returns a keras Sequential model tailored to the openai environment
    Arguments:
        env: openai environment
        hidden_layers (optional): hidden layers in the neural network
        memory (optional): number of prior states it will consider as input
        activation (optional): activation function used in the hidden layers
        dropout_amt (optional): dropout rate applied after each hidden layer
    Output:
        keras Sequential model
    """
    model = Sequential()
    input_dim = len(env.reset()) * memory  # each input stacks `memory` consecutive observations
    for num_layer, layer in enumerate(hidden_layers):
        model.add(Dense(layer, input_shape=(input_dim,), activation=activation))
        if dropout_amt > 0:
            model.add(Dropout(dropout_amt))
        input_dim = layer
    model.add(Dense(env.action_space.n, activation="linear"))
    model.compile(optimizer=Adamax(), loss='mse')
    return model

def generate_session(env, agent=None, memory=1, t_max=10**3, render=False, prob_random=0.25, scaler=None):
    """ Generates a session
    Arguments:
        env: openai environment
        agent (optional): agent used to determine the policy
        memory (optional): number of prior states that will be considered. Only applicable when agent is None;
            otherwise, memory is inferred from the agent's input dimensions
        t_max (optional): maximum number of steps that can be taken in one episode
        render (optional): whether to render the environment
        prob_random (optional): probability of a random action. Only applicable when an agent is provided;
            when agent is None, every action is random
        scaler (optional): scaler applied to states
    Output:
        states, actions, rewards, next_states
    """
    state = env.reset()
    if scaler is not None:
        state = scaler.transform(state.reshape(1, -1)).reshape(-1)
    obs_size = len(state)
    if agent is not None:
        input_dimen = agent.input_shape[1]
        memory = agent.input_shape[1] // obs_size
    else:
        input_dimen = obs_size
    # Stack the initial observation `memory` times so the first input has the full history length
    state = np.concatenate([state for _ in range(memory)])
    states = []
    actions = []
    rewards = []
    next_states = []
    for step in range(t_max):
        if render:
            env.render()
        if agent is None or np.random.random() < prob_random:
            action = env.action_space.sample()
        else:
            policy = agent.predict(state.reshape(1, -1), verbose=False)[0]
            action = np.argmax(policy)
        next_state, reward, done, info = env.step(action)
        if scaler is not None:
            next_state = scaler.transform(next_state.reshape(1, -1)).reshape(-1)
        # Slide the window of prior observations: drop the oldest, append the current one
        if len(states) == 0:  # First move
            state = np.concatenate([state[obs_size:], state])
        else:
            state = np.concatenate([states[-1][obs_size:], state])
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        next_states.append(next_state)
        state = next_state
        if done:
            break
    return states, actions, rewards, next_states

def discount(rewards, disc_factor=0.99):
    """ Returns the discounted sum of a sequence of rewards """
    return np.sum([reward * disc_factor ** i for i, reward in enumerate(rewards)])

def get_epsilon_greedy(env, state, agent, prob_random=0.25):
    """ Determines an epsilon-greedy action to take based on state """
    if prob_random > np.random.random():
        return np.random.randint(env.action_space.n)
    return np.argmax(agent.predict(state))

scaler = StandardScaler()
cum_states = []
for _ in range(100):
    states, _, _, _ = generate_session(env=env, agent=None, memory=MEMORY)
    cum_states.extend(states)
scaler.fit(cum_states)

agent = init_agent(env=env, hidden_layers=HIDDEN_LAYERS)
cum_rewards = []
er = ExperienceReplay(EPISODES_STORED)

for epoch in range(NUM_EPOCHS):
    prob_random = 1.0 / (np.sqrt(max(0, epoch - SKIP_FIRST)) + 1)
    for num_session in range(NUM_SESSIONS):
        if epoch < SKIP_FIRST:
            session = generate_session(env, agent=None, memory=MEMORY, scaler=scaler)
        else:
            session = generate_session(env, agent=agent, memory=MEMORY, scaler=scaler, prob_random=prob_random)
        states, actions, rewards, next_states = session
        num_steps = len(states)
        # Convert each step into an n-step transition: the discounted sum of the next LOOK_AHEAD rewards,
        # the state reached LOOK_AHEAD steps later, and whether the episode ends within that window
        updated_rewards = [discount(rewards[start: start + LOOK_AHEAD], GAMMA) for start in range(num_steps)]
        updated_next_states = [next_states[min(num_steps - 1, start + LOOK_AHEAD - 1)] for start in range(num_steps)]
        dones = [start + LOOK_AHEAD >= num_steps for start in range(num_steps)]
        assert len(states) == len(actions) == len(updated_rewards) == len(updated_next_states) == len(dones)
        updated_session = list(zip(states, actions, updated_rewards, updated_next_states, dones))
        er.add(updated_session)
        cum_rewards.append(sum(rewards))
        # Training
        batch = er.get_batch(BATCH_SIZE)
        states, actions, rewards, next_states, dones = map(np.array, list(zip(*batch)))
        not_dones = ~dones  # mask of transitions whose look-ahead window does not reach the end of the episode
        X = states
        y = agent.predict(states)
        # n-step TD target: discounted reward sum plus the discounted bootstrap from the look-ahead state
        y[np.arange(len(y)), actions] = rewards + not_dones * GAMMA ** LOOK_AHEAD * np.max(agent.predict(next_states), axis=1)
        agent.fit(X, y, verbose=False)
    if epoch % PRINT_EVERY == 0:
        print("Epoch {} mean rewards: {:0.2f} prob_random: {:0.2f}".format(epoch, np.mean(cum_rewards[-NUM_SESSIONS:]), prob_random))
    if len(cum_rewards) >= GOAL_CONSECUTIVE_TRIALS and np.mean(cum_rewards[-GOAL_CONSECUTIVE_TRIALS:]) > GOAL:
        print("Training complete after {} episodes! Mean score {}".format(len(cum_rewards), np.mean(cum_rewards[-GOAL_CONSECUTIVE_TRIALS:])))
        break

env.close()
if UPLOAD:
    gym.upload("./videos/", api_key=api_key)
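
For a quick sanity check of the n-step return construction, one could append something like the following to the script (an illustrative sketch using the script's own discount helper, GAMMA, and LOOK_AHEAD; with a reward of 1 per step, each state's n-step reward is the discounted sum of at most the next two rewards):

sample_rewards = [1.0, 1.0, 1.0]  # illustrative values, not part of the original gist
sample_targets = [discount(sample_rewards[s: s + LOOK_AHEAD], GAMMA) for s in range(len(sample_rewards))]
print(sample_targets)  # expected: [1.99, 1.99, 1.0]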