Solve LunarLander-v2 using CMA-ES
#!/usr/bin/env python3

"""
ll-cma.py: Solve LunarLander-v2 using CMA-ES.

This code solves the LunarLander-v2 environment from the OpenAI gym using CMA-ES.
You need to pip-install the packages numpy, gym[box2d] and cma for it to work.

It does not solve the environment on every run, but when it does, it
generally takes less than 15 minutes.

Copyright 2020, Johannes Holzfuß

Copying and distribution of this file, with or without modification, are
permitted in any medium without royalty, provided the copyright notice and this
notice are preserved. This file is offered as-is, without any warranty.

(The GNU All-permissive License)
"""

import time
from collections import deque

import cma
import gym
import numpy as np

env = gym.make("LunarLander-v2")
es = cma.CMAEvolutionStrategy(32 * [0.0], 0.5)
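
# The policy below is a bias-free linear map from the 8-dimensional observation
# to 4 action scores, so it has 8 * 4 = 32 weights. CMA-ES therefore searches a
# 32-dimensional space, starting from the zero vector with an initial step size
# (sigma0) of 0.5.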


class Normalizer:
    """
    Normalizer standardizes the inputs to have approximately zero mean and unit variance.

    See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    for Welford's online algorithm.
    """

    def __init__(self, nb_inputs):
        self.n = np.zeros(nb_inputs)
        self.mean = np.zeros(nb_inputs)
        self.mean_diff = np.zeros(nb_inputs)
        self.var = np.zeros(nb_inputs)

    def observe(self, x):
        self.n += 1.0
        last_mean = self.mean.copy()
        self.mean += (x - self.mean) / self.n
        self.mean_diff += (x - last_mean) * (x - self.mean)
        self.var = (self.mean_diff / self.n).clip(min=1e-2)

    def normalize(self, inputs):
        obs_mean = self.mean
        obs_std = np.sqrt(self.var)
        return (inputs - obs_mean) / obs_std
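
# Worked example of the Welford update above (illustration only): observing
# 1.0, 2.0, 3.0 with a Normalizer(1) gives mean = 2.0 and mean_diff = 2.0,
# so var = 2.0 / 3 ≈ 0.667, the population variance of [1, 2, 3]. The
# clip(min=1e-2) keeps normalize() from dividing by a near-zero standard
# deviation while only a few observations have been seen.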

normalizer = Normalizer(8)


def choose_action(obs, theta):
    global normalizer

    # Normalize observation
    normalizer.observe(obs)
    obs = normalizer.normalize(obs)

    # Linear layer (without bias)
    x = np.dot(obs, np.reshape(theta, (8, 4)))

    # Argmax with random tie-breaking
    a = np.random.choice(np.flatnonzero(x == x.max()))

    return a
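
# np.flatnonzero(x == x.max()) collects the indices of all maximal scores and
# np.random.choice picks one of them at random; a plain np.argmax would always
# resolve ties to the lowest action index instead.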


def run_episode(env, theta):
    obs = env.reset()
    reward_sum = 0

    # Episodes are limited to 1000 frames
    for t in range(1000):
        a = choose_action(obs, theta)
        obs, reward, done, _ = env.step(a)
        reward_sum += reward

        if done:
            break

    return reward_sum
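
# The 1000-frame bound mirrors the environment's own episode cap:
# LunarLander-v2 is registered with max_episode_steps=1000, so done is
# set by the time the loop runs out anyway.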


ep = 1
gen = 1
trend = deque([], 100)
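
# LunarLander-v2 counts as solved at an average reward of 200 over 100
# consecutive episodes; the deque holds the last 100 episode rewards, so
# np.mean(trend) is that moving average, checked at the end of each
# generation below.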

while True:
    solutions = es.ask()
    fitnesses = []

    print("*** GENERATION", gen, "***")

    for s in solutions:
        r = run_episode(env, s)

        # Negate r because CMA-ES minimizes
        fitnesses.append(-r)
        trend.append(r)

        ep += 1
        print(ep, np.mean(trend))

    es.tell(solutions, fitnesses)
    es.logger.add()
    es.disp()

    gen += 1

    if np.mean(trend) > 200:
        break

print(es.result_pretty())
es.logger.plot()
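
# result_pretty() prints a summary of the final optimizer state; logger.plot()
# displays the convergence data recorded via logger.add() (requires matplotlib).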

# Victory dance (display what it learned)
while True:
    obs = env.reset()
    reward_sum = 0

    for t in range(1000):
        # Act according to the mean of the final search distribution
        a = choose_action(obs, es.mean)
        obs, reward, done, _ = env.step(a)

        env.render()
        time.sleep(1.0 / 100)

        reward_sum += reward

        if done:
            break

    print(reward_sum)