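# Genetic-algorithm policy search on FrozenLake-v0 (OpenAI Gym).
# A policy is an array of 16 actions, one per grid state; a pool of candidate
# policies is evolved by crossover and mutation, keeping the highest-scoring
# ones each epoch.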
import gym
import numpy as np
from gym import wrappers
env = gym.make("FrozenLake-v0")
# record per-episode statistics with the Monitor wrapper
env = wrappers.Monitor(env, '/tmp/frozenlake-experiment')
env.reset()
n_states = env.observation_space.n
n_actions = env.action_space.n
def get_random_policy():
    """Sample a policy uniformly: one random action for each state."""
    return np.random.choice(n_actions, size=n_states)
def sample_reward(env, policy, t_max=100):
    """Play one episode with the given policy and return its total reward."""
    s = env.reset()
    total_reward = 0
    for _ in range(t_max):
        s, reward, is_done, _ = env.step(policy[s])
        total_reward += reward
        if is_done:
            break
    return total_reward
def evaluate(policy, n_times=100):
    """Estimate a policy's mean reward over n_times episodes."""
    rewards = [sample_reward(env, policy) for _ in range(n_times)]
    return float(np.mean(rewards))
def print_policy(policy):
    """Print the 4x4 lake map with an arrow for the action taken on each walkable tile."""
    lake = "SFFFFHFHFFFHHFFG"
    # FrozenLake-v0 actions: 0=left, 1=down, 2=right, 3=up
    arrows = ['<v>^'[a] for a in policy]
    signs = [arrow if tile in "SF" else tile for arrow, tile in zip(arrows, lake)]
    for i in range(0, 16, 4):
        print(' '.join(signs[i:i+4]))
def crossover(policy1, policy2, p=0.5):
    """For each state, take the action from policy2 with probability p, else from policy1."""
    idx = np.random.rand(n_states) < p
    return np.choose(idx, [policy1, policy2])
def mutation(policy, p=0.1):
    """With probability p, replace each action with a random one."""
    # the random policy must be the low-probability donor (policy2),
    # otherwise mutation would randomize 1-p of the genes instead of p
    return crossover(policy, get_random_policy(), p)
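# Sanity check (illustrative addition, not part of the original gist): a lone
# random policy rarely reaches the goal, so its mean reward should be near 0;
# this is the baseline the evolutionary loop below improves on.
print("random policy baseline:", evaluate(get_random_policy()))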
n_epochs = 100      # how many evolution cycles to run
pool_size = 100     # how many policies to maintain
n_crossovers = 50   # how many crossovers to make on each epoch
n_mutations = 50    # how many mutations to make on each epoch
print("initializing...")
pool = [get_random_policy() for _ in range(pool_size)]
pool_scores = [evaluate(policy) for policy in pool]
for epoch in range(n_epochs):
    print("Epoch %s:" % epoch)
    crossovered = [crossover(pool[p1], pool[p2])
                   for p1, p2 in zip(np.random.choice(len(pool), size=n_crossovers),
                                     np.random.choice(len(pool), size=n_crossovers))]
    mutated = [mutation(pool[p]) for p in np.random.choice(len(pool), size=n_mutations)]

    # add new policies to the pool
    pool.extend(crossovered)
    pool.extend(mutated)
    pool_scores = [evaluate(p) for p in pool]

    # select the pool_size best policies
    selected_indices = np.argsort(pool_scores)[-pool_size:]
    pool = [pool[i] for i in selected_indices]
    pool_scores = [pool_scores[i] for i in selected_indices]

    # print the best policy so far (last in ascending score order)
    print("best score:", pool_scores[-1])
    print_policy(pool[-1])
env.close()
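# The Monitor wrapper above has been recording per-episode results; closing
# the environment flushes them to the output directory.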