FrozenLake8x8-v0
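A genetic-algorithm solution for OpenAI Gym's FrozenLake8x8-v0: a pool of random tabular policies is evolved over several epochs with fitness-proportional crossover and score-dependent mutation, and only the best pool_size policies survive each epoch.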
import gym
import numpy as np
import matplotlib.pyplot as plt

# The gist assumes these globals exist; the environment is the one named
# in the gist title.
env = gym.make("FrozenLake8x8-v0")
n_states = env.observation_space.n   # 64 states on the 8x8 map
n_actions = env.action_space.n       # 4 actions: left, down, right, up
def get_random_policy():
    """
    Build a numpy array representing an agent policy.
    This array must have one element per environment state
    (64 for FrozenLake8x8). Each element is an integer from 0 to 3,
    representing the action to take from that state.
    """
    return np.random.randint(0, n_actions, n_states)
def sample_reward(env, policy, t_max=100):
    """
    Interact with the environment and return the sum of all rewards.
    If the game doesn't end within t_max steps (e.g. the agent keeps
    walking into a wall), force-end it and return the reward
    accumulated so far.
    """
    s = env.reset()
    total_reward = 0
    for _ in range(t_max):
        # Old 4-tuple Gym API; newer Gymnasium's step returns 5 values.
        s, reward, done, _ = env.step(policy[s])
        total_reward += reward
        if done:
            break
    return total_reward
def evaluate(policy, t_max=100, n_times=100, reward=sample_reward):
    """Run several evaluations and average the score the policy gets."""
    rewards = [reward(env, policy, t_max) for _ in range(n_times)]
    return float(np.mean(rewards))
def mutation(policy, p=0.1):
    """
    For each state, with probability p replace the action with a random one.
    Tip: mutation can be written as crossover with a random policy.
    """
    # The random policy must be the *first* argument: crossover takes
    # policy1's action with probability p, so this replaces each action
    # with a random one with probability p, as documented.
    return crossover(get_random_policy(), policy, p)
def crossover(policy1, policy2, p=0.5):
    """
    For each state, with probability p take the action from policy1,
    otherwise from policy2.
    """
    # p must be passed by keyword: the third positional argument of
    # np.random.choice is `replace`, not the probability vector.
    randomness = np.random.choice([0, 1], size=n_states, p=[p, 1 - p])
    return np.where(randomness == 0, policy1, policy2)
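# Sanity check (not part of the original gist; _p1/_p2 are throwaway
# names): the probability convention is easy to invert, so pin it down.
# With p=1.0 crossover must return the first parent, with p=0.0 the second.
_p1 = np.zeros(n_states, dtype=int)
_p2 = np.ones(n_states, dtype=int)
assert np.array_equal(crossover(_p1, _p2, 1.0), _p1)
assert np.array_equal(crossover(_p1, _p2, 0.0), _p2)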
n_epochs = 50
pool_size = 100
n_crossovers = 50
n_mutations = 50
t_max = 300
n_times = 200

pool = [get_random_policy() for _ in range(pool_size)]
# A plain comprehension replaces map(...) with np.full(..., np.int),
# which relied on the long-deprecated np.int alias.
pool_scores = [evaluate(policy, t_max, n_times) for policy in pool]
def mutation_probability(policy_score, a=1, k=4):
    """
    The worse a policy scores, the harder it mutates: with scores in
    [0, 1], a perfect policy gets probability 0 and a failing one 0.25.
    """
    return (a - policy_score) / k
scores = []
for epoch in range(n_epochs):
    # Fitness-proportional parent selection; fall back to uniform
    # sampling (p=None) while every policy in the pool scores zero.
    norm = sum(pool_scores)
    selection_probs = [s / norm for s in pool_scores] if norm > 0 else None
    rands1 = np.random.choice(pool_size, n_crossovers, p=selection_probs)
    rands2 = np.random.choice(pool_size, n_crossovers, p=selection_probs)
    # Bias each crossover towards the stronger of the two parents.
    probs = [pool_scores[i] / (pool_scores[i] + pool_scores[j])
             if pool_scores[i] + pool_scores[j] > 0 else 0.5
             for i, j in zip(rands1, rands2)]
    crossovered = [crossover(pool[i], pool[j], p)
                   for i, j, p in zip(rands1, rands2, probs)]
    # Mutate uniformly chosen policies; weaker policies mutate harder.
    rands = np.random.randint(0, pool_size, n_mutations)
    mutated = [mutation(pool[i], mutation_probability(pool_scores[i])) for i in rands]
    assert type(crossovered) == type(mutated) == list

    # Add the new policies to the pool and re-evaluate everything.
    pool = pool + crossovered + mutated
    pool_scores = [evaluate(policy, t_max, n_times) for policy in pool]

    # Keep only the pool_size best policies.
    selected_indices = np.argsort(pool_scores)[-pool_size:]
    pool = [pool[i] for i in selected_indices]
    pool_scores = [pool_scores[i] for i in selected_indices]
    scores.append(pool_scores[-1])
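# matplotlib is imported above but never used in the gist as captured;
# a minimal sketch (assuming you want the learning curve) to plot the
# best score in the pool per epoch:
plt.plot(scores)
plt.xlabel("epoch")
plt.ylabel("best policy score")
plt.title("FrozenLake8x8-v0: best-of-pool average reward")
plt.show()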