Skip to content

Instantly share code, notes, and snippets.

@jia-kai
Created August 28, 2016 15:00
Show Gist options
  • Save jia-kai/a226b6c05fb19c71663b4de261f41c83 to your computer and use it in GitHub Desktop.
Save jia-kai/a226b6c05fb19c71663b4de261f41c83 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# $File: cart.py
# $Date: Sun Aug 28 23:00:26 2016 +0800
# $Author: jiakai <[email protected]>
import gym
import numpy as np
import operator
import itertools
class CrossEntropySolver:
    """Cross-entropy-method search for a linear policy on a gym-style env.

    Each training episode samples ``pool_size`` weight vectors from a
    per-dimension normal distribution, scores every vector by rolling it out
    in the environment, and refits the distribution's mean and standard
    deviation to the best-scoring fraction (``selection_ratio``) of the pool.

    The environment only needs ``reset()`` returning an observation,
    ``step(action)`` returning ``(observation, reward, done, info)`` and,
    for verbose evaluation, ``render()``.
    """

    # Number of candidate weight vectors sampled per training episode.
    pool_size = 10
    # Fraction of the pool kept as the elite set when refitting.
    selection_ratio = 0.3
    # Length of a weight vector; it is dotted with the observation.
    w_size = 4
    # Step cap for a single non-verbose rollout.
    max_steps_per_sample = 3000
    # Number of rollouts averaged to score one candidate.
    evals_per_sample = 3

    _env = None
    _episode = 0
    _cur_mean = None
    _cur_std = None
    # Highest single-rollout reward seen so far; -1 means "nothing yet".
    _best_core = -1
    # Weight vector that produced ``_best_core``.
    best_w = None

    def __init__(self, env):
        """Bind the solver to *env* and draw the initial sampling distribution."""
        self._env = env
        self._cur_mean = np.random.normal(size=self.w_size)
        self._cur_std = np.ones(self.w_size)

    def get_action(self, obsrv, w):
        """Return 1 if the dot product of *obsrv* and *w* is positive, else 0."""
        return int(np.dot(obsrv, w) > 0)

    def eval(self, w, verbose=False):
        """Roll out the policy given by *w* once and return its total reward.

        A non-verbose rollout stops after ``max_steps_per_sample`` steps or
        when the environment reports ``done``. A verbose rollout prints the
        step counter, renders every step, and has no step cap: it ends only
        when the environment reports ``done``.

        The best reward seen so far and its weights are recorded in
        ``_best_core`` / ``best_w`` as a side effect.
        """
        env = self._env
        obsrv = env.reset()
        tot_reward = 0
        if verbose:
            steps = itertools.count()
        else:
            steps = range(self.max_steps_per_sample)
        for i in steps:
            if verbose:
                print(i, end='\r', flush=True)
                env.render()
            action = self.get_action(obsrv, w)
            obsrv, reward, done, _ = env.step(action)
            tot_reward += reward
            if done:
                break
        tot_reward = float(tot_reward)
        if tot_reward > self._best_core:
            self._best_core = tot_reward
            self.best_w = w
        return tot_reward

    def sample(self):
        """Draw one weight vector from the current normal distribution."""
        return (np.random.normal(size=self.w_size) * self._cur_std +
                self._cur_mean)

    def train_episode(self):
        """Run one sample/select/refit iteration and return the elite mean score.

        Raises:
            ValueError: if ``pool_size`` is smaller than 1.
        """
        if self.pool_size < 1:
            raise ValueError('pool_size must be at least 1')

        cand = []
        for i in range(self.pool_size):
            w = self.sample()
            score = np.mean(
                [self.eval(w) for _ in range(self.evals_per_sample)])
            cand.append((score, w))
            print('{}:{} ...'.format(self._episode, i), end='\r', flush=True)

        # Keep at least one elite: a count of 0 would make the slice below
        # ``[-0:]``, which silently keeps the whole pool instead of selecting.
        nr_sel = min(len(cand),
                     max(1, int(self.selection_ratio * len(cand))))
        # Sort on the score only so the weight arrays are never compared.
        cand = sorted(cand, key=operator.itemgetter(0))[-nr_sel:]
        ws = np.array([w for _, w in cand])
        self._cur_mean = np.mean(ws, axis=0)
        if nr_sel > 1:
            spread = np.std(ws, axis=0, ddof=1)
        else:
            # The sample std (ddof=1) of a single row is NaN, which would
            # poison every later sample; treat a lone elite as zero spread.
            spread = np.zeros(self.w_size)
        # Additive term that shrinks as the episode counter grows.
        self._cur_std = spread + 1.5 / (self._episode + 1)
        avg_score = float(np.mean([score for score, _ in cand]))
        print('episode {}: score={}'.format(self._episode, avg_score))
        self._episode += 1
        return avg_score
def main():
    """Train a CrossEntropySolver on CartPole-v0, then replay the best weights.

    Runs up to 20 training episodes, stopping early once the elite mean
    score reaches ``max_steps_per_sample``. When ``monitor`` is true the
    run is recorded through ``env.monitor`` into the ``expr0`` directory.
    """
    monitor = True
    env = gym.make('CartPole-v0')
    if monitor:
        # NOTE(review): ``x`` is whatever index the monitor passes to
        # ``video_callable`` -- presumably the env episode number; confirm
        # against the installed gym version.
        env.monitor.start(
            'expr0',
            video_callable=lambda x: x % (CrossEntropySolver.pool_size * 5) == 0)
    try:
        ce = CrossEntropySolver(env)
        for _ in range(20):
            # ``>=`` rather than ``==`` so the check does not rely on exact
            # float equality of an averaged score.
            if ce.train_episode() >= ce.max_steps_per_sample:
                break
    finally:
        # Close the recording even if training raises, so the monitor is
        # not left open.
        if monitor:
            env.monitor.close()
    print('eval:', ce.eval(ce.best_w, True))
# Run the training demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment