Skip to content

Instantly share code, notes, and snippets.

@breeko
Last active February 5, 2018 11:23
Show Gist options
  • Save breeko/929004a66c65efe8257169420ca142e6 to your computer and use it in GitHub Desktop.
Save breeko/929004a66c65efe8257169420ca142e6 to your computer and use it in GitHub Desktop.
Simple evolutionary models that can be used with OpenAI environments
import numpy as np
import gym
def create_population(env, size=1, mean=0, std=1):
    """ Samples a population of random individuals for an environment.

    Every individual is a weight matrix with env.observation_space.shape[0]
    rows and env.action_space.n columns, drawn from a normal distribution.

    Args:
        env: environment exposing `observation_space.shape` and `action_space.n`.
        size: number of individuals to sample.
        mean: mean of the distribution; None is replaced by a matrix of zeros.
        std: standard deviation of the distribution; None is replaced by a
            matrix of ones.

    Returns:
        Array of shape (size, rows, columns) holding the sampled individuals.
    """
    n_inputs = env.observation_space.shape[0]  # One row per observation entry
    n_actions = env.action_space.n  # One column per available action
    weight_shape = (n_inputs, n_actions)
    # An unspecified mean / std falls back to the standard normal, spelled out per weight
    loc = np.zeros(weight_shape) if mean is None else mean
    scale = np.ones(weight_shape) if std is None else std
    return np.random.normal(loc, scale, size=[size, n_inputs, n_actions])
def mutate(population, prob_mutate, std):
    """ Returns a copy of the population with random gaussian mutations applied.

    Args:
        population: array of individuals; it is not modified.
        prob_mutate: probability that any single attribute is mutated.
        std: standard deviation of the zero-mean normal noise added to a
            mutated attribute.

    Returns:
        New array with the same shape as `population`.
    """
    shape = population.shape
    # Per-attribute switch: 1 where a mutation happens, 0 where the value is kept
    switch = np.random.choice([0, 1], size=shape, p=[1 - prob_mutate, prob_mutate])
    # Candidate noise for every attribute; the switch zeroes out the unselected ones
    noise = np.random.normal(0, std, size=shape)
    return population + noise * switch
def breed(population):
    """ Crosses a population with a shuffled copy of itself.

    Individuals are paired by position with a randomly reordered copy of the
    same population, and every attribute of the offspring is taken from one
    of the two parents with equal probability.

    Args:
        population: array of individuals; it is not modified.

    Returns:
        Array of offspring with the same shape as `population`.
    """
    # Shuffling a copy along the first axis gives each individual a random mate
    mates = population.copy()
    np.random.shuffle(mates)
    # 1 -> attribute comes from the individual itself, 0 -> it comes from the mate
    from_self = np.random.choice([0, 1], size=population.shape)
    from_mate = 1 - from_self
    return population * from_self + mates * from_mate
def normalize(env, state):
    """ Normalizes state to range from 0 to 1 where the bounds allow it.

    The bounds are read from env.observation_space.low / .high and handled
    per dimension: a dimension is rescaled only when both of its bounds are
    finite and high > low. Unbounded (or zero-width) dimensions are passed
    through unchanged, since there is no meaningful range to scale by.

    Args:
        env: environment exposing `observation_space.low` and
            `observation_space.high`.
        state: observation to normalize.

    Returns:
        Float array with the bounded dimensions mapped to [0, 1].
    """
    low = np.asarray(env.observation_space.low, dtype=float)
    high = np.asarray(env.observation_space.high, dtype=float)
    # Comparing the bounds array to a scalar inside `if` is ambiguous for more
    # than one element, so decide dimension by dimension instead
    scalable = np.isfinite(low) & np.isfinite(high) & (high > low)
    # lo=0 / hi=1 makes the formula below an identity for non-scalable dimensions
    lo = np.where(scalable, low, 0.0)
    hi = np.where(scalable, high, 1.0)
    return (np.asarray(state, dtype=float) - lo) / (hi - lo)
def score(env, ind, trials=1, curiousity=False, render=False):
    """ Scores an individual by letting it play the environment.

    For every step the action is the argmax of np.dot(state, ind). The rewards
    of all trials are summed and divided by the number of trials.

    Args:
        env: environment providing reset(), step(action) returning
            (state, reward, done, info), render() and close().
        ind: weight matrix of the individual being scored.
        trials: number of episodes to play.
        curiousity: if True, each trial additionally earns the summed range
            (max - min per state entry) of the states it visited.
        render: if True, env.render() is called before every step.

    Returns:
        Total reward (including any curiosity bonus) divided by `trials`.
    """
    rewards = 0
    try:
        for _ in range(trials):
            state = env.reset()
            # Running per-entry extremes of the visited states, used for the curiosity bonus
            min_state = state
            max_state = state
            done = False
            while not done:
                if render:
                    env.render()
                action = np.argmax(np.dot(state, ind))
                state, reward, done, _info = env.step(action)
                min_state = np.minimum(min_state, state)
                max_state = np.maximum(max_state, state)
                rewards += reward
            if curiousity:
                # Apply some value to exploration. This will nudge the algorithm in favor of bots
                # that explore more of the environment space.
                rewards += np.sum(max_state - min_state)
    finally:
        # Release the environment's resources even if a rollout fails part-way
        env.close()
    return rewards / float(trials)
def train(env, pop_size, trials_per_individual, survival_rate, prob_mutate, std, num_episodes, std_decay=0.9, curiousity=False, verbose=False, goal=200):
    """ Trains a bot based on an evolutionary model.

    Each episode scores every individual, keeps the best ones, and refills the
    population with their offspring (breed), mutated copies (mutate) and, if
    there is still room, new individuals sampled around the population mean.

    Args:
        env: environment passed on to create_population and score.
        pop_size: target number of individuals per episode.
        trials_per_individual: trials used to score each individual.
        survival_rate: fraction of pop_size that survives an episode (at least
            one individual always survives).
        prob_mutate: per-attribute mutation probability passed to mutate.
        std: initial standard deviation for mutations and new individuals.
        num_episodes: maximum number of episodes to run.
        std_decay: factor std is multiplied by after an improvement and
            divided by otherwise.
        curiousity: passed on to score.
        verbose: if True, progress is printed every episode.
        goal: training stops early once the best score reaches this value.

    Returns:
        The first individual of the final population, i.e. the top-ranked
        survivor of the last episode that was run.
    """
    pop = create_population(env, pop_size)  # Create the population
    pop_survive = max(1, int(pop_size * survival_rate))  # Individuals that survive each episode
    best_score = float("-inf")  # Track best score
    for episode in range(num_episodes):
        # Score individuals
        scores = np.array([score(env, ind, trials=trials_per_individual, curiousity=curiousity) for ind in pop])
        # Rank best first; the stable sort keeps tied individuals in their current order
        ranking = np.argsort(-scores, kind="stable")
        top_score = scores[ranking[0]]
        pop = np.asarray(pop)[ranking[:pop_survive]]  # Only keep the best
        pop = np.vstack([pop, breed(pop)])  # Breed the population and append to existing population
        pop = np.vstack([pop, mutate(pop, prob_mutate, std)])  # Mutate the population and append to existing population
        # Determine how many new individuals to introduce to keep population number constant
        remaining_pop = max(0, pop_size - len(pop))
        # Mean of the current population parameters, so newcomers are sampled around
        # what has worked so far rather than around an unrelated random draw
        mean = np.mean(pop, axis=0)
        # Add new members to population
        new_pop = create_population(env, size=remaining_pop, mean=mean, std=std)
        pop = np.vstack([pop, new_pop])
        if verbose:
            print("episode: {} best score: {:0.2f}".format(episode, top_score))
        if top_score > best_score:
            # If best score is better than prior best score, decay the standard deviation since we're
            # likely getting to an optimal individual and we want variance to decrease
            std = std * std_decay  # Rebind instead of in-place so a caller's array is not modified
            best_score = top_score
        else:
            # Prior best score not reached, increase standard deviation to add more variety and hopefully
            # break through any plateaus
            std = std / std_decay
        if best_score >= goal:
            if verbose:
                # episode is a 0-based index, so the number of episodes run is episode + 1
                print("training complete in {} episodes".format(episode + 1))
            break
    return pop[0]  # Return best individual after training
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment