DQN algorithm for BipedalWalkerHardcore-v2 on OpenAI Gym
# coding: utf-8
import numpy as np
import time
import chainer
from chainer import cuda, Function, gradient_check, Variable, optimizers, serializers, utils
from chainer import Link, Chain, ChainList
import chainer.functions as F
import chainer.links as L
import gym

np.random.seed(7)

STATE_NUM = 24
# Neural network used inside the DQN
class Q(Chain):
    def __init__(self, state_num=STATE_NUM):
        super(Q, self).__init__(
            l1=L.Linear(state_num, 16),  # the state is the input
            l2=L.Linear(16, 32),
            l3=L.Linear(32, 64),
            l4=L.Linear(64, 256),
            l5=L.Linear(256, 3*3*3*3),  # output: Q-values for all 3^4 = 81 discrete action combinations
        )

    def __call__(self, x, t):
        return F.mean_squared_error(self.predict(x, train=True), t)

    def predict(self, x, train=False):
        h1 = F.leaky_relu(self.l1(x))
        h2 = F.leaky_relu(self.l2(h1))
        h3 = F.leaky_relu(self.l3(h2))
        h4 = F.leaky_relu(self.l4(h3))
        y = F.leaky_relu(self.l5(h4))
        return y
# Agent that acts according to the DQN algorithm
class DQNAgent():
    def __init__(self, epsilon=0.99):
        self.model = Q()
        self.optimizer = optimizers.Adam()
        self.optimizer.setup(self.model)
        self.epsilon = epsilon  # probability of choosing a random action
        self.actions = [-1, 0, 1]  # action choices (torque per joint)
        self.experienceMemory = []  # experience memory
        self.memSize = 300*100  # size of the experience memory (300 samples x 100 episodes)
        self.experienceMemory_local = []  # experience memory (episode-local)
        self.memPos = 0  # memory index
        self.batch_num = 32  # batch size used for training
        self.gamma = 0.9  # discount rate
        self.loss = 0
        self.total_reward_award = np.ones(100)*-1000  # top-100 episode total rewards
    # Convert a flat action index (0..80) into a list of four joint torques in
    # {-1, 0, 1} by reading the index as a 4-digit base-3 number.
    def index_to_list(self, index):
        ret_arr = []
        a = int(index / 27) - 1
        rest = index - 27*int(index / 27)
        ret_arr.append(a)
        a = int(rest / 9) - 1
        rest = rest - 9*int(rest / 9)
        ret_arr.append(a)
        a = int(rest / 3) - 1
        rest = rest - 3*int(rest / 3)
        ret_arr.append(a)
        ret_arr.append(rest - 1)
        return ret_arr

    # Inverse of index_to_list: encode a 4-element torque list as a flat index.
    def list_to_index(self, lst):
        ret = 0
        ret += (lst[0] + 1)*27
        ret += (lst[1] + 1)*9
        ret += (lst[2] + 1)*3
        ret += (lst[3] + 1)
        return ret
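    # A quick round-trip check of the encoding (illustrative values, not part
    # of the original gist):
    #   list_to_index([-1, 0, 1, -1]) == 0*27 + 1*9 + 2*3 + 0 == 15
    #   index_to_list(15) == [-1, 0, 1, -1]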
    def get_action_value(self, seq):
        # return the action values for the state sequence seq
        x = Variable(np.hstack([seq]).astype(np.float32).reshape((1, -1)))
        return self.model.predict(x).data[0]

    def get_greedy_action(self, seq):
        action_index = np.argmax(self.get_action_value(seq))
        return self.index_to_list(action_index)

    def reduce_epsilon(self):
        self.epsilon -= 1.0/1000000

    def get_epsilon(self):
        return self.epsilon
    def get_action(self, seq, train):
        '''
        Return an action for the state sequence seq:
        a list of four joint torques, each chosen from {-1, 0, 1}.
        '''
        action = []
        if train and np.random.random() < self.epsilon:
            # random
            action.append(np.random.choice(self.actions))
            action.append(np.random.choice(self.actions))
            action.append(np.random.choice(self.actions))
            action.append(np.random.choice(self.actions))
        else:
            # greedy
            action = self.get_greedy_action(seq)
        return action
    def experience_local(self, old_seq, action, reward, new_seq):
        # episode-local memory
        self.experienceMemory_local.append(np.hstack([old_seq, action, reward, new_seq]))

    def experience_global(self, total_reward):
        # global memory
        # keep experiences from episodes whose total reward ranks in the top 100
        if np.min(self.total_reward_award) < total_reward:
            i = np.argmin(self.total_reward_award)
            self.total_reward_award[i] = total_reward
            # GOOD EXPERIENCE REPLAY
            for x in self.experienceMemory_local:
                self.experience(x)
        # with a small probability, also keep non-elite episodes
        if np.random.random() < 0.01:
            # NORMAL EXPERIENCE REPLAY
            for x in self.experienceMemory_local:
                self.experience(x)
        self.experienceMemory_local = []

    def experience(self, x):
        if len(self.experienceMemory) > self.memSize:
            self.experienceMemory[int(self.memPos % self.memSize)] = x
            self.memPos += 1
        else:
            self.experienceMemory.append(x)
    def update_model(self, old_seq, action, reward, new_seq):
        '''
        Update the model.
        '''
        # don't update until enough experience has accumulated
        if len(self.experienceMemory) < self.batch_num:
            return
        # build a batch from the experience memory
        memsize = len(self.experienceMemory)
        batch_index = list(np.random.randint(0, memsize, (self.batch_num)))
        batch = np.array([self.experienceMemory[i] for i in batch_index])
        x = Variable(batch[:, 0:STATE_NUM].reshape((self.batch_num, -1)).astype(np.float32))
        targets = self.model.predict(x).data.copy()
        for i in range(self.batch_num):
            # [seq..., action, reward, seq_new]
            a = batch[i, STATE_NUM]
            r = batch[i, STATE_NUM+1]
            ai = int(a)  # the action index was stored as a float; cast back to int for indexing
            new_seq = batch[i, (STATE_NUM+2):(STATE_NUM*2+2)]
            targets[i, ai] = (r + self.gamma * np.max(self.get_action_value(new_seq)))
        t = Variable(np.array(targets).reshape((self.batch_num, -1)).astype(np.float32))
        # update the network
        self.model.zerograds()
        loss = self.model(x, t)
        self.loss = loss.data
        loss.backward()
        self.optimizer.update()
class walkerEnvironment():
    def __init__(self):
        self.env = gym.make('BipedalWalkerHardcore-v2')
        self.env.monitor.start('./walker-experiment')

    def reset(self):
        self.env.reset()

    def step(self, action):
        return self.env.step(action)

    def monitor_close(self):
        self.env.monitor.close()
# Simulator that runs a simulation given an environment and an agent.
# Keeping the state sequence here feels a little odd, but it will do for now.
class simulator:
    def __init__(self, environment, agent):
        self.agent = agent
        self.env = environment
        self.num_seq = STATE_NUM
        self.reset_seq()
        self.learning_rate = 1.0
        self.highscore = 0
        self.log = []

    def reset_seq(self):
        self.seq = np.zeros(self.num_seq)

    def push_seq(self, state):
        self.seq = state

    def run(self, train=True):
        self.env.reset()
        self.reset_seq()
        total_reward = 0
        for i in range(100000):
            # save the sequence made of the current state
            old_seq = self.seq.copy()
            # decide the agent's action
            action = self.agent.get_action(old_seq, train)
            # feed the action to the environment
            observation, reward, done, info = self.env.step(action)
            total_reward += reward
            # observe the result and update the state and the sequence
            state = observation
            self.push_seq(state)
            new_seq = self.seq.copy()
            # store in the episode-local memory
            action_idx = self.agent.list_to_index(action)
            self.agent.experience_local(old_seq, action_idx, reward, new_seq)
            if done:
                print("Episode finished after {} timesteps".format(i+1))
                break
        # move the episode-local memory contents into the global memory
        self.agent.experience_global(total_reward)
        if train:
            # update the model using the training memory
            action_idx = self.agent.list_to_index(action)
            self.agent.update_model(old_seq, action_idx, reward, new_seq)
            self.agent.reduce_epsilon()
        return total_reward
if __name__ == '__main__':
    agent = DQNAgent()
    env = walkerEnvironment()
    sim = simulator(env, agent)
    best_reward = -200
    for i in range(1000000):
        total_reward = sim.run(train=True)
        if best_reward < total_reward:
            best_reward = total_reward
        print(str(i) + " " + str(total_reward) + " " + str(best_reward))
        env.reset()
        if best_reward > 195:
            break
    env.monitor_close()
Hi,
You have a bug in your predict function.
y = F.leaky_relu(self.l5(h4))
should be:
y = self.l5(h4)
You shouldn't put a nonlinearity after your final layer.
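For reference, here is how predict would look with that fix applied (a sketch of the suggested change, not code from the gist itself):

    def predict(self, x, train=False):
        h1 = F.leaky_relu(self.l1(x))
        h2 = F.leaky_relu(self.l2(h1))
        h3 = F.leaky_relu(self.l3(h2))
        h4 = F.leaky_relu(self.l4(h3))
        y = self.l5(h4)  # final layer kept linear so Q-values are unbounded
        return y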
https://twitter.com/ryo_grid/status/808347392521052160