FrozenLake8x8-v0
import gym
import numpy as np
import matplotlib.pyplot as plt

# Create the environment and cache its dimensions; the helpers below
# rely on the globals `env`, `n_states` and `n_actions`.
env = gym.make("FrozenLake8x8-v0")
n_states = env.observation_space.n
n_actions = env.action_space.n
def get_random_policy():
    """
    Build a numpy array representing an agent policy.
    This array must have one element per each of the 64 environment states.
    Each element must be an integer from 0 to 3, representing the action
    to take from that state.
    """
    return np.random.randint(0, n_actions, n_states)
def sample_reward(env, policy, t_max=100):
    """
    Interact with the environment and return the sum of all rewards.
    If the game doesn't end within t_max steps (e.g. the agent keeps
    walking into a wall), force-end the game and return whatever reward
    was collected so far.
    """
    s = env.reset()
    total_reward = 0
    for _ in range(t_max):
        s, reward, done, _ = env.step(policy[s])
        total_reward += reward
        if done:
            break
    return total_reward
def evaluate(policy, t_max=100, n_times=100, reward=sample_reward):
    """Run several evaluations and average the score the policy gets."""
    rewards = [reward(env, policy, t_max) for _ in range(n_times)]
    return float(np.mean(rewards))
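
# Quick sanity check (illustrative, not part of the original gist): a
# uniformly random policy on FrozenLake8x8-v0 should score close to 0,
# which gives a baseline the evolutionary search below has to beat.
print("random policy score:", evaluate(get_random_policy()))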
def mutation(policy, p=0.1):
    """
    For each state, with probability p replace the action with a random action.
    Tip: mutation can be written as crossover with a random policy.
    """
    # crossover(a, b, p) takes from `a` with probability p, so the random
    # policy must come first for the mutation rate to actually be p.
    return crossover(get_random_policy(), policy, p)
def crossover(policy1, policy2, p=0.5):
    """
    For each state, with probability p take the action from policy1, else from policy2.
    """
    # The probabilities must be passed via the keyword argument `p`:
    # the third positional argument of np.random.choice is `replace`.
    randomness = np.random.choice([0, 1], size=n_states, p=[p, 1 - p])
    return np.where(randomness == 0, policy1, policy2)
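
# Illustrative sanity check (not in the original gist): a crossover child
# should have the right shape, contain only valid actions, and agree with
# one of its parents in every state.
_p1, _p2 = get_random_policy(), get_random_policy()
_child = crossover(_p1, _p2)
assert _child.shape == (n_states,)
assert np.all((_child >= 0) & (_child < n_actions))
assert np.all((_child == _p1) | (_child == _p2))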
# Evolutionary search hyperparameters
n_epochs = 50       # number of generations
pool_size = 100     # policies kept after each selection step
n_crossovers = 50   # children produced per generation
n_mutations = 50    # mutants produced per generation
t_max = 300         # step limit per rollout
n_times = 200       # rollouts averaged per evaluation
# Initialize the pool with random policies and score each of them.
pool = [get_random_policy() for _ in range(pool_size)]
pool_scores = [evaluate(policy, t_max, n_times) for policy in pool]
def mutation_probability(policy_score, a=1, k=4):
    """Mutate weaker policies more aggressively: scores lie in [0, 1],
    so the returned probability lies in [0, 1/k]."""
    return (a - policy_score) / k
scores = []
for epoch in range(n_epochs):
    # Fitness-proportionate parent selection; fall back to uniform
    # probabilities when every policy in the pool scores zero, since
    # np.random.choice requires probabilities that sum to 1.
    total = sum(pool_scores)
    if total > 0:
        selection_probs = np.array(pool_scores) / total
    else:
        selection_probs = np.full(pool_size, 1 / pool_size)
    rands1 = np.random.choice(pool_size, n_crossovers, p=selection_probs)
    rands2 = np.random.choice(pool_size, n_crossovers, p=selection_probs)

    # Bias each crossover towards the stronger parent.
    probs = [pool_scores[i] / (pool_scores[i] + pool_scores[j])
             if pool_scores[i] + pool_scores[j] > 0 else 0.5
             for i, j in zip(rands1, rands2)]
    crossovered = [crossover(pool[i], pool[j], p)
                   for i, j, p in zip(rands1, rands2, probs)]

    # Mutate random pool members, mutating low scorers more strongly.
    rands = np.random.randint(0, pool_size, n_mutations)
    mutated = [mutation(pool[i], mutation_probability(pool_scores[i])) for i in rands]
    assert type(crossovered) == type(mutated) == list

    # Add new policies to the pool and re-evaluate everything.
    pool = pool + crossovered + mutated
    pool_scores = [evaluate(policy, t_max, n_times) for policy in pool]

    # Select the pool_size best policies.
    selected_indices = np.argsort(pool_scores)[-pool_size:]
    pool = [pool[i] for i in selected_indices]
    pool_scores = [pool_scores[i] for i in selected_indices]
    scores.append(pool_scores[-1])
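
# Finally, plot the learning curve. matplotlib is imported above but never
# used in the original snippet, so this is one plausible way to visualize
# the best score per generation collected in `scores`.
plt.plot(scores)
plt.xlabel("epoch")
plt.ylabel("best policy score")
plt.title("FrozenLake8x8-v0: evolutionary search")
plt.show()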