Cart Pole balancing using randomized strategy
""" | |
Cart pole environment learner that uses something like Cross Entropy Method. | |
"Something like" b/c not sure if implemented it correctly. And it is EXTREMELY slow. Anyway, | |
looks like some kind of randomized search. | |
""" | |
from operator import itemgetter
import heapq

import numpy as np
import gym


class QuasiCrossEntropyLearner:

    def __init__(self, env, elite_size=20, **config):
        self.env = env
        self.elite_size = elite_size
        space_size = env.observation_space.shape[0]
        # Sampling distribution over linear policy weights: start from a
        # standard normal (zero mean, identity covariance).
        self.means = np.zeros(space_size)
        self.cov_mat = np.eye(space_size)

    def samples_generator(self, n):
        # Draw n candidate weight vectors from the current distribution.
        for _ in range(n):
            yield np.random.multivariate_normal(self.means, self.cov_mat)

    def learn(self, **control):

        def policy(state, weights):
            # Linear policy: push right (1) if the weighted state is
            # non-negative, otherwise push left (0).
            response = np.dot(state, weights)
            action = int(response >= 0)
            return action

        def evaluate(weights, max_step=1000):
            # Run one episode with the given weights and return the pair
            # (weights, total reward). Uses the classic gym API, where
            # reset() returns an observation and step() returns a 4-tuple.
            curr_s = env.reset()
            curr_a = policy(curr_s, weights)
            total_reward = 0
            for step in range(max_step):
                env.render()  # rendering every step is the main slowdown
                next_s, reward, done, _ = env.step(curr_a)
                total_reward += reward
                next_a = policy(next_s, weights)
                curr_s, curr_a = next_s, next_a
                if done:
                    break
            return weights, total_reward

        def elite(population, size=20):
            # Keep the `size` best-scoring weight vectors and report their
            # average reward.
            best_pairs = heapq.nlargest(size, population, key=itemgetter(1))
            best_weights = [w for w, r in best_pairs]
            reward = np.mean([r for w, r in best_pairs])
            return best_weights, reward

        env = self.env
        n_sim = control.get("n_sim", 10)
        n_samples = control.get("n_samples", 100)
        cumu_avg_reward = None
        for i in range(n_sim):
            # Evaluate every sampled weight vector, pick the elite subset and
            # refit the sampling distribution to it.
            total_rewards = map(evaluate, self.samples_generator(n_samples))
            best, avg_reward = elite(total_rewards, size=self.elite_size)
            self.means = np.mean(best, axis=0)
            self.cov_mat = np.cov(best, rowvar=False)
            # Running average of the elite reward, for progress reporting.
            if cumu_avg_reward is None:
                cumu_avg_reward = avg_reward
            else:
                cumu_avg_reward = 0.5*(avg_reward + cumu_avg_reward)
            print("Average reward for "
                  "iteration {} is {}".format(i, cumu_avg_reward))


def main():
    env = gym.make('CartPole-v0')
    learner = QuasiCrossEntropyLearner(env)
    learner.learn()


if __name__ == '__main__':
    main()
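For reference, the distribution update that the learner above approximates is the standard cross-entropy-method step: sample candidate weight vectors from a Gaussian, score each one, keep the top-scoring "elite" subset, and refit the mean and covariance to that subset. The sketch below isolates that step with a made-up quadratic score; cem_step, score and target are illustrative names and are not part of the gist.

import numpy as np

def cem_step(mean, cov, score, n_samples=100, elite_size=20):
    """One cross-entropy-method update: sample, score, refit to the elite."""
    samples = np.random.multivariate_normal(mean, cov, size=n_samples)
    scores = np.array([score(w) for w in samples])
    elite = samples[np.argsort(scores)[-elite_size:]]  # best-scoring samples
    return elite.mean(axis=0), np.cov(elite, rowvar=False)

# Toy usage: maximize -||w - target||^2, so the mean should drift toward target.
target = np.array([1.0, -2.0, 0.5, 3.0])
mean, cov = np.zeros(4), np.eye(4)
for _ in range(20):
    mean, cov = cem_step(mean, cov, lambda w: -np.sum((w - target) ** 2))
print(mean)  # close to target after a few iterations

In the gist, evaluate plays the role of the score function (an episode's total reward), and elite together with np.mean/np.cov performs the refit.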