Created
November 23, 2015 22:18
-
-
Save peace098beat/2647bb79b0f52fa108fe to your computer and use it in GitHub Desktop.
[機械学習] DQN 車のレース. GUI付き。 ver0.1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! coding:utf-8 | |
| """ | |
| circle-race-dqn.py | |
| 箱の中で壁にぶつからないように動くAI | |
| ============================== | |
| = CircleRaceWindow(QMainWindow) | |
| ============================== | |
| + state<-CircleRace | |
| + agent<-QNeuralNetwork | |
| + iterate_step = {0} | |
| ----------------------------- | |
| : mainloop() | |
| - {s0} = state.get_init_state() | |
| - {a} = agent.get_next_action(s0) | |
| - {s1,R} = state.get_next_state(a) | |
| - {} = agent.learn(s0,a,R,s1} | |
| - iterate_step += 1 | |
| : paintEvent() | |
| - _drawField() | |
| [CircleRaceに特化した処理] | |
| - _drawCar() | |
| [CircleRaceに特化した処理] | |
| ----------------------------- | |
| : _drawField() | |
| - {paint field} | |
| : _drawCar | |
| - car_status = state.car.get_status | |
| - {paint car status} | |
| ----------------------------- | |
| ================================ | |
| = CircleRace{State} | |
| ================================ | |
| + field <- Field() | |
| + car <- Car() | |
| ----------------------------- | |
| : get_init_state() | |
| [ +Car()の初期位置・状態を返却] | |
| : get_next_state(action) | |
| [ +Car()の位置・状態を更新] | |
| ----------------------------- | |
| memo: | |
| CircleRaceクラスはFieldクラスとCarクラスを保持している。 | |
| 主な役割は, CarとFieldの相対位置関係からセンサの値を | |
| 計算し, Carクラスへ渡す役割を持つ。 | |
| ex. 1. Carの位置を取得 2. Car位置の更新 | |
| 3. 衝突判定 4. Car位置の修正 5. センサ値の算出 | |
| 汎用性: 地形の変化はFiledクラスが持つ。 | |
| CircleRaceクラスは, Carの位置情報をFieldクラスへ渡し | |
| 衝突判定結果をもとに, センサ位置を更新するのみである。 | |
| ※ 複雑地形の衝突判定はFieldクラスが持つ | |
| ================================ | |
| = QNeuralNetwork{Agent} | |
| ================================ | |
| + NN <- NewralNetwork() | |
| ----------------------------- | |
| : {s} = get_next_action(state=s) | |
| : {error} = lean(s0,a,R,s1) | |
| ----------------------------- | |
| memo: | |
| QNeuralNetworkクラスは, 入力(センサ)データから, Q値を予測し | |
| 次の行動を算出するクラスである. | |
| ================================ | |
| = Field | |
| ================================ | |
| + width, height | |
| + geometory | |
| ----------------------------- | |
| : isCrush(pos=[x,y]) | |
| ----------------------------- | |
| Fieldクラスは, 地形情報を持つ.主の役割は与えられた位置情報から | |
| 衝突判定を行うのみである. | |
| (2015/11/24) 初期コードを生成 | |
| Created by fifi (2015/11/21 9:08) | |
| """ | |
| __version__ = '0.1' | |
| # モジュールのインポート | |
| import os | |
| import sys | |
| from datetime import datetime | |
| import csv | |
| from PySide.QtGui import * | |
| from PySide.QtCore import * | |
| import numpy as np | |
| from MultiLayerPerceptron import MultiLayerPerceptron | |
| def resetlog(): | |
| """resetlog() | |
| ログファイルを削除 | |
| """ | |
| logfile = '.log.csv' | |
| if os.path.exists(logfile): | |
| os.remove('.log.csv') | |
| resetlog() | |
| def debuglog(s=None, data=None): | |
| """debuglog() | |
| ログファイルの保存 | |
| """ | |
| d = datetime.now().isoformat() | |
| with open('.log.csv', 'a') as f: | |
| writer = csv.writer(f, lineterminator='\n') | |
| if not s is None: | |
| writer.writerow([d, s]) | |
| print d, s | |
| if not data is None: | |
| writer.writerow([d, data]) | |
| print d, data | |
| return | |
| class CircleRaceWindow(QWidget): | |
| # -- 定数(画面サイズ) ----------------- | |
| SCREEN_HEIGHT = 500 | |
| SCREEN_WIDTH = 700 | |
| MARGIN_HEIGHT = 50 | |
| MARGIN_WIDTH = 50 | |
| STAGE_HEIGHT = SCREEN_HEIGHT - 2 * MARGIN_HEIGHT | |
| STAGE_WIDTH = SCREEN_WIDTH - 2 * MARGIN_WIDTH | |
| # -- 定数(タイマー) ------------------- | |
| INTERVAL_TIME = 1 | |
| # -- 定数(その他) --------------------- | |
| SIZE_CAR = 5 | |
| def __init__(self, parent=None): | |
| QWidget.__init__(self, parent) | |
| self.resize(self.SCREEN_WIDTH, self.SCREEN_HEIGHT) | |
| # -- 画面バッファ -------------------- | |
| self.pixmap = QPixmap(self.size()) | |
| # -- 解析用オブジェクト -------------- | |
| self.game = CircleRace(area_width=self.STAGE_WIDTH, area_height=self.STAGE_HEIGHT) | |
| # -- 操作用変数 ---------------------- | |
| self.car_steer = 'right' | |
| self.car_speed = 1 | |
| self.car_steer_level = 1 | |
| self.user_car_steer = None | |
| # -- 初期画面の準備 ------------------ | |
| # 画面バッファの初期化 | |
| self.refreshPixmap() | |
| # グリッドの表示 | |
| painter = QPainter(self.pixmap) | |
| self.drawGrid(painter) | |
| # メインループの準備と開始 | |
| # ------------------------- | |
| if True: | |
| self.timer = QTimer() | |
| self.timer.timeout.connect(self.mainloop) | |
| self.timer.start(self.INTERVAL_TIME) | |
| # self.timer.setInterval(self.INTERVAL_TIME) | |
| # self.timer.start() | |
| # ************************************************************* # | |
| # メインループ | |
| # ************************************************************* # | |
| def mainloop(self): | |
| """ | |
| アニメーションのメインループ | |
| アルゴリズムの時間更新等はここで行う | |
| """ | |
| # -- アルゴリズム処理----------- | |
| # 定常円 | |
| # self.car_steer = 'right' | |
| # -- アルゴリズム処理----------- | |
| sencerL = self.game.car.sencerL_val | |
| sencerR = self.game.car.sencerR_val | |
| if sencerL < sencerR: | |
| self.car_steer = 'left' | |
| elif sencerL > sencerR: | |
| self.car_steer = 'right' | |
| else: | |
| self.car_steer = '' | |
| if not self.user_car_steer is None: | |
| self.car_steer = self.user_car_steer | |
| self.game.update_game(steer=self.car_steer, steer_level=1, speed=self.car_speed) | |
| self.car_steer = None | |
| self.user_car_steer = None | |
| # -- 描画準備 ------------------ | |
| painter = QPainter(self.pixmap) | |
| self.drawGrid(painter) | |
| self.drawGeoPoints(painter) | |
| # 画面更新 | |
| self.update() | |
| def paintEvent(self, *args, **kwargs): | |
| # -- おまじない --------------------------- | |
| # QPainterを生成 | |
| painter = QStylePainter(self) | |
| # QPainterでバッファに準備したデータを描画 | |
| painter.drawPixmap(0, 0, self.pixmap) | |
| painter.setRenderHint(QPainter.Antialiasing, True) | |
| _painter = painter.style() | |
| # 描画用QPenのデフォルト | |
| pen_default = QPen() | |
| pen_default.setColor(Qt.black) | |
| pen_default.setWidth(2) | |
| brush_default = QBrush(Qt.red, Qt.NoBrush) | |
| # -- アクセス用定数 ----------------------- | |
| x, y = 0, 1 | |
| # -- 車両の位置情報を取得 ----------------- | |
| _pos = self.game.car.position.copy() | |
| _vel = self.game.car.velocity.copy() | |
| pos = self.locateXY(_pos) | |
| vel = self.locateVec(_vel) | |
| vel_n = vel / np.linalg.norm(vel) | |
| # -- 車両位置のプロット ------------------- | |
| painter.drawPoint(pos[x], pos[y]) | |
| # -- 車両位置に 円を描画 ----------------------------- | |
| pen = QPen(Qt.black, 1, Qt.DotLine, Qt.RoundCap, Qt.RoundJoin) | |
| painter.setPen(pen) | |
| reward = self.game.car.reward | |
| if reward < 0: | |
| painter.setBrush(QBrush(QColor(255, 0, 0, 175))) | |
| painter.setBrush(QBrush(QColor(0, 255, 0, 175))) | |
| painter.drawEllipse(pos[x] - self.SIZE_CAR / 2, pos[y] - self.SIZE_CAR / 2, self.SIZE_CAR, self.SIZE_CAR) | |
| # -- 車両位置に四角を描画 ----------------------------- | |
| pen = QPen(Qt.black, 0.8, Qt.SolidLine, Qt.RoundCap, Qt.RoundJoin) | |
| painter.setPen(pen) | |
| def rotate(deg): | |
| """ | |
| 回転行列 | |
| :param theta: 回転角度 [deg] | |
| """ | |
| rad = np.deg2rad(deg) | |
| return np.array([[np.cos(rad), -np.sin(rad)], | |
| [np.sin(rad), np.cos(rad)]]) | |
| car_size = self.SIZE_CAR | |
| ny = vel_n.copy() * car_size | |
| nx = np.dot(rotate(90), vel_n.copy()) * car_size / 2.0 | |
| v1 = nx + ny + pos | |
| v2 = nx - ny + pos | |
| v3 = -nx - ny + pos | |
| v4 = -nx + ny + pos | |
| v = [vary.tolist() for vary in [v1, v2, v3, v4]] | |
| qpoints = [QPoint(*lv) for lv in v] | |
| polygon = QPolygon(qpoints) | |
| painter.drawPolygon(polygon) | |
| # -- 進行方向(速度ベクトル)を描画 --------- | |
| pen = QPen(Qt.gray, 1, Qt.SolidLine, Qt.RoundCap, Qt.RoundJoin) | |
| painter.setPen(pen) | |
| painter.drawLine(pos[x], pos[y], (pos[x] + vel[x]), (pos[y] + vel[y])) | |
| # -- センサを表示 ------------------------- | |
| sencerF = self.locateVec(self.game.car.sencerF) + pos | |
| sencerL = self.locateVec(self.game.car.sencerL) + pos | |
| sencerR = self.locateVec(self.game.car.sencerR) + pos | |
| sencer_linewide = 0.6 | |
| painter.setPen(QPen(Qt.red, sencer_linewide)) | |
| painter.drawLine(pos[x], pos[y], sencerF[x], sencerF[y]) | |
| painter.setPen(QPen(Qt.blue, sencer_linewide)) | |
| painter.drawLine(pos[x], pos[y], sencerL[x], sencerL[y]) | |
| painter.setPen(QPen(Qt.green, sencer_linewide)) | |
| painter.drawLine(pos[x], pos[y], sencerR[x], sencerR[y]) | |
| # -- センサグラフを表示 --------------------- | |
| sencerF_val = self.game.car.sencerF_val | |
| sencerR_val = self.game.car.sencerR_val | |
| sencerL_val = self.game.car.sencerL_val | |
| # 棒グラフのサイズ | |
| sencer_graph_width = 20 | |
| sencer_graph_height = 60 | |
| # Rectの左下位置 | |
| DEBUG_MARGIN_WIDTH = 30 | |
| DEBUG_MARGIN_HEIGHT = 30 | |
| pL = self.locateXY([DEBUG_MARGIN_WIDTH + 0, DEBUG_MARGIN_HEIGHT + 0]).tolist() | |
| pF = self.locateXY([DEBUG_MARGIN_WIDTH + sencer_graph_width * 2, DEBUG_MARGIN_HEIGHT + 0]).tolist() | |
| pR = self.locateXY([DEBUG_MARGIN_WIDTH + sencer_graph_width * 4, DEBUG_MARGIN_HEIGHT + 0]).tolist() | |
| # センサー(レフト)の棒グラフ | |
| r = QRect(0, 0, sencer_graph_width, sencer_graph_height) | |
| r.setHeight(sencerL_val * 10) | |
| r.moveBottomLeft(QPoint(*pL)) | |
| # 描画色の設定 | |
| painter.setPen(QPen(Qt.gray, 1, Qt.SolidLine, Qt.RoundCap, Qt.RoundJoin)) | |
| painter.setBrush(QBrush(Qt.blue, Qt.Dense4Pattern)) | |
| painter.drawRect(r) | |
| # センサー(センター)の棒グラフ | |
| r = QRect(0, 0, sencer_graph_width, sencer_graph_height) | |
| r.setHeight(sencerF_val * 10) | |
| r.moveBottomLeft(QPoint(*pF)) | |
| painter.setBrush(QBrush(Qt.red, Qt.Dense4Pattern)) | |
| painter.drawRect(r) | |
| # センサー(ライト)の棒グラフ | |
| r = QRect(0, 0, sencer_graph_width, sencer_graph_height) | |
| r.setHeight(sencerR_val * 10) | |
| r.moveBottomLeft(QPoint(*pR)) | |
| painter.setBrush(QBrush(Qt.green, Qt.Dense4Pattern)) | |
| painter.drawRect(r) | |
| # ************************************************************* # | |
| # 描画系補助関数 | |
| # ************************************************************* # | |
| def drawDebugLog(self, painter): | |
| pass | |
| def drawGeoPoints(self, painter): | |
| pass | |
| def drawGrid(self, painter): | |
| """ マップを表示する関数 | |
| """ | |
| for x in range(self.STAGE_WIDTH): | |
| for y in range(self.STAGE_HEIGHT): | |
| # painter.drawPoint(x,y) | |
| pass | |
| painter.setPen(QPen(Qt.black, 3)) | |
| painter.setBrush(QBrush(QColor(200, 200, 200, 150), Qt.Dense1Pattern)) | |
| painter.drawRect(self.MARGIN_WIDTH, self.MARGIN_HEIGHT, self.STAGE_WIDTH, self.STAGE_HEIGHT) | |
| pass | |
| def locateXY(self, pos): | |
| """ | |
| オブジェクトのローカル座標を、スクリーン上の座標へ変換 | |
| """ | |
| pos = np.asarray(pos) | |
| local_x, local_y = pos.copy() | |
| srn_x = local_x + self.MARGIN_WIDTH | |
| srn_y = (self.SCREEN_HEIGHT - local_y - self.MARGIN_HEIGHT) | |
| return np.asarray([srn_x, srn_y]) | |
| def locateVec(self, vel): | |
| """ | |
| オブジェクトのベクトルを、スクリーン上の座標空間へ変換 | |
| (※yを反転させるだけ) | |
| """ | |
| srn_vec = np.array([vel[0], -1 * vel[1]]) | |
| return srn_vec | |
| # ************************************************************* # | |
| # その他Qt関連補助関数 | |
| # ************************************************************* # | |
| def refreshPixmap(self): | |
| """ | |
| 画面バッファの初期化関数 | |
| """ | |
| # 画面バッファの初期化 | |
| self.pixmap = QPixmap(self.size()) | |
| # 画面を塗りつぶし (おまじない) | |
| self.pixmap.fill(self, 0, 0) | |
| self.pixmap.fill(Qt.white) | |
| # ぺインターの生成 (おまじない) | |
| painter = QPainter(self.pixmap) | |
| # ぺインターによる初期化 (おまじない) | |
| painter.initFrom(self) | |
| pass | |
| def sizeHint(self): | |
| return QSize(self.SCREEN_WIDTH, self.SCREEN_HEIGHT) | |
| def keyPressEvent(self, event): | |
| e = event.key() | |
| if e == Qt.Key_Up: | |
| self.car_speed += 1 | |
| pass | |
| elif e == Qt.Key_Down: | |
| self.car_speed -= 1 | |
| if self.car_speed < 0: | |
| self.car_speed = 0 | |
| elif e == Qt.Key_Left: | |
| self.user_car_steer = 'left' | |
| pass | |
| elif e == Qt.Key_Right: | |
| self.user_car_steer = 'right' | |
| pass | |
| elif e == Qt.Key_Plus: | |
| pass | |
| elif e == Qt.Key_Minus: | |
| pass | |
| elif e == Qt.Key_Q: | |
| self.close() | |
| else: | |
| pass | |
| print 'Presskey', e | |
| self.update() | |
| class Car(object): | |
| """ | |
| 車オブジェクト | |
| 自車の速度・位置の情報を持つ | |
| """ | |
| SENCER_LENGTH = 100. | |
| def __init__(self, iniPos=[0, 0], iniVelocity=[1, 1], sencer_angle=30): | |
| # -- 自車情報 ---------------------- | |
| self.position = np.array([0, 0]) | |
| # velocityはnorm=1ではない | |
| self.velocity = np.array([0, 0]) | |
| self.velocity_n = np.array([0, 0]) | |
| self.speed = 1 | |
| # -- 初期値代入 --------------------- | |
| if not iniPos is None: | |
| _iniPos = np.asarray(iniPos) | |
| self.position = _iniPos | |
| if not iniVelocity is None: | |
| _iniVelocity = np.asarray(iniVelocity) | |
| self.velocity_n = self.vectorNormalize(_iniVelocity) | |
| self.velocity = self.velocity_n * self.speed | |
| # -- センサの情報------------------- | |
| self.sencerL_angle_deg = sencer_angle | |
| self.sencerR_angle_deg = -1. * self.sencerL_angle_deg | |
| self.sencer_length = self.SENCER_LENGTH | |
| self.sencerF = np.array([0, 0]) | |
| self.sencerL = np.array([0, 0]) | |
| self.sencerR = np.array([0, 0]) | |
| self.sencerF_val = 0.0 | |
| self.sencerL_val = 0.0 | |
| self.sencerR_val = 0.0 | |
| # -- センサの更新 ------------------ | |
| self.calc_sencer() | |
| # -- ゲームの情報------------------- | |
| self.reward = 0 | |
| # -- その他 ------------------------ | |
| self.deltaT = 1 | |
| def update(self, deg=0, speed=1): | |
| """ | |
| 車の次の位置を予想し更新 | |
| (※ 等速運動仮定) | |
| :param theta: 回転角度 [deg] | |
| """ | |
| self.speed = speed | |
| # -- 位置速度計算 -------------------------------------- | |
| # (ノルム計算がややこしい. 速度ノルムは常に1を確保する必要あり) | |
| new_velocity_n = np.dot(self.rotate(deg), self.velocity_n.T) | |
| self.velocity = new_velocity_n * self.speed | |
| self.velocity_n = new_velocity_n.copy() | |
| self.position = self.position + self.velocity * self.deltaT | |
| # -- debug.log ----------------------------------------- | |
| s = 'pos:[%0.1f, %0.1f], vel:[%0.1f, %0.1f], vel:%d' % ( | |
| self.position[0], self.position[1], self.velocity[0], self.velocity[1], self.speed) | |
| # debuglog(s) | |
| # -- センサ情報の計算 ---------------------------------- | |
| self.calc_sencer() | |
| def calc_sencer(self): | |
| """ | |
| センサのヴェクトルを更新 | |
| フロントセンサは速度ベクトルと同じ向き | |
| 大きさは, SENCER_LENGTH倍 | |
| :return: | |
| """ | |
| self.sencerF = self.velocity_n.copy() * self.sencer_length | |
| self.sencerL = np.dot(self.rotate(self.sencerL_angle_deg), self.sencerF.T) | |
| self.sencerR = np.dot(self.rotate(self.sencerR_angle_deg), self.sencerF.T) | |
| # -- debug.log ----------------------------------------- | |
| s = 'sencerF:[%0.1f, %0.1f], sencerL:[%0.1f, %0.1f], sencerR:[%0.1f, %0.1f]' % ( | |
| self.sencerF[0], self.sencerF[1], self.sencerL[0], self.sencerL[1], self.sencerR[0], self.sencerR[1],) | |
| # debuglog(s) | |
| def rotate(self, deg): | |
| """ | |
| 回転行列 | |
| :param theta: 回転角度 [deg] | |
| """ | |
| rad = np.deg2rad(deg) | |
| return np.array([[np.cos(rad), -np.sin(rad)], | |
| [np.sin(rad), np.cos(rad)]]) | |
| def set_pos(self, pos): | |
| """ | |
| 車の位置を再度セット | |
| """ | |
| self.position = pos.copy() | |
| # -- debug.log ----------------------------------------- | |
| s = 'reset>> pos:[%0.1f, %0.1f], vel:[%0.1f, %0.1f], vel:%d' % ( | |
| self.position[0], self.position[1], self.velocity[0], self.velocity[1], self.speed) | |
| # debuglog(s) | |
| def vectorNormalize(self, _vec): | |
| vec = np.asarray(_vec) | |
| return vec / np.linalg.norm(vec) | |
| class Field(object): | |
| """ | |
| Fieldクラスは, 地形情報を持つ.主の役割は与えられた位置情報から | |
| 衝突判定を行うのみである. | |
| """ | |
| pass | |
| pass | |
| class CircleRace(object): | |
| """サークルレースマネージャー | |
| 車の状態更新、フィールド情報、得点等をコントロール | |
| CircleRaceクラスはFieldクラスとCarクラスを保持している。 | |
| 主な役割は, CarとFieldの相対位置関係からセンサの値を | |
| 計算し, Carクラスへ渡す役割を持つ。 | |
| works: 1. Carの位置を取得 2. Car位置の更新 | |
| 3. 衝突判定 4. Car位置の修正 5. センサ値の算出 | |
| 汎用性: 地形の変化はFiledクラスが持つ。 | |
| CircleRaceクラスは, Carの位置情報をFieldクラスへ渡し | |
| 衝突判定結果をもとに, センサ位置を更新するのみである。 | |
| ※ 複雑地形の衝突判定はFieldクラスが持つ | |
| """ | |
| STEER_ANGLES = [10, 20, 30, 40, 90] | |
| REWARD_CRASH = -1.0 | |
| SENCER_RESOLUTION = 10 # センサ解像度(何分割するか) | |
| def __init__(self, area_width, area_height): | |
| # -- 領域情報 --- ----------------------------- | |
| self.area_width = area_width | |
| self.area_height = area_height | |
| self.xlim = [0, self.area_width] | |
| self.ylim = [0, self.area_height] | |
| # -- オブジェクト ----------------------------- | |
| initial_pos = [self.area_width / 2., self.area_height / 2.] | |
| self.car = Car(iniPos=initial_pos, iniVelocity=[1, 1], sencer_angle=30) | |
| self.update_game(steer='forward', steer_level=0, speed=0) | |
| def update_game(self, steer='forward', steer_level=0, speed=1): | |
| """ | |
| ゲーム更新 | |
| :param steer_angle: 車の回転角度 [deg] | |
| :param steer_level: 車の回転角度のレベル | |
| """ | |
| # -- t時刻の位置・速度を保管 ------------ | |
| old_pos = self.car.position.copy() | |
| old_vel = self.car.velocity.copy() | |
| # -- 角度の計算(levelからdegreeに変更) -- | |
| if steer is 'left': | |
| steer_angle = self.STEER_ANGLES[steer_level] | |
| elif steer is 'right': | |
| steer_angle = (-1.) * self.STEER_ANGLES[steer_level] | |
| else: | |
| steer_angle = 0 | |
| # -- 車の状態更新 ------------------------ | |
| self.car.update(deg=steer_angle, speed=speed) | |
| new_pos = self.car.position.copy() | |
| new_vel = self.car.velocity.copy() | |
| # -- 衝突判定 ---------------------------- | |
| if self.isCrash(pos=new_pos): | |
| # -- 位置のみ前時刻に戻す -------- | |
| self.car.set_pos(old_pos.copy()) | |
| # -- はねかえす ------------------ | |
| self.car.set_pos(old_pos - (1. / 2.) * old_vel * self.car.deltaT) | |
| # -- 報酬の支払い ---------------- | |
| self.car.reward = self.REWARD_CRASH | |
| # debuglog(s='reward:%0.1f' % self.car.reward) | |
| # -- センサ値の更新 ---------------------- | |
| # TODO: センサーは配列にすべき | |
| self.car.sencerF_val = self.calc_sencer_value(self.car.sencerF, self.car.position) | |
| self.car.sencerR_val = self.calc_sencer_value(self.car.sencerR, self.car.position) | |
| self.car.sencerL_val = self.calc_sencer_value(self.car.sencerL, self.car.position) | |
| def calc_sencer_value(self, sencer_vec, car_pos): | |
| """ | |
| センサ値の計算 | |
| :param sencer: (ndarry) センサの方向ベクトル | |
| :return: センサ値 {0~1}={1:reso}/reso | |
| """ | |
| sencer_value = 0 | |
| # -- センサ値の衝突判定 ------------------ | |
| # for i in range(self.SENCER_RESOLUTION, -1, -1): | |
| # ratio = i / self.SENCER_RESOLUTION | |
| # pos = (sencer_vec * ratio) + car_pos | |
| # | |
| # if not self.isCrash(pos): | |
| # sencer_value = i | |
| # break | |
| # -- センサ値の衝突判定 ------------------ | |
| # 計算負荷がかかるが、試作のため、自車位置から検索 | |
| # TODO: ソート方式に変更 | |
| for i in range(self.SENCER_RESOLUTION): | |
| ratio = float(i) / self.SENCER_RESOLUTION | |
| sencer_pos = (sencer_vec * ratio) + car_pos | |
| if self.isCrash(sencer_pos): | |
| return i | |
| return sencer_value | |
| def isCrash(self, pos): | |
| """ | |
| 車が壁に衝突したか判定 | |
| :param pos: (ndarry) 位置 | |
| :return: (bool) 衝突:true, 以外:false | |
| """ | |
| x, y = [0, 1] | |
| xlim_min, xlim_max = self.xlim | |
| ylim_min, ylim_max = self.ylim | |
| if pos[x] < xlim_min or xlim_max < pos[x]: | |
| return True | |
| if pos[y] < ylim_min or ylim_max < pos[y]: | |
| return True | |
| return False | |
| pass | |
| ACTION = ['left', 'forward', 'right'] | |
| STERR_LEVEL = 1 | |
| SPEED = 1 | |
| class CircleRaceOfQlean(CircleRace): | |
| def get_state(self): | |
| """ | |
| Q学習用に環境値を返す関数 | |
| :return: | |
| """ | |
| sencers = [] | |
| sencers.append(self.car.sencerL_val) | |
| sencers.append(self.car.sencerF_val) | |
| sencers.append(self.car.sencerR_val) | |
| return np.asarray(sencers) | |
| def get_next_state(self, action): | |
| """ | |
| Q学習用。与えられたアクションから状態を更新 | |
| :param action: | |
| :return: | |
| """ | |
| self.update_game(steer=ACTION[action], steer_level=STERR_LEVEL, speed=SPEED) | |
| r = self.car.reward | |
| return self.get_state(), r | |
| def encodeStateToO(self, S): | |
| o = np.asarray(S).copy() | |
| return o | |
| class Agent(object): | |
| """ ゲームルールによらない汎用性を持たす | |
| action: パターンの数だけ保持 | |
| 学習アルゴリズム: Q学習 | |
| a = getNextAction(s) | |
| lean(S,a,r,S_next) | |
| """ | |
| ALPHA = 0.9 | |
| GAMMA = 0.9 | |
| DATASET_NUMBER = 1000 | |
| LEAN_EPOCHS = 100 | |
| LEAN_RATE = 0.2 | |
| GREEDY_RATIO = 0.2 | |
| def __init__(self, _numInput=3, numAction=3): | |
| self.action_paturn = range(numAction) | |
| self.learningObj = MultiLayerPerceptron(numInput=_numInput, numHidden=5, numOutput=numAction, activate1="tanh", | |
| activate2="sigmoid") | |
| self.X = [] | |
| self.Y = [] | |
| self.learnFlg = True | |
| def learn(self, o, a, r, o_next): | |
| """Q学習 or NeuralNetworkを使って,Q値を学習""" | |
| dQs = self.learningObj.predict(o) | |
| qk = dQs[a] | |
| maxQ = np.max(dQs) | |
| dQs[a] = qk + self.ALPHA * (r + self.GAMMA * maxQ - qk) | |
| self.X.append(np.asarray(o)) | |
| self.Y.append(np.asarray(dQs)) | |
| if len(self.X) > self.DATASET_NUMBER: | |
| self.X.pop(0) | |
| self.Y.pop(0) | |
| err = self.learningObj.fit(np.asarray(self.X), np.asarray(self.Y), learning_rate=self.LEAN_RATE, | |
| epochs=self.LEAN_EPOCHS) | |
| return err | |
| def getNextAction(self, o): | |
| a = None | |
| # 最大Q値の行動選択 | |
| # 観測(observe)から、NNでQ値(配列)を取得 | |
| Q_t = self.learningObj.predict(o) | |
| maxQt_idx = np.argmax(Q_t) | |
| best_actions = maxQt_idx | |
| try: | |
| a = np.random.choice(best_actions) | |
| except: | |
| print 'Q_t', Q_t | |
| print 'best_actions', best_actions | |
| print 'ERROR' | |
| a = best_actions | |
| print 'a', a | |
| if self.learnFlg: | |
| return a | |
| # greedyの行動選択 | |
| if GREEDY_RATIO < random.random(): | |
| return a | |
| else: | |
| return np.random.choice(range(self.action_paturn)) | |
| def mainAlgorism(): | |
| """ QNeuralNetwork | |
| + state<-CircleRace | |
| + agent<-QNeuralNetwork | |
| + iterate_step = {0} | |
| ----------------------------- | |
| : mainloop() | |
| - {s0} = state.get_init_state() | |
| - {a} = agent.get_next_action(s0) | |
| - {s1,R} = state.get_next_state(a) | |
| - {} = agent.learn(s0,a,R,s1} | |
| - iterate_step += 1 | |
| """ | |
| NN_ALPHA = 0.9 | |
| NN_GAMMA = 0.9 | |
| state = CircleRaceOfQlean(area_width=300, area_height=500) | |
| agent = Agent(numInput=3, numHidden=5, numOutput=3, activate1="tanh", activate2="sigmoid") | |
| class mainAlgorism(): | |
| def __init__(self): | |
| self.agent = Agent(_numInput=3,numAction=3) | |
| self.state = CircleRaceOfQlean(area_width=300, area_height=500) | |
| self.lean_flg = True | |
| # self.agent.setLearnFlg(self.learn_flg) | |
| for i in range(1999): | |
| self.update() | |
| def update(self): | |
| # 初期設定 | |
| # ----------------------------- | |
| S = self.state.get_state() | |
| o = self.state.encodeStateToO(S) | |
| # 1. エージェントは環境から受け取った観測Sを受け取り、 | |
| # 方策planに基づいて環境に行動aを渡す | |
| # ----------------------------- | |
| a = self.agent.getNextAction(np.copy(o)) | |
| # 2. 環境StateはエージェントAgentから受け取った行動aと、 | |
| # 現在の状態Sにもとづいて、次の状態S'を返却 | |
| # ----------------------------- | |
| S_next, r = self.state.get_next_state(a) | |
| o_next = self.state.encodeStateToO(np.copy(S_next)) | |
| # 3. Agentに学習させる | |
| # ----------------------------- | |
| err = self.agent.learn(np.copy(o), a, r, np.copy(o_next)) | |
| print 'err', err, a, r | |
| def mainGUI(): | |
| app = QApplication(sys.argv) | |
| win = CircleRaceWindow() | |
| win.show() | |
| sys.exit(app.exec_()) | |
| if __name__ == "__main__": | |
| # print(CircleRace.__doc__) | |
| # mainGUI() | |
| mainAlgorism() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! coding:utf-8 | |
| """ | |
| MultiLayerPerceptron.py | |
| ## Description | |
| 多層パーセプトロンの高速化 | |
| http://aidiary.hatenablog.com/entry/20140201/1391218771 | |
| 入力層 - 隠れ層 - 出力層の3層構造で固定(PRMLではこれを2層と呼んでいる) | |
| 隠れ層の活性化関数にはtanh関数またはsigmoid logistic関数が使える | |
| 出力層の活性化関数にはtanh関数、sigmoid logistic関数、恒等関数が使える | |
| ## コメント | |
| 前回、3層以上の多層パーセプトロンを実装しようとしたが失敗。 | |
| 隠れ層を含んだ、各レイヤーをnumpy配列として、 | |
| 一つのリストに格納していたが、どうも型がおかしくなって上手くいかない。 | |
| おそらく、各レイヤーはオブジェクトとして実装したほうがよい。 | |
| 今回は、基本に戻り3層パーセプトロンを実装する。 | |
| 参考コードは関数化されており、拡張性を秘めているので利用する。 | |
| ただし、拡張性については、拡張するのはいいが、 | |
| いずれは機械学習ライブラリを利用していくほうがよい。 | |
| 現状は3層パーセプトロンを実装し、QLearningとの統合を目指す。 | |
| Created by fifi (2015/11/24 5:54) | |
| """ | |
| __version__ = '1.1' | |
| import numpy as np | |
| import sys | |
| # numpy配列をプリントした場合の表示桁数 | |
| np.set_printoptions(precision=3) | |
| def ndprint(a, format_string='{0:.2f}'): | |
| """ | |
| ndarrayをprintする関数 | |
| :example: ndprint(x) | |
| """ | |
| return [format_string.format(v, i) for i, v in enumerate(a)] | |
| def ndprints(s, a, format_string='{0:.2f}'): | |
| """ | |
| ndarrayをprintする関数 | |
| :example: ndprint(x) | |
| """ | |
| print s, [format_string.format(v, i) for i, v in enumerate(a)] | |
| ############################################ | |
| # | |
| # サブ関数群 | |
| # | |
| ############################################ | |
| def tanh(x): | |
| """ | |
| tanh(x) | |
| :param x: | |
| """ | |
| return np.tanh(x) | |
| def tanh_deriv(x): | |
| """ | |
| tanhの微分. tanh'(x) | |
| tanh'(x)=1 - tanh(x)**2 | |
| :param x: tanh(x) | |
| """ | |
| return 1.0 - x ** 2 | |
| def sigmoid(x): | |
| """ | |
| シグモイド関数 | |
| """ | |
| return 1. / (1. + np.exp(-x)) | |
| def sigmoid_deriv(x): | |
| """ | |
| シグモイド関数の導関数. | |
| sigmoid'(x) = sigmoid(x) * (1-sigmoid(x)) | |
| :param x: sigmoid(x) | |
| """ | |
| return x * (1 - x) | |
| def identity(x): | |
| """ | |
| 線形関数. | |
| """ | |
| return x | |
| def identity_deriv(x): | |
| """ | |
| 線形関数の微分 | |
| :param x: | |
| """ | |
| return 1 | |
| ############################################ | |
| # | |
| # 多層パーセプトロンクラス | |
| # | |
| ############################################ | |
| class MultiLayerPerceptron: | |
| def __init__(self, numInput=3, numHidden=5, numOutput=1, activate1="tanh", activate2="sigmoid"): | |
| """ | |
| 多層パーセプトロンを初期化 | |
| :param numInput: 入力層のユニット数(バイアスユニットは除く) | |
| :param numHidden: 隠れ層のユニット数(バイアスユニットは除く) | |
| :param numOutput: 出力層のユニット数 | |
| :param act1: 隠れ層の活性化関数 ( tanh or sigmoid ) | |
| :param act2: 出力層の活性化関数 ( tanh or sigmoid or identity ) | |
| """ | |
| # 引数の指定に合わせて隠れ層の活性化関数とその微分関数を設定 | |
| # ---------------------------------------------------------- | |
| if activate1 == "tanh": | |
| self.act1 = tanh | |
| self.act1_deriv = tanh_deriv | |
| elif activate1 == "sigmoid": | |
| self.act1 = sigmoid | |
| self.act1_deriv = sigmoid_deriv | |
| else: | |
| print "ERROR: act1 is tanh or sigmoid" | |
| sys.exit() | |
| print ">> Hidden Layer Activation Function is : %s" % activate1 | |
| # 引数の指定に合わせて、出力層の活性化関数とその微分関数を設定 | |
| # ---------------------------------------------------------- | |
| if activate2 == "tanh": | |
| self.act2 = tanh | |
| self.act2_deriv = tanh_deriv | |
| elif activate2 == "sigmoid": | |
| self.act2 = sigmoid | |
| self.act2_deriv = sigmoid_deriv | |
| elif activate2 == "identity": | |
| self.act2 = identity | |
| self.act2_deriv = identity_deriv | |
| else: | |
| print "ERROR: act2 is tanh or sigmoid or identity" | |
| sys.exit() | |
| print ">> Outpu Layer Activation Function is : %s" % activate2 | |
| # 各レイヤのユニット数を格納(バイアスユニットがあるので入力層と隠れ層は+1) | |
| # ------------------------------------------------------------------------ | |
| self.numInput = numInput + 1 | |
| self.numHidden = numHidden + 1 | |
| self.numOutput = numOutput | |
| # 重みを(-1.0, 1.0)の一様乱数で初期化 | |
| # ------------------------------------ | |
| # -- 入力層 - 隠れ層 | |
| self.weight1 = np.random.uniform( | |
| -1.0, | |
| 1.0, | |
| (self.numHidden, self.numInput) | |
| ) | |
| # -- 隠れ層 - 出力層 | |
| self.weight2 = np.random.uniform( | |
| -1.0, | |
| 1.0, | |
| (self.numOutput, self.numHidden) | |
| ) | |
| # print self.weight1 | |
| # print self.weight2 | |
| def fit(self, X, t, learning_rate=0.2, epochs=10000): | |
| """ | |
| 訓練データを用いてネットワークの重みを更新する | |
| :param X: 入力データ | |
| :param t: 教師データ | |
| :param learning_rate: 学習率 | |
| :param epochs: 更新回数 | |
| """ | |
| # バイアスユニットを追加 | |
| X = np.hstack([np.ones([X.shape[0], 1]), X]) | |
| # 教師データ | |
| t = np.array(t) | |
| # 誤差 | |
| err = 0 | |
| # 逐次学習 | |
| # (訓練データからランダムサンプリングして重みを更新。epochs回繰り返す) | |
| # --------------------------------------------------------------------- | |
| for k in range(epochs): | |
| if k % 1000 == 0: | |
| # print '>Epochs:%d' % k | |
| pass | |
| # 訓練データからランダムにサンプルを選択 | |
| # -------------------------------------- | |
| i = np.random.randint(X.shape[0]) | |
| x = X[i] | |
| # 順伝播 | |
| # ====== | |
| # 入力を順伝播させて中間層の出力を計算 | |
| # ------------------------------------- | |
| z = self.act1(np.dot(self.weight1, x)) | |
| # 中間層の出力を順伝播させて出力層の出力を計算 | |
| # -------------------------------------------- | |
| y = self.act2(np.dot(self.weight2, z)) | |
| # 出力層の誤差を計算 | |
| # ================== | |
| # ** WARNING ** | |
| # PRMLによると出力層の活性化関数にどれを用いても | |
| # (y - t[i])でよいと書いてあるが | |
| # 下のように出力層の活性化関数の微分もかけたほうが精度がずっと良くなる | |
| # 教師データと出力ユニットとの誤差を計算 | |
| # --------------------------------------- | |
| delta2 = self.act2_deriv(y) * (y - t[i]) | |
| # 出力層の誤差を逆伝播させて隠れ層の誤差を計算 | |
| # ------------------------------------------- | |
| delta1 = self.act1_deriv(z) * np.dot(self.weight2.T, delta2) | |
| # 重みの更新 | |
| # ========== | |
| # (行列演算になるので2次元ベクトルに変換する必要がある) | |
| x = np.atleast_2d(x) | |
| delta1 = np.atleast_2d(delta1) | |
| # 隠れ層の誤差を用いて隠れ層の重みを更新 | |
| # ----------------------------------------------------- | |
| # self.weight1 = self.weight1 - learning_rate * np.dot(delta1.T, x) | |
| self.weight1 -= learning_rate * np.dot(delta1.T, x) | |
| # (行列演算になるので2次元ベクトルに変換する必要がある) | |
| z = np.atleast_2d(z) | |
| delta2 = np.atleast_2d(delta2) | |
| # 出力層の誤差を用いて、出力層の重みを更新 | |
| # ----------------------------------------- | |
| # self.weight2 = self.weight2 - learning_rate * np.dot(delta2.T, z) | |
| self.weight2 -= learning_rate * np.dot(delta2.T, z) | |
| err += delta2 | |
| return np.sum(err) | |
| def predict(self, x0): | |
| """ | |
| テストデータの出力を予測し、教師と比較 | |
| :param x: | |
| """ | |
| x = np.asarray(x0) | |
| # バイアスの1を追加 | |
| x = np.insert(x, 0, 1) | |
| # 順伝播によりネットワークの出力を計算 | |
| z = self.act1(np.dot(self.weight1, x)) | |
| y = self.act2(np.dot(self.weight2, z)) | |
| return y | |
| if __name__ == '__main__': | |
| mpl = MultiLayerPerceptron(numInput=2, numHidden=5, numOutput=1, activate1="tanh", activate2="sigmoid") | |
| X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]]) | |
| y = np.array([0, 1, 1, 0]) | |
| mpl.fit(X, y) | |
| for x, y in zip(X, y): | |
| print 'X:%s, y:%0.2f, pred:%0.2f' % (ndprint(x), y, mpl.predict(x)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment