Skip to content

Instantly share code, notes, and snippets.

@peace098beat
Created November 23, 2015 22:18
Show Gist options
  • Save peace098beat/2647bb79b0f52fa108fe to your computer and use it in GitHub Desktop.
Save peace098beat/2647bb79b0f52fa108fe to your computer and use it in GitHub Desktop.
[機械学習] DQN 車のレース. GUI付き。 ver0.1
#! coding:utf-8
"""
circle-race-dqn.py
箱の中で壁にぶつからないように動くAI
==============================
= CircleRaceWindow(QMainWindow)
==============================
+ state<-CircleRace
+ agent<-QNeuralNetwork
+ iterate_step = {0}
-----------------------------
: mainloop()
- {s0} = state.get_init_state()
- {a} = agent.get_next_action(s0)
- {s1,R} = state.get_next_state(a)
- {} = agent.learn(s0,a,R,s1}
- iterate_step += 1
: paintEvent()
- _drawField()
[CircleRaceに特化した処理]
- _drawCar()
[CircleRaceに特化した処理]
-----------------------------
: _drawField()
- {paint field}
: _drawCar
- car_status = state.car.get_status
- {paint car status}
-----------------------------
================================
= CircleRace{State}
================================
+ field <- Field()
+ car <- Car()
-----------------------------
: get_init_state()
[ +Car()の初期位置・状態を返却]
: get_next_state(action)
[ +Car()の位置・状態を更新]
-----------------------------
memo:
CircleRaceクラスはFieldクラスとCarクラスを保持している。
主な役割は, CarとFieldの相対位置関係からセンサの値を
計算し, Carクラスへ渡す役割を持つ。
ex. 1. Carの位置を取得 2. Car位置の更新
3. 衝突判定 4. Car位置の修正 5. センサ値の算出
汎用性: 地形の変化はFiledクラスが持つ。
CircleRaceクラスは, Carの位置情報をFieldクラスへ渡し
衝突判定結果をもとに, センサ位置を更新するのみである。
※ 複雑地形の衝突判定はFieldクラスが持つ
================================
= QNeuralNetwork{Agent}
================================
+ NN <- NewralNetwork()
-----------------------------
: {s} = get_next_action(state=s)
: {error} = lean(s0,a,R,s1)
-----------------------------
memo:
QNeuralNetworkクラスは, 入力(センサ)データから, Q値を予測し
次の行動を算出するクラスである.
================================
= Field
================================
+ width, height
+ geometory
-----------------------------
: isCrush(pos=[x,y])
-----------------------------
Fieldクラスは, 地形情報を持つ.主の役割は与えられた位置情報から
衝突判定を行うのみである.
(2015/11/24) 初期コードを生成
Created by fifi (2015/11/21 9:08)
"""
__version__ = '0.1'
# モジュールのインポート
import os
import sys
from datetime import datetime
import csv
from PySide.QtGui import *
from PySide.QtCore import *
import numpy as np
from MultiLayerPerceptron import MultiLayerPerceptron
def resetlog():
"""resetlog()
ログファイルを削除
"""
logfile = '.log.csv'
if os.path.exists(logfile):
os.remove('.log.csv')
resetlog()
def debuglog(s=None, data=None):
"""debuglog()
ログファイルの保存
"""
d = datetime.now().isoformat()
with open('.log.csv', 'a') as f:
writer = csv.writer(f, lineterminator='\n')
if not s is None:
writer.writerow([d, s])
print d, s
if not data is None:
writer.writerow([d, data])
print d, data
return
class CircleRaceWindow(QWidget):
# -- 定数(画面サイズ) -----------------
SCREEN_HEIGHT = 500
SCREEN_WIDTH = 700
MARGIN_HEIGHT = 50
MARGIN_WIDTH = 50
STAGE_HEIGHT = SCREEN_HEIGHT - 2 * MARGIN_HEIGHT
STAGE_WIDTH = SCREEN_WIDTH - 2 * MARGIN_WIDTH
# -- 定数(タイマー) -------------------
INTERVAL_TIME = 1
# -- 定数(その他) ---------------------
SIZE_CAR = 5
def __init__(self, parent=None):
QWidget.__init__(self, parent)
self.resize(self.SCREEN_WIDTH, self.SCREEN_HEIGHT)
# -- 画面バッファ --------------------
self.pixmap = QPixmap(self.size())
# -- 解析用オブジェクト --------------
self.game = CircleRace(area_width=self.STAGE_WIDTH, area_height=self.STAGE_HEIGHT)
# -- 操作用変数 ----------------------
self.car_steer = 'right'
self.car_speed = 1
self.car_steer_level = 1
self.user_car_steer = None
# -- 初期画面の準備 ------------------
# 画面バッファの初期化
self.refreshPixmap()
# グリッドの表示
painter = QPainter(self.pixmap)
self.drawGrid(painter)
# メインループの準備と開始
# -------------------------
if True:
self.timer = QTimer()
self.timer.timeout.connect(self.mainloop)
self.timer.start(self.INTERVAL_TIME)
# self.timer.setInterval(self.INTERVAL_TIME)
# self.timer.start()
# ************************************************************* #
# メインループ
# ************************************************************* #
def mainloop(self):
"""
アニメーションのメインループ
アルゴリズムの時間更新等はここで行う
"""
# -- アルゴリズム処理-----------
# 定常円
# self.car_steer = 'right'
# -- アルゴリズム処理-----------
sencerL = self.game.car.sencerL_val
sencerR = self.game.car.sencerR_val
if sencerL < sencerR:
self.car_steer = 'left'
elif sencerL > sencerR:
self.car_steer = 'right'
else:
self.car_steer = ''
if not self.user_car_steer is None:
self.car_steer = self.user_car_steer
self.game.update_game(steer=self.car_steer, steer_level=1, speed=self.car_speed)
self.car_steer = None
self.user_car_steer = None
# -- 描画準備 ------------------
painter = QPainter(self.pixmap)
self.drawGrid(painter)
self.drawGeoPoints(painter)
# 画面更新
self.update()
def paintEvent(self, *args, **kwargs):
# -- おまじない ---------------------------
# QPainterを生成
painter = QStylePainter(self)
# QPainterでバッファに準備したデータを描画
painter.drawPixmap(0, 0, self.pixmap)
painter.setRenderHint(QPainter.Antialiasing, True)
_painter = painter.style()
# 描画用QPenのデフォルト
pen_default = QPen()
pen_default.setColor(Qt.black)
pen_default.setWidth(2)
brush_default = QBrush(Qt.red, Qt.NoBrush)
# -- アクセス用定数 -----------------------
x, y = 0, 1
# -- 車両の位置情報を取得 -----------------
_pos = self.game.car.position.copy()
_vel = self.game.car.velocity.copy()
pos = self.locateXY(_pos)
vel = self.locateVec(_vel)
vel_n = vel / np.linalg.norm(vel)
# -- 車両位置のプロット -------------------
painter.drawPoint(pos[x], pos[y])
# -- 車両位置に 円を描画 -----------------------------
pen = QPen(Qt.black, 1, Qt.DotLine, Qt.RoundCap, Qt.RoundJoin)
painter.setPen(pen)
reward = self.game.car.reward
if reward < 0:
painter.setBrush(QBrush(QColor(255, 0, 0, 175)))
painter.setBrush(QBrush(QColor(0, 255, 0, 175)))
painter.drawEllipse(pos[x] - self.SIZE_CAR / 2, pos[y] - self.SIZE_CAR / 2, self.SIZE_CAR, self.SIZE_CAR)
# -- 車両位置に四角を描画 -----------------------------
pen = QPen(Qt.black, 0.8, Qt.SolidLine, Qt.RoundCap, Qt.RoundJoin)
painter.setPen(pen)
def rotate(deg):
"""
回転行列
:param theta: 回転角度 [deg]
"""
rad = np.deg2rad(deg)
return np.array([[np.cos(rad), -np.sin(rad)],
[np.sin(rad), np.cos(rad)]])
car_size = self.SIZE_CAR
ny = vel_n.copy() * car_size
nx = np.dot(rotate(90), vel_n.copy()) * car_size / 2.0
v1 = nx + ny + pos
v2 = nx - ny + pos
v3 = -nx - ny + pos
v4 = -nx + ny + pos
v = [vary.tolist() for vary in [v1, v2, v3, v4]]
qpoints = [QPoint(*lv) for lv in v]
polygon = QPolygon(qpoints)
painter.drawPolygon(polygon)
# -- 進行方向(速度ベクトル)を描画 ---------
pen = QPen(Qt.gray, 1, Qt.SolidLine, Qt.RoundCap, Qt.RoundJoin)
painter.setPen(pen)
painter.drawLine(pos[x], pos[y], (pos[x] + vel[x]), (pos[y] + vel[y]))
# -- センサを表示 -------------------------
sencerF = self.locateVec(self.game.car.sencerF) + pos
sencerL = self.locateVec(self.game.car.sencerL) + pos
sencerR = self.locateVec(self.game.car.sencerR) + pos
sencer_linewide = 0.6
painter.setPen(QPen(Qt.red, sencer_linewide))
painter.drawLine(pos[x], pos[y], sencerF[x], sencerF[y])
painter.setPen(QPen(Qt.blue, sencer_linewide))
painter.drawLine(pos[x], pos[y], sencerL[x], sencerL[y])
painter.setPen(QPen(Qt.green, sencer_linewide))
painter.drawLine(pos[x], pos[y], sencerR[x], sencerR[y])
# -- センサグラフを表示 ---------------------
sencerF_val = self.game.car.sencerF_val
sencerR_val = self.game.car.sencerR_val
sencerL_val = self.game.car.sencerL_val
# 棒グラフのサイズ
sencer_graph_width = 20
sencer_graph_height = 60
# Rectの左下位置
DEBUG_MARGIN_WIDTH = 30
DEBUG_MARGIN_HEIGHT = 30
pL = self.locateXY([DEBUG_MARGIN_WIDTH + 0, DEBUG_MARGIN_HEIGHT + 0]).tolist()
pF = self.locateXY([DEBUG_MARGIN_WIDTH + sencer_graph_width * 2, DEBUG_MARGIN_HEIGHT + 0]).tolist()
pR = self.locateXY([DEBUG_MARGIN_WIDTH + sencer_graph_width * 4, DEBUG_MARGIN_HEIGHT + 0]).tolist()
# センサー(レフト)の棒グラフ
r = QRect(0, 0, sencer_graph_width, sencer_graph_height)
r.setHeight(sencerL_val * 10)
r.moveBottomLeft(QPoint(*pL))
# 描画色の設定
painter.setPen(QPen(Qt.gray, 1, Qt.SolidLine, Qt.RoundCap, Qt.RoundJoin))
painter.setBrush(QBrush(Qt.blue, Qt.Dense4Pattern))
painter.drawRect(r)
# センサー(センター)の棒グラフ
r = QRect(0, 0, sencer_graph_width, sencer_graph_height)
r.setHeight(sencerF_val * 10)
r.moveBottomLeft(QPoint(*pF))
painter.setBrush(QBrush(Qt.red, Qt.Dense4Pattern))
painter.drawRect(r)
# センサー(ライト)の棒グラフ
r = QRect(0, 0, sencer_graph_width, sencer_graph_height)
r.setHeight(sencerR_val * 10)
r.moveBottomLeft(QPoint(*pR))
painter.setBrush(QBrush(Qt.green, Qt.Dense4Pattern))
painter.drawRect(r)
# ************************************************************* #
# 描画系補助関数
# ************************************************************* #
def drawDebugLog(self, painter):
pass
def drawGeoPoints(self, painter):
pass
def drawGrid(self, painter):
""" マップを表示する関数
"""
for x in range(self.STAGE_WIDTH):
for y in range(self.STAGE_HEIGHT):
# painter.drawPoint(x,y)
pass
painter.setPen(QPen(Qt.black, 3))
painter.setBrush(QBrush(QColor(200, 200, 200, 150), Qt.Dense1Pattern))
painter.drawRect(self.MARGIN_WIDTH, self.MARGIN_HEIGHT, self.STAGE_WIDTH, self.STAGE_HEIGHT)
pass
def locateXY(self, pos):
"""
オブジェクトのローカル座標を、スクリーン上の座標へ変換
"""
pos = np.asarray(pos)
local_x, local_y = pos.copy()
srn_x = local_x + self.MARGIN_WIDTH
srn_y = (self.SCREEN_HEIGHT - local_y - self.MARGIN_HEIGHT)
return np.asarray([srn_x, srn_y])
def locateVec(self, vel):
"""
オブジェクトのベクトルを、スクリーン上の座標空間へ変換
(※yを反転させるだけ)
"""
srn_vec = np.array([vel[0], -1 * vel[1]])
return srn_vec
# ************************************************************* #
# その他Qt関連補助関数
# ************************************************************* #
def refreshPixmap(self):
"""
画面バッファの初期化関数
"""
# 画面バッファの初期化
self.pixmap = QPixmap(self.size())
# 画面を塗りつぶし (おまじない)
self.pixmap.fill(self, 0, 0)
self.pixmap.fill(Qt.white)
# ぺインターの生成 (おまじない)
painter = QPainter(self.pixmap)
# ぺインターによる初期化 (おまじない)
painter.initFrom(self)
pass
def sizeHint(self):
return QSize(self.SCREEN_WIDTH, self.SCREEN_HEIGHT)
def keyPressEvent(self, event):
e = event.key()
if e == Qt.Key_Up:
self.car_speed += 1
pass
elif e == Qt.Key_Down:
self.car_speed -= 1
if self.car_speed < 0:
self.car_speed = 0
elif e == Qt.Key_Left:
self.user_car_steer = 'left'
pass
elif e == Qt.Key_Right:
self.user_car_steer = 'right'
pass
elif e == Qt.Key_Plus:
pass
elif e == Qt.Key_Minus:
pass
elif e == Qt.Key_Q:
self.close()
else:
pass
print 'Presskey', e
self.update()
class Car(object):
"""
車オブジェクト
自車の速度・位置の情報を持つ
"""
SENCER_LENGTH = 100.
def __init__(self, iniPos=[0, 0], iniVelocity=[1, 1], sencer_angle=30):
# -- 自車情報 ----------------------
self.position = np.array([0, 0])
# velocityはnorm=1ではない
self.velocity = np.array([0, 0])
self.velocity_n = np.array([0, 0])
self.speed = 1
# -- 初期値代入 ---------------------
if not iniPos is None:
_iniPos = np.asarray(iniPos)
self.position = _iniPos
if not iniVelocity is None:
_iniVelocity = np.asarray(iniVelocity)
self.velocity_n = self.vectorNormalize(_iniVelocity)
self.velocity = self.velocity_n * self.speed
# -- センサの情報-------------------
self.sencerL_angle_deg = sencer_angle
self.sencerR_angle_deg = -1. * self.sencerL_angle_deg
self.sencer_length = self.SENCER_LENGTH
self.sencerF = np.array([0, 0])
self.sencerL = np.array([0, 0])
self.sencerR = np.array([0, 0])
self.sencerF_val = 0.0
self.sencerL_val = 0.0
self.sencerR_val = 0.0
# -- センサの更新 ------------------
self.calc_sencer()
# -- ゲームの情報-------------------
self.reward = 0
# -- その他 ------------------------
self.deltaT = 1
def update(self, deg=0, speed=1):
"""
車の次の位置を予想し更新
(※ 等速運動仮定)
:param theta: 回転角度 [deg]
"""
self.speed = speed
# -- 位置速度計算 --------------------------------------
# (ノルム計算がややこしい. 速度ノルムは常に1を確保する必要あり)
new_velocity_n = np.dot(self.rotate(deg), self.velocity_n.T)
self.velocity = new_velocity_n * self.speed
self.velocity_n = new_velocity_n.copy()
self.position = self.position + self.velocity * self.deltaT
# -- debug.log -----------------------------------------
s = 'pos:[%0.1f, %0.1f], vel:[%0.1f, %0.1f], vel:%d' % (
self.position[0], self.position[1], self.velocity[0], self.velocity[1], self.speed)
# debuglog(s)
# -- センサ情報の計算 ----------------------------------
self.calc_sencer()
def calc_sencer(self):
"""
センサのヴェクトルを更新
フロントセンサは速度ベクトルと同じ向き
大きさは, SENCER_LENGTH倍
:return:
"""
self.sencerF = self.velocity_n.copy() * self.sencer_length
self.sencerL = np.dot(self.rotate(self.sencerL_angle_deg), self.sencerF.T)
self.sencerR = np.dot(self.rotate(self.sencerR_angle_deg), self.sencerF.T)
# -- debug.log -----------------------------------------
s = 'sencerF:[%0.1f, %0.1f], sencerL:[%0.1f, %0.1f], sencerR:[%0.1f, %0.1f]' % (
self.sencerF[0], self.sencerF[1], self.sencerL[0], self.sencerL[1], self.sencerR[0], self.sencerR[1],)
# debuglog(s)
def rotate(self, deg):
"""
回転行列
:param theta: 回転角度 [deg]
"""
rad = np.deg2rad(deg)
return np.array([[np.cos(rad), -np.sin(rad)],
[np.sin(rad), np.cos(rad)]])
def set_pos(self, pos):
"""
車の位置を再度セット
"""
self.position = pos.copy()
# -- debug.log -----------------------------------------
s = 'reset>> pos:[%0.1f, %0.1f], vel:[%0.1f, %0.1f], vel:%d' % (
self.position[0], self.position[1], self.velocity[0], self.velocity[1], self.speed)
# debuglog(s)
def vectorNormalize(self, _vec):
vec = np.asarray(_vec)
return vec / np.linalg.norm(vec)
class Field(object):
"""
Fieldクラスは, 地形情報を持つ.主の役割は与えられた位置情報から
衝突判定を行うのみである.
"""
pass
pass
class CircleRace(object):
"""サークルレースマネージャー
車の状態更新、フィールド情報、得点等をコントロール
CircleRaceクラスはFieldクラスとCarクラスを保持している。
主な役割は, CarとFieldの相対位置関係からセンサの値を
計算し, Carクラスへ渡す役割を持つ。
works: 1. Carの位置を取得 2. Car位置の更新
3. 衝突判定 4. Car位置の修正 5. センサ値の算出
汎用性: 地形の変化はFiledクラスが持つ。
CircleRaceクラスは, Carの位置情報をFieldクラスへ渡し
衝突判定結果をもとに, センサ位置を更新するのみである。
※ 複雑地形の衝突判定はFieldクラスが持つ
"""
STEER_ANGLES = [10, 20, 30, 40, 90]
REWARD_CRASH = -1.0
SENCER_RESOLUTION = 10 # センサ解像度(何分割するか)
def __init__(self, area_width, area_height):
# -- 領域情報 --- -----------------------------
self.area_width = area_width
self.area_height = area_height
self.xlim = [0, self.area_width]
self.ylim = [0, self.area_height]
# -- オブジェクト -----------------------------
initial_pos = [self.area_width / 2., self.area_height / 2.]
self.car = Car(iniPos=initial_pos, iniVelocity=[1, 1], sencer_angle=30)
self.update_game(steer='forward', steer_level=0, speed=0)
def update_game(self, steer='forward', steer_level=0, speed=1):
"""
ゲーム更新
:param steer_angle: 車の回転角度 [deg]
:param steer_level: 車の回転角度のレベル
"""
# -- t時刻の位置・速度を保管 ------------
old_pos = self.car.position.copy()
old_vel = self.car.velocity.copy()
# -- 角度の計算(levelからdegreeに変更) --
if steer is 'left':
steer_angle = self.STEER_ANGLES[steer_level]
elif steer is 'right':
steer_angle = (-1.) * self.STEER_ANGLES[steer_level]
else:
steer_angle = 0
# -- 車の状態更新 ------------------------
self.car.update(deg=steer_angle, speed=speed)
new_pos = self.car.position.copy()
new_vel = self.car.velocity.copy()
# -- 衝突判定 ----------------------------
if self.isCrash(pos=new_pos):
# -- 位置のみ前時刻に戻す --------
self.car.set_pos(old_pos.copy())
# -- はねかえす ------------------
self.car.set_pos(old_pos - (1. / 2.) * old_vel * self.car.deltaT)
# -- 報酬の支払い ----------------
self.car.reward = self.REWARD_CRASH
# debuglog(s='reward:%0.1f' % self.car.reward)
# -- センサ値の更新 ----------------------
# TODO: センサーは配列にすべき
self.car.sencerF_val = self.calc_sencer_value(self.car.sencerF, self.car.position)
self.car.sencerR_val = self.calc_sencer_value(self.car.sencerR, self.car.position)
self.car.sencerL_val = self.calc_sencer_value(self.car.sencerL, self.car.position)
def calc_sencer_value(self, sencer_vec, car_pos):
"""
センサ値の計算
:param sencer: (ndarry) センサの方向ベクトル
:return: センサ値 {0~1}={1:reso}/reso
"""
sencer_value = 0
# -- センサ値の衝突判定 ------------------
# for i in range(self.SENCER_RESOLUTION, -1, -1):
# ratio = i / self.SENCER_RESOLUTION
# pos = (sencer_vec * ratio) + car_pos
#
# if not self.isCrash(pos):
# sencer_value = i
# break
# -- センサ値の衝突判定 ------------------
# 計算負荷がかかるが、試作のため、自車位置から検索
# TODO: ソート方式に変更
for i in range(self.SENCER_RESOLUTION):
ratio = float(i) / self.SENCER_RESOLUTION
sencer_pos = (sencer_vec * ratio) + car_pos
if self.isCrash(sencer_pos):
return i
return sencer_value
def isCrash(self, pos):
"""
車が壁に衝突したか判定
:param pos: (ndarry) 位置
:return: (bool) 衝突:true, 以外:false
"""
x, y = [0, 1]
xlim_min, xlim_max = self.xlim
ylim_min, ylim_max = self.ylim
if pos[x] < xlim_min or xlim_max < pos[x]:
return True
if pos[y] < ylim_min or ylim_max < pos[y]:
return True
return False
pass
ACTION = ['left', 'forward', 'right']
STERR_LEVEL = 1
SPEED = 1
class CircleRaceOfQlean(CircleRace):
def get_state(self):
"""
Q学習用に環境値を返す関数
:return:
"""
sencers = []
sencers.append(self.car.sencerL_val)
sencers.append(self.car.sencerF_val)
sencers.append(self.car.sencerR_val)
return np.asarray(sencers)
def get_next_state(self, action):
"""
Q学習用。与えられたアクションから状態を更新
:param action:
:return:
"""
self.update_game(steer=ACTION[action], steer_level=STERR_LEVEL, speed=SPEED)
r = self.car.reward
return self.get_state(), r
def encodeStateToO(self, S):
o = np.asarray(S).copy()
return o
class Agent(object):
""" ゲームルールによらない汎用性を持たす
action: パターンの数だけ保持
学習アルゴリズム: Q学習
a = getNextAction(s)
lean(S,a,r,S_next)
"""
ALPHA = 0.9
GAMMA = 0.9
DATASET_NUMBER = 1000
LEAN_EPOCHS = 100
LEAN_RATE = 0.2
GREEDY_RATIO = 0.2
def __init__(self, _numInput=3, numAction=3):
self.action_paturn = range(numAction)
self.learningObj = MultiLayerPerceptron(numInput=_numInput, numHidden=5, numOutput=numAction, activate1="tanh",
activate2="sigmoid")
self.X = []
self.Y = []
self.learnFlg = True
def learn(self, o, a, r, o_next):
"""Q学習 or NeuralNetworkを使って,Q値を学習"""
dQs = self.learningObj.predict(o)
qk = dQs[a]
maxQ = np.max(dQs)
dQs[a] = qk + self.ALPHA * (r + self.GAMMA * maxQ - qk)
self.X.append(np.asarray(o))
self.Y.append(np.asarray(dQs))
if len(self.X) > self.DATASET_NUMBER:
self.X.pop(0)
self.Y.pop(0)
err = self.learningObj.fit(np.asarray(self.X), np.asarray(self.Y), learning_rate=self.LEAN_RATE,
epochs=self.LEAN_EPOCHS)
return err
def getNextAction(self, o):
a = None
# 最大Q値の行動選択
# 観測(observe)から、NNでQ値(配列)を取得
Q_t = self.learningObj.predict(o)
maxQt_idx = np.argmax(Q_t)
best_actions = maxQt_idx
try:
a = np.random.choice(best_actions)
except:
print 'Q_t', Q_t
print 'best_actions', best_actions
print 'ERROR'
a = best_actions
print 'a', a
if self.learnFlg:
return a
# greedyの行動選択
if GREEDY_RATIO < random.random():
return a
else:
return np.random.choice(range(self.action_paturn))
def mainAlgorism():
""" QNeuralNetwork
+ state<-CircleRace
+ agent<-QNeuralNetwork
+ iterate_step = {0}
-----------------------------
: mainloop()
- {s0} = state.get_init_state()
- {a} = agent.get_next_action(s0)
- {s1,R} = state.get_next_state(a)
- {} = agent.learn(s0,a,R,s1}
- iterate_step += 1
"""
NN_ALPHA = 0.9
NN_GAMMA = 0.9
state = CircleRaceOfQlean(area_width=300, area_height=500)
agent = Agent(numInput=3, numHidden=5, numOutput=3, activate1="tanh", activate2="sigmoid")
class mainAlgorism():
def __init__(self):
self.agent = Agent(_numInput=3,numAction=3)
self.state = CircleRaceOfQlean(area_width=300, area_height=500)
self.lean_flg = True
# self.agent.setLearnFlg(self.learn_flg)
for i in range(1999):
self.update()
def update(self):
# 初期設定
# -----------------------------
S = self.state.get_state()
o = self.state.encodeStateToO(S)
# 1. エージェントは環境から受け取った観測Sを受け取り、
# 方策planに基づいて環境に行動aを渡す
# -----------------------------
a = self.agent.getNextAction(np.copy(o))
# 2. 環境StateはエージェントAgentから受け取った行動aと、
# 現在の状態Sにもとづいて、次の状態S'を返却
# -----------------------------
S_next, r = self.state.get_next_state(a)
o_next = self.state.encodeStateToO(np.copy(S_next))
# 3. Agentに学習させる
# -----------------------------
err = self.agent.learn(np.copy(o), a, r, np.copy(o_next))
print 'err', err, a, r
def mainGUI():
app = QApplication(sys.argv)
win = CircleRaceWindow()
win.show()
sys.exit(app.exec_())
if __name__ == "__main__":
# print(CircleRace.__doc__)
# mainGUI()
mainAlgorism()
#! coding:utf-8
"""
MultiLayerPerceptron.py
## Description
多層パーセプトロンの高速化
http://aidiary.hatenablog.com/entry/20140201/1391218771
入力層 - 隠れ層 - 出力層の3層構造で固定(PRMLではこれを2層と呼んでいる)
隠れ層の活性化関数にはtanh関数またはsigmoid logistic関数が使える
出力層の活性化関数にはtanh関数、sigmoid logistic関数、恒等関数が使える
## コメント
前回、3層以上の多層パーセプトロンを実装しようとしたが失敗。
隠れ層を含んだ、各レイヤーをnumpy配列として、
一つのリストに格納していたが、どうも型がおかしくなって上手くいかない。
おそらく、各レイヤーはオブジェクトとして実装したほうがよい。
今回は、基本に戻り3層パーセプトロンを実装する。
参考コードは関数化されており、拡張性を秘めているので利用する。
ただし、拡張性については、拡張するのはいいが、
いずれは機械学習ライブラリを利用していくほうがよい。
現状は3層パーセプトロンを実装し、QLearningとの統合を目指す。
Created by fifi (2015/11/24 5:54)
"""
__version__ = '1.1'
import numpy as np
import sys
# numpy配列をプリントした場合の表示桁数
np.set_printoptions(precision=3)
def ndprint(a, format_string='{0:.2f}'):
"""
ndarrayをprintする関数
:example: ndprint(x)
"""
return [format_string.format(v, i) for i, v in enumerate(a)]
def ndprints(s, a, format_string='{0:.2f}'):
"""
ndarrayをprintする関数
:example: ndprint(x)
"""
print s, [format_string.format(v, i) for i, v in enumerate(a)]
############################################
#
# サブ関数群
#
############################################
def tanh(x):
"""
tanh(x)
:param x:
"""
return np.tanh(x)
def tanh_deriv(x):
"""
tanhの微分. tanh'(x)
tanh'(x)=1 - tanh(x)**2
:param x: tanh(x)
"""
return 1.0 - x ** 2
def sigmoid(x):
"""
シグモイド関数
"""
return 1. / (1. + np.exp(-x))
def sigmoid_deriv(x):
"""
シグモイド関数の導関数.
sigmoid'(x) = sigmoid(x) * (1-sigmoid(x))
:param x: sigmoid(x)
"""
return x * (1 - x)
def identity(x):
"""
線形関数.
"""
return x
def identity_deriv(x):
"""
線形関数の微分
:param x:
"""
return 1
############################################
#
# 多層パーセプトロンクラス
#
############################################
class MultiLayerPerceptron:
def __init__(self, numInput=3, numHidden=5, numOutput=1, activate1="tanh", activate2="sigmoid"):
"""
多層パーセプトロンを初期化
:param numInput: 入力層のユニット数(バイアスユニットは除く)
:param numHidden: 隠れ層のユニット数(バイアスユニットは除く)
:param numOutput: 出力層のユニット数
:param act1: 隠れ層の活性化関数 ( tanh or sigmoid )
:param act2: 出力層の活性化関数 ( tanh or sigmoid or identity )
"""
# 引数の指定に合わせて隠れ層の活性化関数とその微分関数を設定
# ----------------------------------------------------------
if activate1 == "tanh":
self.act1 = tanh
self.act1_deriv = tanh_deriv
elif activate1 == "sigmoid":
self.act1 = sigmoid
self.act1_deriv = sigmoid_deriv
else:
print "ERROR: act1 is tanh or sigmoid"
sys.exit()
print ">> Hidden Layer Activation Function is : %s" % activate1
# 引数の指定に合わせて、出力層の活性化関数とその微分関数を設定
# ----------------------------------------------------------
if activate2 == "tanh":
self.act2 = tanh
self.act2_deriv = tanh_deriv
elif activate2 == "sigmoid":
self.act2 = sigmoid
self.act2_deriv = sigmoid_deriv
elif activate2 == "identity":
self.act2 = identity
self.act2_deriv = identity_deriv
else:
print "ERROR: act2 is tanh or sigmoid or identity"
sys.exit()
print ">> Outpu Layer Activation Function is : %s" % activate2
# 各レイヤのユニット数を格納(バイアスユニットがあるので入力層と隠れ層は+1)
# ------------------------------------------------------------------------
self.numInput = numInput + 1
self.numHidden = numHidden + 1
self.numOutput = numOutput
# 重みを(-1.0, 1.0)の一様乱数で初期化
# ------------------------------------
# -- 入力層 - 隠れ層
self.weight1 = np.random.uniform(
-1.0,
1.0,
(self.numHidden, self.numInput)
)
# -- 隠れ層 - 出力層
self.weight2 = np.random.uniform(
-1.0,
1.0,
(self.numOutput, self.numHidden)
)
# print self.weight1
# print self.weight2
def fit(self, X, t, learning_rate=0.2, epochs=10000):
"""
訓練データを用いてネットワークの重みを更新する
:param X: 入力データ
:param t: 教師データ
:param learning_rate: 学習率
:param epochs: 更新回数
"""
# バイアスユニットを追加
X = np.hstack([np.ones([X.shape[0], 1]), X])
# 教師データ
t = np.array(t)
# 誤差
err = 0
# 逐次学習
# (訓練データからランダムサンプリングして重みを更新。epochs回繰り返す)
# ---------------------------------------------------------------------
for k in range(epochs):
if k % 1000 == 0:
# print '>Epochs:%d' % k
pass
# 訓練データからランダムにサンプルを選択
# --------------------------------------
i = np.random.randint(X.shape[0])
x = X[i]
# 順伝播
# ======
# 入力を順伝播させて中間層の出力を計算
# -------------------------------------
z = self.act1(np.dot(self.weight1, x))
# 中間層の出力を順伝播させて出力層の出力を計算
# --------------------------------------------
y = self.act2(np.dot(self.weight2, z))
# 出力層の誤差を計算
# ==================
# ** WARNING **
# PRMLによると出力層の活性化関数にどれを用いても
# (y - t[i])でよいと書いてあるが
# 下のように出力層の活性化関数の微分もかけたほうが精度がずっと良くなる
# 教師データと出力ユニットとの誤差を計算
# ---------------------------------------
delta2 = self.act2_deriv(y) * (y - t[i])
# 出力層の誤差を逆伝播させて隠れ層の誤差を計算
# -------------------------------------------
delta1 = self.act1_deriv(z) * np.dot(self.weight2.T, delta2)
# 重みの更新
# ==========
# (行列演算になるので2次元ベクトルに変換する必要がある)
x = np.atleast_2d(x)
delta1 = np.atleast_2d(delta1)
# 隠れ層の誤差を用いて隠れ層の重みを更新
# -----------------------------------------------------
# self.weight1 = self.weight1 - learning_rate * np.dot(delta1.T, x)
self.weight1 -= learning_rate * np.dot(delta1.T, x)
# (行列演算になるので2次元ベクトルに変換する必要がある)
z = np.atleast_2d(z)
delta2 = np.atleast_2d(delta2)
# 出力層の誤差を用いて、出力層の重みを更新
# -----------------------------------------
# self.weight2 = self.weight2 - learning_rate * np.dot(delta2.T, z)
self.weight2 -= learning_rate * np.dot(delta2.T, z)
err += delta2
return np.sum(err)
def predict(self, x0):
"""
テストデータの出力を予測し、教師と比較
:param x:
"""
x = np.asarray(x0)
# バイアスの1を追加
x = np.insert(x, 0, 1)
# 順伝播によりネットワークの出力を計算
z = self.act1(np.dot(self.weight1, x))
y = self.act2(np.dot(self.weight2, z))
return y
if __name__ == '__main__':
mpl = MultiLayerPerceptron(numInput=2, numHidden=5, numOutput=1, activate1="tanh", activate2="sigmoid")
X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
y = np.array([0, 1, 1, 0])
mpl.fit(X, y)
for x, y in zip(X, y):
print 'X:%s, y:%0.2f, pred:%0.2f' % (ndprint(x), y, mpl.predict(x))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment