[QLearn4] Implementing the QLearning class | Part 1
Created October 12, 2015
# coding: utf-8
"""
QLearn4.py

Improved version of QLearn3.py.
Implements the Agent, State, and QLearning classes.

Reference:
"Q-learning: wrote a script that learns the best route" (powered by Python)
http://d.hatena.ne.jp/Kshi_Kshi/20111227/1324993576

Action selection: greedy with respect to the learned Q-values
(with random exploration controlled by GREEDY_RATIO).
"""
import sys
import copy
import random
import numpy as np
# Environment: State
# Environment state: S
# Agent action: a
# Policy: Plan -> a = Plan(S)
# State transition: S' = State(S, a)
# Reward: r = Reward(S')

GAMMA = 0.9
ALPHA = 0.4
GREEDY_RATIO = 0.5
MAX_ITERATE = 150000

NUM_COL = 10  # width (number of columns)
NUM_ROW = 10  # height (number of rows)

GOAL_REWARD = 10

ACTION = [0, 1, 2, 3]
ACTION_NAME = ['UP', 'RIGHT', 'DOWN', 'LEFT']
NUM_ACTION = 4

START_COL = 1
START_ROW = 1
GOAL_COL = NUM_COL - 2
GOAL_ROW = NUM_ROW - 2
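
# The start cell sits just inside the top-left corner of the border wall, and the
# goal cell is the opposite inner corner. Note that the goal is only drawn when the
# field is displayed; it is never stored in FIELD itself.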
# Cell values -- road: 0, wall: 1, goal: 2, agent: 3
# WALL = 1
# GOAL = 2
# ROAD = 0
# AGENT = 3
ROAD, WALL, GOAL, AGENT = 0, 1, 2, 3

# FIELD = np.array([
#     [1, 1, 1, 1, 1, 1],
#     [1, 0, 3, 0, 0, 1],
#     [1, 0, 0, 0, 0, 1],
#     [1, 0, 0, 0, 0, 1],
#     [1, 0, 0, 0, 0, 1],
#     [1, 1, 1, 1, 1, 1]
# ])

# Generate the field: walls around the border, open cells inside
FIELD = np.zeros((NUM_ROW, NUM_COL))
for row in range(NUM_ROW):
    for col in range(NUM_COL):
        if row in (0, NUM_ROW - 1):
            FIELD[row, col] = WALL
        if col in (0, NUM_COL - 1):
            FIELD[row, col] = WALL
FIELD[START_ROW, START_COL] = AGENT
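
# With the default 10 x 10 settings, fieldDisplay() renders the field roughly as
# (3 = agent at the start cell, 2 = goal, 1 = wall, 0 = open road):
#   1 1 1 1 1 1 1 1 1 1
#   1 3 0 0 0 0 0 0 0 1
#   1 0 0 0 0 0 0 0 0 1
#   ...
#   1 0 0 0 0 0 0 0 2 1
#   1 1 1 1 1 1 1 1 1 1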


def fieldDisplay(S):
    print '*** Field ***'
    Sprint = np.copy(S)
    Sprint[GOAL_ROW, GOAL_COL] = GOAL
    for row in range(NUM_ROW):
        print Sprint[row, :]


class QLearning(object):
    """
    Learning component owned by the Agent.

    Q = getQ()
    qlearn(S, a, r, S_next)
    """

    def __init__(self):
        # Prepare the table-based value function (the Q-table)
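        # Q has shape (NUM_ROW, NUM_COL, NUM_ACTION): one value per (grid cell, action) pair.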
        self.Q = np.zeros((NUM_ROW, NUM_COL, NUM_ACTION))
        pass

    def getQ(self):
        return self.Q

    def qlearn(self, S, a, r, S_next):
        row, col = np.where(S == AGENT)
        row = row[0]
        col = col[0]
        row_next, col_next = np.where(S_next == AGENT)
        row_next = row_next[0]
        col_next = col_next[0]
        # q_value = Q[row, col, a]
        # q = (1.0 - ALPHA) * q_value + ALPHA * (r + GAMMA * max(Q[row_next, col_next, :]))
        # Q[row, col, a] = q
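        # Tabular Q-learning update:
        #   Q(s, a) <- Q(s, a) + ALPHA * (r + GAMMA * max_a' Q(s', a') - Q(s, a))
        # (equivalent to the commented-out form above)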
        max_Q = max(self.Q[row_next, col_next, :])
        q = self.Q[row, col, a] + ALPHA * (r + GAMMA * max_Q - self.Q[row, col, a])
        self.Q[row, col, a] = q
        # return Q

    def displayQ(self):
        Q = self.Q
        for row in range(NUM_ROW):
            c = [max(Q[row, col, :]) for col in range(NUM_COL)]
            # print '%5.1f,' * 6 % tuple(c)
        for a in ACTION:
            print 'action', a
            for row in range(NUM_ROW):
                c = [Q[row, col, a] for col in range(NUM_COL)]
                print '%3.1f,' * NUM_COL % tuple(c)


# Agent class
class Agent(object):
    """
    Kept generic so it does not depend on the rules of a particular game.

    action: one entry per available action pattern
    Learning algorithm: Q-learning

    a = getNextAction(S)
    learn(S, a, r, S_next)
    """

    def __init__(self, numAction=4):
        self.action_pattern = range(numAction)
        self.qlearnobj = QLearning()
        pass
    # def getNextAction(self, state):
    #     """
    #     Based on the state passed in as `state`, return the learned best next move.
    #     For Q-learning this is the action with the largest Q-value.
    #     :param state: state obtained from the game
    #     :return: next_action
    #     """
    #     # Return a random action for debugging
    #     # return np.random.choice(self.action_pattern)

    def displayQ(self):
        self.qlearnobj.displayQ()

    def learn(self, S, a, r, S_next):
        """Learn the Q-values using Q-learning (or a neural network)."""
        # X = [S, a, r, S_next]
        self.qlearnobj.qlearn(S, a, r, S_next)
        pass

    def getNextAction(self, S):
        Q = self.qlearnobj.getQ()
        Agent_row, Agent_col = np.where(S == AGENT)
        Agent_row = Agent_row[0]
        Agent_col = Agent_col[0]
        # print 'Q:', Q[Agent_row, Agent_col, :]
        a = 1
        max_Q = -10000
        best_action = []
        for i in range(NUM_ACTION):
            q = Q[Agent_row, Agent_col, ACTION[i]]
            if q > max_Q:
                max_Q = q
                best_action = [ACTION[i]]
            elif q == max_Q:
                best_action.append(ACTION[i])
        # print '>> Best Action,', best_action
        a = np.random.choice(best_action)
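        # Epsilon-greedy selection. Note that GREEDY_RATIO acts as the exploration
        # rate here: the greedy action a is returned with probability 1 - GREEDY_RATIO,
        # otherwise a uniformly random action is returned.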
        if GREEDY_RATIO < random.random():
            return a
        else:
            return np.random.choice(ACTION)


class State(object):
    """
    The environment (the game itself).

    S_next, r, option = getNextState(S, a)
    """

    def __init__(self):
        self.initS = FIELD

    def __getReward(self, Snext):
        # Reward is GOAL_REWARD only when the agent stands on the goal cell, otherwise 0
        Agent_row, Agent_col = np.where(Snext == AGENT)
        row1 = Agent_row[0]
        col1 = Agent_col[0]
        # if (row1, col1) == (GOAL_ROW, GOAL_COL):
        if row1 == GOAL_ROW and col1 == GOAL_COL:
            r = GOAL_REWARD
        else:
            r = 0
        return r

    def getNextState(self, Snow, a):
        """
        Given the current environment and the agent's action, return the next state.
        :param Snow: current state
        :param a: action
        :return: S_next, reward, option
        """
        # 2. The environment State moves to the next state S' based on the action a
        #    received from the Agent and the current state S
        Agent_row, Agent_col = np.where(Snow == AGENT)
        row1 = Agent_row[0]
        col1 = Agent_col[0]
        if a == ACTION[0]:    # UP
            row2 = row1 - 1
            col2 = col1
        elif a == ACTION[1]:  # RIGHT
            row2 = row1
            col2 = col1 + 1
        elif a == ACTION[2]:  # DOWN
            row2 = row1 + 1
            col2 = col1
        elif a == ACTION[3]:  # LEFT
            row2 = row1
            col2 = col1 - 1
        Stmp = Snow[row2, col2]
        # Update
        if Stmp == WALL:
            # The destination is a wall: the agent stays where it is
            # print 'Agent next WALL'
            S_next = np.copy(Snow)
        else:
            Snow[row1, col1] = ROAD
            Snow[row2, col2] = AGENT
            S_next = Snow
        # option = ROAD
        if row2 == GOAL_ROW and col2 == GOAL_COL:
            option = GOAL
        else:
            option = ROAD
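        # 'option' tells the caller whether this move reached the GOAL (main2() uses it
        # to reset the episode) or just landed on an ordinary ROAD cell.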
        return S_next, self.__getReward(S_next), option

    def getInitState(self):
        return FIELD
    pass


def main():
    # ********* main ********* #
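    # NOTE: this is an earlier draft of the training loop. It returns right after
    # creating the Agent; the working loop is main2(), which is what runs at the
    # bottom of the file.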
    # 0. Initial state
    # The Agent's AI
    # Q = np.zeros((NUM_ROW, NUM_COL, NUM_ACTION))
    # goal_num = 0
    t = 0
    # S = np.copy(FIELD)
    # fieldDisplay(S)
    agentObj = Agent()
    # stateObj = State()
    a = np.random.choice(range(3))
    return

    while t < 0:
        pass
        # print '//////////// Iterate %d ///////////////' % (t)
        # print 'Goal Number : %d' % (goal_num)
        # fieldDisplay(S)
        # 1. The agent receives the observation S from the environment and,
        #    following its policy, hands the action a back to the environment
        # a = plan(S)
        # a = planQ(Q, S)
        # a = Agent.getNextAction(S)
        # print '>> Action :', ACTION_NAME[a]
        # 2. The environment State moves to the next state S' based on the action a
        #    received from the Agent and the current state S
        # S_next = State(S, a)
        # and returns the reward r = Reward(S') for that transition to the agent
        # r = Reward(S_next)
        # print '>> Reward Value :', r
        # S_next, r, option = State.goNextStep(S, a)
        # 4. Let the agent learn
        # Q = QLearn(np.copy(Q), np.copy(S), a, r, np.copy(S_next))
        # Agent.learn(S, a, r, S_next)
        # if r == GOAL_REWARD:
        #     fieldDisplay(S)
        #     S = np.copy(FIELD)
        #     # S = State.resetState()
        #     goal_num += 1
        # S = np.copy(S_next)
        # 3. Advance time: t = t + 1
        # t = t + 1
    # print '//////////// Iterate %d ///////////////' % (t)
    # print 'Goal Number : %d' % (goal_num)
    # displayQ(Q)


def main2():
    """
    The Q-values were not propagating, so the arrays are copied whenever they
    are passed around.
    Fix: pass copies of the arrays to learn().
    """
    agent = Agent()
    state = State()

    # Counter for logging
    goaled_number = 0

    # Initial setup
    S = state.getInitState()
    print '>> State : Init State'
    print S

    for i in range(MAX_ITERATE):
        if i % (MAX_ITERATE / 20) == 0:
            print i
            # agent.displayQ()

        # 1. The agent receives the observation S from the environment and,
        #    following its policy, hands the action a back to the environment
        # a = agent.getNextAction(S)
        a = agent.getNextAction(np.copy(S))
        # print '>> Agent Next Action is :%d' % a

        # 2. The environment State returns the next state S' based on the action a
        #    received from the Agent and the current state S
        # S_next, r, option = state.getNextState(S, a)
        S_next, r, option = state.getNextState(np.copy(S), a)
        # print '>> State Next step:'
        # print S_next
        # print '>>> Reward ', r
        # print '>>> Option ', option

        # 3. Let the agent learn (pass copies so the caller's arrays are not mutated)
        # agent.learn(S, a, r, S_next)
        agent.learn(np.copy(S), a, r, np.copy(S_next))

        # 4. Decide whether the state should be reset
        if option == GOAL:
            goaled_number += 1
            # print S_next
            S = state.getInitState()
        else:
            S = S_next

    print '>> GOAL NUMBER :', goaled_number
    agent.displayQ()
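

# ---------------------------------------------------------------------------
# Illustrative extra (not part of the original script): after training, walk the
# field purely greedily (argmax over Q) and report whether the goal is reached.
# A minimal sketch; the name showGreedyPath and its max_steps parameter are made
# up here, and it assumes the caller has a trained Agent in hand (for example,
# by making main2() return its agent).
def showGreedyPath(agent, max_steps=200):
    state = State()
    S = np.copy(state.getInitState())
    Q = agent.qlearnobj.getQ()
    for step in range(max_steps):
        row, col = np.where(S == AGENT)
        row, col = row[0], col[0]
        a = int(np.argmax(Q[row, col, :]))  # purely greedy action
        S, r, option = state.getNextState(S, a)
        if option == GOAL:
            print '>> Greedy policy reached the goal in %d steps' % (step + 1)
            return
    print '>> Greedy policy did not reach the goal within %d steps' % max_steps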


if __name__ == '__main__':
    main2()