Last active
February 5, 2018 11:23
-
-
Save breeko/929004a66c65efe8257169420ca142e6 to your computer and use it in GitHub Desktop.
Simple evolutionary models that can be used with OpenAI environments
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import gym | |
def create_population(env, size=1, mean=0, std=1):
    """Draw a random population of linear policies for ``env``.

    Each individual is a weight matrix with one row per observation
    component (``env.observation_space.shape[0]``) and one column per
    discrete action (``env.action_space.n``).

    Args:
        env: Environment exposing ``observation_space.shape`` and
            ``action_space.n``.
        size: Number of individuals to generate.
        mean: Mean of the normal distribution (scalar or array that
            broadcasts against one individual). ``None`` means zero.
        std: Standard deviation of the normal distribution (scalar or
            broadcastable array). ``None`` means one.

    Returns:
        Array of shape ``(size, n_observations, n_actions)``.
    """
    n_observations = env.observation_space.shape[0]
    n_actions = env.action_space.n
    individual_shape = (n_observations, n_actions)

    # Fall back to a standard normal when a moment is explicitly None.
    loc = np.zeros(individual_shape) if mean is None else mean
    scale = np.ones(individual_shape) if std is None else std

    return np.random.normal(loc, scale, size=[size, n_observations, n_actions])
def mutate(population, prob_mutate, std):
    """Return a copy of ``population`` with random Gaussian mutations.

    Every individual attribute is mutated independently with probability
    ``prob_mutate``; a mutated attribute has zero-mean normal noise with
    standard deviation ``std`` added to it. The input is not modified.

    Args:
        population: Array of individuals.
        prob_mutate: Per-attribute probability of mutation, in [0, 1].
        std: Standard deviation of the mutation noise.

    Returns:
        New array with the same shape as ``population``.
    """
    shape = population.shape
    # 1 where an attribute mutates, 0 where it is left untouched.
    selected = np.random.choice([0, 1], size=shape, p=[1 - prob_mutate, prob_mutate])
    # Noise is drawn for every attribute, then zeroed where not selected.
    noise = np.random.normal(0, std, size=shape)
    return population + noise * selected
def breed(population):
    """Cross a population with a shuffled copy of itself.

    Individual ``i`` is paired with whichever individual lands at index
    ``i`` after shuffling a copy of the population along its first axis.
    Each attribute of the offspring is taken from either parent with a
    50 / 50 chance. The input is not modified.

    Args:
        population: Array of individuals (first axis indexes individuals).

    Returns:
        Array of offspring with the same shape as ``population``.
    """
    # Randomly re-ordered copy so that pairing is just positional alignment.
    mates = population.copy()
    np.random.shuffle(mates)

    # 1 -> inherit from the original individual, 0 -> inherit from its mate.
    from_self = np.random.choice([0, 1], size=population.shape)
    from_mate = 1 - from_self

    return population * from_self + mates * from_mate
def normalize(env, state):
    """Rescale ``state`` to the range 0..1 along every bounded dimension.

    A dimension is rescaled with ``(state - low) / (high - low)`` only when
    both of its bounds in ``env.observation_space`` are finite and
    ``high > low``. Dimensions that are unbounded (``+/-inf``) or have a
    zero-width range are passed through unchanged, so the result never
    contains NaN or inf introduced by the bounds themselves.

    Args:
        env: Environment exposing ``observation_space.low`` and
            ``observation_space.high``.
        state: Observation to normalize; must broadcast against the bounds.

    Returns:
        Array holding the normalized observation.
    """
    # Work in float64 so that bounds stored as float32 do not overflow
    # when the range ``high - low`` is formed.
    low = np.asarray(env.observation_space.low, dtype=float)
    high = np.asarray(env.observation_space.high, dtype=float)

    usable = np.isfinite(low) & np.isfinite(high) & (high > low)

    # Unusable dimensions get offset 0 and scale 1, i.e. the identity map.
    offset = np.where(usable, low, 0.0)
    scale = np.where(usable, high, 1.0) - offset

    return (state - offset) / scale
def score(env, ind, trials=1, curiousity=False, render=False):
    """Evaluate one individual and return its average reward per trial.

    The individual is used as a linear policy: at every step the action is
    ``argmax(state . ind)``. The environment is driven through the classic
    gym interface, i.e. ``env.reset()`` returns the observation and
    ``env.step(action)`` returns ``(state, reward, done, info)``.

    Args:
        env: Environment to run; ``env.close()`` is called once all trials
            have finished.
        ind: Weight matrix of the individual being scored.
        trials: Number of episodes to run; the summed reward is divided by
            this value.
        curiousity: When true, each trial additionally earns the summed
            per-dimension range (max - min) of the states it visited, which
            favours individuals that explore more of the state space.
        render: When true, ``env.render()` is called before every step.

    Returns:
        Total reward (plus any exploration bonus) divided by ``trials``.
    """
    rewards = 0
    for _trial in range(trials):
        state = env.reset()
        # Running per-dimension extremes of the states seen in this trial.
        min_state = state
        max_state = state
        done = False
        while not done:
            if render:
                env.render()
            action = np.argmax(np.dot(state, ind))
            state, reward, done, _ = env.step(action)
            min_state = np.minimum(min_state, state)
            max_state = np.maximum(max_state, state)
            rewards += reward
        if curiousity:
            # Exploration bonus: reward covering a wider span of the
            # observation space during the trial.
            rewards += np.sum(max_state - min_state)
    env.close()
    return rewards / float(trials)
def train(env, pop_size, trials_per_individual, survival_rate, prob_mutate, std,
          num_episodes, std_decay=0.9, curiousity=False, verbose=False, goal=200):
    """Train a linear policy with a simple evolutionary model.

    Every episode scores the whole population, keeps the best performers,
    and rebuilds the population from them: the survivors, their offspring
    (``breed``), a mutated copy of both (``mutate``), and, if there is still
    room up to ``pop_size``, fresh individuals sampled around the current
    population mean.

    Args:
        env: Environment passed to ``create_population`` and ``score``.
        pop_size: Size of the initial population and the level up to which
            the population is topped up each episode.
        trials_per_individual: Number of trials used to score an individual.
        survival_rate: Fraction of ``pop_size`` kept after each episode
            (at least one individual always survives).
        prob_mutate: Per-attribute mutation probability.
        std: Initial standard deviation for mutations and new individuals.
        num_episodes: Maximum number of episodes to run.
        std_decay: Factor applied to ``std``: multiplied in when the best
            score improves, divided out when it does not.
        curiousity: Forwarded to ``score`` to enable the exploration bonus.
        verbose: When true, print progress for every episode.
        goal: Training stops once the best score reaches this value.

    Returns:
        ``pop[0]``: the top-ranked individual of the last scored episode
        (or the first random individual if ``num_episodes`` is 0).
    """
    pop = create_population(env, pop_size)
    # Number of individuals that survive each episode.
    pop_survive = max(1, int(pop_size * survival_rate))
    best_score = float("-inf")

    for episode in range(num_episodes):
        scores = [
            score(env, ind, trials=trials_per_individual, curiousity=curiousity)
            for ind in pop
        ]

        # Rank best-first; ``sorted`` is stable, so ties keep population order.
        ranking = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
        scores = [scores[i] for i in ranking]
        pop = np.array([pop[i] for i in ranking[:pop_survive]])  # keep the best

        # Survivors + offspring, then a mutated copy of all of those.
        # NOTE: this yields 4 * pop_survive individuals, which can exceed
        # pop_size when survival_rate is above 0.25.
        pop = np.vstack([pop, breed(pop)])
        pop = np.vstack([pop, mutate(pop, prob_mutate, std)])

        # Top up with new individuals sampled around the mean of the
        # current population so the population does not shrink.
        remaining_pop = max(0, pop_size - len(pop))
        mean = np.mean(pop, axis=0)
        new_pop = create_population(env, size=remaining_pop, mean=mean, std=std)
        pop = np.vstack([pop, new_pop])

        if verbose:
            print("episode: {} best score: {:0.2f}".format(episode, scores[0]))

        if scores[0] > best_score:
            # Improvement: we are likely closing in on a good individual,
            # so shrink the variation.
            std *= std_decay
            best_score = scores[0]
        else:
            # No improvement: widen the variation to help break through
            # plateaus.
            std /= std_decay

        if best_score >= goal:
            if verbose:
                print("training complete in {} episodes".format(episode))
            break

    return pop[0]  # Best individual after training
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment