Last active
May 9, 2018 00:30
-
-
Save ven-kyoshiro/a6c2edddbabc48bd4e353f604c9136d7 to your computer and use it in GitHub Desktop.
An AdaBoost implementation in which the potential function U can be changed freely. (Reference: lecture notes on statistical learning theory.)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf-8 -*- | |
import numpy as np | |
import matplotlib | |
matplotlib.use('Agg') | |
import matplotlib.pyplot as plt | |
import scipy.optimize as optimize | |
from multiprocessing import Pool | |
import os | |
from tqdm import tqdm | |
import multiprocessing as multi | |
from mpl_toolkits.mplot3d import Axes3D | |
# U may be any convex, monotonically increasing potential function.
def U(x):
    """Potential function of the boosting loss (exponential => AdaBoost)."""
    return np.exp(x)
# Derivative of the potential U (for exp they coincide).
def deri_U(x):
    """First derivative of U; used to compute the sample weights D."""
    return np.exp(x)
# Ground-truth decision boundary on the unit square.
def true_boundary(x, y):
    """Return True when (x, y) lies on the positive side of the sine-shaped boundary."""
    threshold = 0.5 + 0.2 * np.sin(2.0 * np.pi * y)
    return x > threshold
# Assign a ground-truth class colour while generating data.
def coloring(x, y):
    """Colour for the true label of (x, y): 'red' = positive, 'blue' = negative."""
    return 'red' if true_boundary(x, y) else 'blue'
def make_data(N):
    """Sample N points uniformly on the unit square with ground-truth labels.

    Returns (data, color, label): an (N, 2) array, a colour list, and a list
    of +/-1.0 labels (+1.0 for 'red').
    """
    data = np.random.rand(N, 2)
    color = [coloring(px, py) for px, py in data]
    label = [1.0 if c == 'red' else -1.0 for c in color]
    return data, color, label
def draw_GT(data, color):
    """Save a scatter plot of the training set (ground-truth colouring) as GT.png."""
    fig = plt.figure()
    axis = fig.add_subplot(1, 1, 1)
    axis.scatter(data.T[0], data.T[1], color=color)
    axis.set_title('Experience distribution (GT)')
    axis.set_xlabel('x')
    axis.set_ylabel('y')
    plt.savefig("GT.png", format='png', dpi=300)
def h(x, y, no_use, ths):
    """Classify (x, y) into {-1., +1.} with an axis-aligned box rule.

    The full rule is  x < a  and  x > b  and  y < c  and  y > d.  Exactly one
    of the four constraints — indexed by no_use: a=0, b=1, c=2, d=3 — is
    dropped, and ths holds the three remaining thresholds in index order.
    Returns +1. iff every active constraint holds, bailing out with -1. at
    the first violated one.
    """
    bound_idx = 0
    # (constraint index, tested coordinate, True if the threshold is an upper bound)
    for cond, value, is_upper in ((0, x, True), (1, x, False),
                                  (2, y, True), (3, y, False)):
        if cond == no_use:
            continue
        bound = ths[bound_idx]
        bound_idx += 1
        if is_upper:
            if bound < value:
                return -1.
        elif bound > value:
            return -1.
    return 1.
def h_color(x, y, no_use, ths):
    """Colour for the weak learner's prediction at (x, y)."""
    return 'red' if h(x, y, no_use, ths) == 1.0 else 'blue'
def eps(pred, X, D):
    """Negated D-weighted margin of predictions `pred` against labels `X` (smaller is better)."""
    weighted_margin = np.sum(np.multiply(pred, X) * D)
    return -weighted_margin
# Exhaustive search over the weak-hypothesis family.
def argmin_h(args):
    """Find the hypothesis minimising eps, parallelising over the th1 grid.

    args is [label, D, data]; returns (best_eps, best_hypothesis_dict).
    """
    label, D, data = args
    best_eps = float('inf')
    best_h = {}
    grid = np.linspace(0, 1, 47)
    for skip in tqdm(range(4)):
        with Pool(multi.cpu_count()) as pool:
            results = pool.map(argmin_h_process,
                               [[skip, label, D, data, t1] for t1 in grid])
        for cand_eps, cand_h in results:
            if cand_eps < best_eps:
                best_eps, best_h = cand_eps, cand_h
    return best_eps, best_h
def argmin_h_process(args):
    """Scan the (th2, th3) grid for a fixed (no_use, th1) and return the best.

    args is [no_use, label, D, data, th1]; returns (min_eps, hypothesis_dict)
    where hypothesis_dict is {'no_use': ..., 'ths': [th1, th2, th3]}.
    """
    no_use, label, D, data, th1 = args
    min_eps = 1000000000.  # sentinel larger than any achievable eps
    min_h = {}
    grid = np.linspace(0, 1, 47)  # hoisted: was rebuilt per call of the inner loop
    for th2 in grid:
        for th3 in grid:
            ths = [th1, th2, th3]
            pred = np.array([h(px, py, no_use, ths) for px, py in data])
            # BUGFIX(perf): eps was evaluated twice per candidate; compute once
            cand = eps(pred, label, D)
            if cand < min_eps:
                min_eps = cand
                min_h = {'no_use': no_use, 'ths': ths}
    return min_eps, min_h
# Objective for the line search over the new weak learner's weight alpha.
def eval_alpha(alpha, args):
    """Evaluate the potential objective of F extended by alpha * min_h.

    args is [F, min_h, data, label]. Returns
        U(0)*N + sum_i U( -y_i * (F.raw_predict(x_i) + alpha * h(x_i)) ),
    i.e. the loss of the raw (unthresholded) ensemble after tentatively
    adding the candidate hypothesis with step size alpha.  The U(0)*N term
    only shifts the objective and does not change the argmin.
    """
    F, min_h, data, label = args
    N = len(data)
    total = U(0) * N
    # BUGFIX: the original evaluated a triple-quoted string literal inside
    # this loop on every iteration (a dead expression); removed.
    for d, l in zip(data, label):
        # negative margin contribution of sample (d, l)
        cont = -l * F.raw_predict(d[0], d[1])
        cont -= alpha * l * h(d[0], d[1], min_h['no_use'], min_h['ths'])
        total += U(cont)
    return total
def draw(data, label, F, D, num):
    """Plot correctly ('o') and wrongly ('x') classified points, sized by weight D.

    Saves itrNN.png (NN = num) and returns the number of misclassified
    training points.
    """
    corrects = {'data': [], 'color': [], 'size': []}
    mistakes = {'data': [], 'color': [], 'size': []}
    for datum, l, d in zip(data, label, D):
        bucket = corrects if l * F.predict(datum[0], datum[1]) == 1.0 else mistakes
        bucket['data'].append(datum)
        bucket['color'].append(F.coloring(datum[0], datum[1]))
        # marker area proportional to the sample weight, at least 1
        bucket['size'].append(max(int(2000. * d), 1))
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    for bucket, marker, alpha_ in ((corrects, 'o', 0.4), (mistakes, 'x', 0.6)):
        pts = np.array(bucket['data'])
        # BUGFIX: np.array([]).T[0] raises IndexError when a bucket is empty
        # (e.g. once the ensemble classifies every training point correctly)
        if len(pts):
            ax.scatter(pts.T[0], pts.T[1], color=bucket['color'],
                       s=bucket['size'], marker=marker, alpha=alpha_)
    ax.set_title('Experience distribution:{0:02d}'.format(num))
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    plt.savefig("itr{0:02d}.png".format(num), format='png', dpi=300)
    return len(mistakes['data'])
# The boosted ensemble classifier.
class integrated_F:
    """Weighted ensemble F(x, y) = sum_t alpha_t * h_t(x, y) of box weak learners."""

    def __init__(self):
        # Seed with a hypothesis whose bounds contradict each other, so it
        # always predicts -1; this keeps raw_predict well defined before the
        # first call to update().
        self.alpha = [1.0]
        self.hpsy = [{'no_use': 3, 'ths': [0., 1., 1.]}]

    def raw_predict(self, x, y):
        """Real-valued ensemble score (the weighted vote)."""
        return sum(a * h(x, y, hyp['no_use'], hyp['ths'])
                   for a, hyp in zip(self.alpha, self.hpsy))

    def predict(self, x, y):
        """Hard prediction in {-1., +1.}; a zero score counts as -1."""
        return 1. if self.raw_predict(x, y) > 0 else -1.

    def update(self, new_alp, new_hyp):
        """Append one (weight, hypothesis) pair to the ensemble."""
        self.alpha.append(new_alp)
        self.hpsy.append(new_hyp)

    def coloring(self, x, y):
        """Colour for the ensemble's hard prediction at (x, y)."""
        return 'red' if self.raw_predict(x, y) > 0 else 'blue'
# Evaluation: render the ground-truth surface, each weak learner, and the
# training-mistake curve.
def assess(F, summary):
    """Visualise the trained ensemble F and the learning progress.

    Saves Grand_Truth.png (true labelling surface), one week_learnerNN.png
    wireframe per weak hypothesis (scaled by its alpha), and
    count_of_mistakes_in_train_data.png from `summary` (mistake counts per
    boosting round).
    NOTE(review): 'Grand Truth' / 'week_learner' look like typos for
    'Ground Truth' / 'weak_learner'; kept as-is since they name output files
    and plot titles.
    """
    # plot the ground-truth +/-1 surface on a fine 0.01 grid
    x = np.arange(0., 1.0, 0.01)
    y = np.arange(0., 1.0, 0.01)
    X, Y = np.meshgrid(x, y)
    Z = np.array([[1.0 if true_boundary(xx,yy) else -1.0 for xx in x] for yy in y])
    fig = plt.figure()
    ax = Axes3D(fig)
    ax.set_zlim(-2, 2)
    ax.set_xlabel("x1")
    ax.set_ylabel("x2")
    ax.set_zlabel("GT(x1,x2)")
    ax.plot_wireframe(X, Y, Z)
    ax.set_title('Grand Truth')
    plt.savefig('Grand_Truth.png',format = 'png', dpi=300)
    # plot each weak learner's alpha-weighted prediction on a coarser 0.05 grid
    x = np.arange(0., 1.0, 0.05)
    y = np.arange(0., 1.0, 0.05)
    X, Y = np.meshgrid(x, y)
    for i in range(len(F.hpsy)):
        # NOTE(review): h is called as h(yy, xx, ...) here while draw() uses
        # h(x, y, ...) — possibly intentional to match the meshgrid
        # orientation; confirm the axis order before changing.
        Z = np.array([[F.alpha[i]*h(yy,xx,F.hpsy[i]['no_use'],F.hpsy[i]['ths']) for xx in x] for yy in y])
        fig = plt.figure()
        ax = Axes3D(fig)
        ax.set_zlim(-2, 2)
        ax.set_xlabel("x1")
        ax.set_ylabel("x2")
        ax.set_zlabel("h(x1,x2)")
        ax.plot_wireframe(X, Y, Z ,cmap='jet')
        ax.set_title('week_learner{0:02d}_alpha={1}'.format(i,round(F.alpha[i],3)))
        plt.savefig('week_learner{0:02d}.png'.format(i),format = 'png', dpi=300)
    # learning curve: number of training mistakes per boosting round
    fig = plt.figure()
    ax = fig.add_subplot(1,1,1)
    ax.set_title('count_of_mistakes_in_train_data')
    ax.set_xlabel('number of learners')
    ax.set_ylabel('mistakes')
    left = np.array(range(len(summary)))
    height = np.array(summary)
    plt.plot(left, height)
    plt.savefig('count_of_mistakes_in_train_data.png',format = 'png', dpi=300)
def main():
    """Train the U-boost ensemble for 20 rounds and save diagnostic plots."""
    N = 500  # training-set size
    np.random.seed(42)
    summary = []  # mistakes per round, fed to assess()
    # generate training data and plot its ground-truth distribution
    data, color, label = make_data(N)
    draw_GT(data, color)
    # initial sample weights: uniform (unnormalised; eps is scale-invariant
    # w.r.t. D for the purpose of the argmin)
    D = np.array([1.] * N)
    F = integrated_F()
    # BUGFIX: the first round was duplicated before the loop and the in-loop
    # plots were numbered i+1, so itr01 was skipped (files itr00, itr02, ...).
    # A single loop with consistent numbering replaces both.
    for it in range(20):
        print('-------it_num:{0}-------'.format(it))
        # exhaustive search for the eps-minimising weak hypothesis
        min_eps, min_h = argmin_h([label, D, data])
        # line-search the step size alpha (Brent's method via minimize_scalar)
        min_alpha = optimize.minimize_scalar(
            eval_alpha, args=[F, min_h, data, label])['x']
        F.update(min_alpha, min_h)
        # reweight: D_i proportional to U'(-y_i * F(x_i)), then normalise
        D = np.array([deri_U(-l * F.raw_predict(d[0], d[1]))
                      for l, d in zip(label, data)])
        D = D / D.sum()
        summary.append(draw(data, label, F, D, it))
    # render the learned hypotheses and the learning curve
    assess(F, summary)
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
結果
弱学習器も見れます