@DataWraith
Created July 22, 2020 21:27
Solve LunarLander-v2 using CMA-ES
#!/usr/bin/env python3
"""
ll-cma.py: Solve LunarLander-v2 using CMA-ES.
This code solves the LunarLander-v2 environment from the OpenAI gym using CMA-ES.
You need to pip-install the packages numpy, gym[box2d] and cma in order for it to work.
It doesn't seem to solve the environment in every run, but if it does, it
generally takes less than 15 minutes to do so.
Copyright 2020, Johannes Holzfuß
Copying and distribution of this file, with or without modification, are
permitted in any medium without royalty, provided the copyright notice and this
notice are preserved. This file is offered as-is, without any warranty.
(The GNU All-permissive License)
"""
from collections import deque
import time

import cma
import gym
import numpy as np
env = gym.make("LunarLander-v2")
es = cma.CMAEvolutionStrategy(32 * [0.0], 0.5)
class Normalizer:
    """
    Normalizer standardizes the inputs to have approximately zero mean and unit variance.
    See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance on Welford's online algorithm.
    """

    def __init__(self, nb_inputs):
        self.n = np.zeros(nb_inputs)
        self.mean = np.zeros(nb_inputs)
        self.mean_diff = np.zeros(nb_inputs)
        self.var = np.zeros(nb_inputs)

    def observe(self, x):
        self.n += 1.0
        last_mean = self.mean.copy()
        self.mean += (x - self.mean) / self.n
        self.mean_diff += (x - last_mean) * (x - self.mean)
        self.var = (self.mean_diff / self.n).clip(min=1e-2)

    def normalize(self, inputs):
        obs_mean = self.mean
        obs_std = np.sqrt(self.var)
        return (inputs - obs_mean) / obs_std
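# A minimal usage sketch (commented out, not part of the training run):
# after observing a few vectors, `mean` approaches the sample mean and
# `var` holds the per-dimension variance, clipped to at least 1e-2.
#
#   norm = Normalizer(2)
#   for v in ([0.0, 1.0], [2.0, 3.0], [4.0, 5.0]):
#       norm.observe(np.array(v))
#   print(norm.mean)                              # roughly [2. 3.]
#   print(norm.normalize(np.array([2.0, 3.0])))   # roughly [0. 0.]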
normalizer = Normalizer(8)
def choose_action(obs, theta):
    global normalizer

    # Normalize observation
    normalizer.observe(obs)
    obs = normalizer.normalize(obs)

    # Linear layer (without bias)
    x = np.dot(obs, np.reshape(theta, (8, 4)))

    # Argmax with random tie-breaking
    a = np.random.choice(np.flatnonzero(x == x.max()))

    return a
def run_episode(env, theta):
    obs = env.reset()
    reward_sum = 0

    # Episodes are limited to 1000 frames
    for t in range(1000):
        a = choose_action(obs, theta)
        obs, reward, done, _ = env.step(a)
        reward_sum += reward

        if done:
            break

    return reward_sum
ep = 1
gen = 1
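# LunarLander-v2 counts as solved when the average return over the last 100
# episodes exceeds 200, so `trend` keeps a sliding window of the most recent
# 100 episode returns.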
trend = deque([], 100)
while True:
    solutions = es.ask()
    fitnesses = []

    print("*** GENERATION", gen, "***")

    for s in solutions:
        r = run_episode(env, s)

        # Negate r because CMA-ES minimizes
        fitnesses.append(-r)
        trend.append(r)

        ep += 1
        print(ep, np.mean(trend))

    es.tell(solutions, fitnesses)
    es.logger.add()
    es.disp()

    gen += 1

    if np.mean(trend) > 200:
        break
print(es.result_pretty())
es.logger.plot()
# Victory dance (display what it learned)
while True:
    obs = env.reset()
    reward_sum = 0

    for t in range(1000):
        a = choose_action(obs, es.mean)
        obs, reward, done, _ = env.step(a)
        env.render()
        time.sleep(1.0 / 100)
        reward_sum += reward

        if done:
            break

    print(reward_sum)