@breeko · Created June 2, 2017
TD Learning algorithm used to solve the CartPole-v0 OpenAI Gym environment
import gym
import numpy as np
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import Adamax
# TD Learning
MEMORY = 1 # Number of prior states to consider when training our agent
GAMMA = 0.99 # Discount factor to attribute to subsequent rewards of a state
HIDDEN_LAYERS = [64] # Hidden layers of the neural network
NUM_EPOCHS = 50 # Number of epochs
NUM_SESSIONS = 100 # Number of sessions in each epoch
BATCH_SIZE = 1024 # Number of transitions sampled from experience replay per training step
EPISODES_STORED = 10000 # Number of transitions stored in experience replay
SKIP_FIRST = 0 # Number of epochs of purely random play before decisions are based on the trained agent
LOOK_AHEAD = 2 # Number of subsequent rewards to consider in scoring a state (n-step lookahead)
GOAL_CONSECUTIVE_TRIALS = 100 # Number of trials used to determine running average
GOAL = 195 # Goal score
PRINT_EVERY = 1 # How often to print results
SEED = 0 # Random seed for numpy and gym environment
UPLOAD = True # Whether to upload results to the OpenAI Gym scoreboard
env = gym.make("CartPole-v0")
env.seed(SEED)
if UPLOAD:
    from key import api_key
    env = gym.wrappers.Monitor(env, directory="videos", force=True)
np.random.seed(SEED)
np.set_printoptions(precision=4)
class ExperienceReplay:
    """ Stores transitions used for training the agent """
    def __init__(self, max_episodes=10000):
        self.max_episodes = max_episodes
        self.episodes = []

    def add(self, episodes):
        """ Appends transitions, discarding the oldest once max_episodes is exceeded """
        self.episodes.extend(episodes)
        self.episodes = self.episodes[-self.max_episodes:]

    def get_batch(self, batch_size):
        """ Samples batch_size transitions uniformly at random, with replacement """
        return [self.episodes[idx] for idx in np.random.randint(0, high=len(self.episodes), size=batch_size)]
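
# Illustrative use of ExperienceReplay; the tuple layout below mirrors how
# transitions are stored later in this script (names are hypothetical):
#   er_demo = ExperienceReplay(max_episodes=1000)
#   er_demo.add([(state, action, reward, next_state, done)])
#   demo_batch = er_demo.get_batch(32)  # 32 transitions, sampled with replacement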
def init_agent(env, hidden_layers=(256,), memory=1, activation="tanh", dropout_amt=0):
    """ Returns a keras Sequential model tailored to the openai environment
    Arguments:
        env: openai environment
        hidden_layers (optional): hidden layers in the neural network
        memory (optional): number of prior states it will consider as input
        activation (optional): activation function for the hidden layers
        dropout_amt (optional): dropout rate applied after each hidden layer (0 disables dropout)
    Output:
        keras Sequential model
    """
    model = Sequential()
    input_dim = len(env.reset()) * memory  # observation size times the number of stacked states
    for num_layer, layer in enumerate(hidden_layers):
        model.add(Dense(layer, input_shape=(input_dim,), activation=activation))
        if dropout_amt > 0:
            model.add(Dropout(dropout_amt))
        input_dim = layer
    model.add(Dense(env.action_space.n, activation="linear"))  # one linear output per action
    model.compile(optimizer=Adamax(), loss='mse')
    return model
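
# For reference (assuming CartPole-v0 with HIDDEN_LAYERS = [64] and MEMORY = 1),
# init_agent builds approximately:
#   Dense(64, tanh) on the 4-dimensional observation -> Dense(2, linear)
# i.e. one linear output per action, interpreted as that action's Q-value.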
def generate_session(env, agent=None, memory=1, t_max=10**3, render=False, prob_random=0.25, scaler=None):
    """ Generates a session
    Arguments:
        env: openai environment
        agent (optional): agent to determine policies
        memory (optional): number of prior states that will be considered. Only applicable when agent is None;
            otherwise memory is inferred from the agent's input dimensions
        t_max (optional): maximum number of steps that can be taken in one episode
        render (optional): whether to render the environment
        prob_random (optional): probability of a random action. Only applicable when an agent is given;
            when agent is None every action is random
        scaler (optional): scaler applied to states
    Output:
        states, actions, rewards, next_states
    """
    state = env.reset()
    if scaler is not None:
        state = scaler.transform(state.reshape(1, -1)).reshape(-1)
    obs_size = len(state)
    if agent is not None:
        memory = agent.input_shape[1] // obs_size  # infer memory from the agent's input layer
    # Initial stack repeats the first observation memory times
    state = np.concatenate([state for _ in range(memory)])
    states = []
    actions = []
    rewards = []
    next_states = []
    for step in range(t_max):
        if render:
            env.render()
        if agent is None or np.random.random() < prob_random:
            action = env.action_space.sample()  # explore
        else:
            policy = agent.predict(state.reshape(1, -1), verbose=False)[0]
            action = np.argmax(policy)  # exploit: greedy with respect to predicted Q-values
        next_state, reward, done, info = env.step(action)
        if scaler is not None:
            next_state = scaler.transform(next_state.reshape(1, -1)).reshape(-1)
        # Build the stacked next state: drop the oldest observation, append the newest
        stacked_next_state = np.concatenate([state[obs_size:], next_state])
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        next_states.append(stacked_next_state)
        state = stacked_next_state
        if done:
            break
    return states, actions, rewards, next_states
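
# Shape note (illustrative, CartPole-v0 with MEMORY = 1): each element of `states`
# and `next_states` is a length-4 observation vector, while `actions` and `rewards`
# are scalars, so zip(states, actions, rewards, next_states) yields one transition
# per step taken.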
def discount(rewards, disc_factor=0.99):
    """ Returns the discounted sum of a reward sequence: sum of rewards[i] * disc_factor ** i """
    return np.sum([reward * disc_factor ** i for i, reward in enumerate(rewards)])
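
# Quick sanity check (illustrative): with the default disc_factor = 0.99,
#   discount([1.0, 1.0, 1.0]) == 1.0 + 0.99 + 0.99 ** 2 == 2.9701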
def get_epsilon_greedy(env, state, agent, prob_random=0.25):
    """ Determines an epsilon-greedy action to take based on state
    (unused below: generate_session inlines the same logic) """
    if prob_random > np.random.random():
        return np.random.randint(env.action_space.n)
    return np.argmax(agent.predict(state.reshape(1, -1)))
scaler = StandardScaler()
cum_states = []
for _ in range(100):
    states, _, _, _ = generate_session(env=env, agent=None, memory=MEMORY)
    cum_states.extend(states)
scaler.fit(cum_states)
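# The scaler is fit on observations from 100 purely random episodes so that each
# input dimension is roughly zero-mean and unit-variance before reaching the
# network; the same scaler is reused for every subsequent session.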
agent = init_agent(env=env, hidden_layers=HIDDEN_LAYERS)
cum_rewards = []
er = ExperienceReplay(EPISODES_STORED)
for epoch in range(NUM_EPOCHS):
    # Exploration probability decays as training progresses
    prob_random = 1.0 / (np.sqrt(max(0, epoch - SKIP_FIRST)) + 1)
    for num_session in range(NUM_SESSIONS):
        if epoch < SKIP_FIRST:
            session = generate_session(env, agent=None, memory=MEMORY, scaler=scaler)
        else:
            session = generate_session(env, agent=agent, memory=MEMORY, scaler=scaler, prob_random=prob_random)
        states, actions, rewards, next_states = session
        num_steps = len(states)
        # n-step returns: each state's reward becomes the discounted sum of the next LOOK_AHEAD rewards,
        # and its "next state" becomes the state LOOK_AHEAD steps later (clipped to the episode end)
        updated_rewards = [discount(rewards[start: start + LOOK_AHEAD], GAMMA) for start in range(num_steps)]
        updated_next_states = [next_states[min(num_steps - 1, start + LOOK_AHEAD - 1)] for start in range(num_steps)]
        dones = [start + LOOK_AHEAD >= num_steps for start in range(num_steps)]
        assert len(states) == len(actions) == len(updated_rewards) == len(updated_next_states) == len(dones)
        updated_session = list(zip(states, actions, updated_rewards, updated_next_states, dones))
        er.add(updated_session)
        cum_rewards.append(sum(rewards))
    # Training
    batch = er.get_batch(BATCH_SIZE)
    states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
    not_dones = ~dones
    X = states
    y = agent.predict(states)
    # TD target: n-step return plus the bootstrap value of the state LOOK_AHEAD steps ahead,
    # discounted by GAMMA ** LOOK_AHEAD; terminal transitions get no bootstrap term
    y[np.arange(len(y)), actions] = rewards + not_dones * GAMMA ** LOOK_AHEAD * np.max(agent.predict(next_states), axis=1)
    agent.fit(X, y, verbose=False)
    if epoch % PRINT_EVERY == 0:
        print("Epoch {} mean rewards: {:0.2f} prob_random: {:0.2f}".format(epoch, np.mean(cum_rewards[-NUM_SESSIONS:]), prob_random))
    if len(cum_rewards) >= GOAL_CONSECUTIVE_TRIALS and np.mean(cum_rewards[-GOAL_CONSECUTIVE_TRIALS:]) > GOAL:
        print("Training complete after {} episodes! Mean score {}".format(len(cum_rewards), np.mean(cum_rewards[-GOAL_CONSECUTIVE_TRIALS:])))
        break
env.close()
if UPLOAD:
    gym.upload("./videos/", api_key=api_key)