my_multi_layer_perceptron.py
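The class below depends on three helper classes — `Dense`, `Activation`, and `SoftMax` — that are defined elsewhere in the gist and are not shown in this section. The following is a minimal, self-contained sketch of the interface those classes would need to expose, inferred from how `MultiLayerPerceptron` calls them; the weight initialization, the set of supported activation names, and the gradient conventions here are assumptions for illustration, not the gist author's actual implementation.

import numpy as np


class Dense:
    """Fully connected layer: Z = Phi @ W + b."""

    def __init__(self, units, input_dim):
        # Small random weights and zero biases (initialization scheme is an assumption)
        self.W = 0.01 * np.random.randn(input_dim, units)
        self.b = np.zeros(units)
        self.dW = np.zeros_like(self.W)
        self.db = np.zeros_like(self.b)

    def forward_prop(self, Phi):
        # Pre-activation output for a batch of inputs Phi with shape (n_samples, input_dim)
        return Phi @ self.W + self.b

    def back_prop(self, Phi, Delta):
        # Accumulate parameter gradients and return the gradient w.r.t. the layer input
        self.dW = Phi.T @ Delta
        self.db = Delta.sum(axis=0)
        return Delta @ self.W.T

    def update_weights(self, mu, n_samples):
        # Plain gradient-descent step, averaged over the mini-batch
        self.W -= mu * self.dW / n_samples
        self.b -= mu * self.db / n_samples


class Activation:
    """Element-wise activation selected by name ("relu", "sigmoid", or "tanh")."""

    def __init__(self, name="relu"):
        self.name = name

    def forward_prop(self, Z):
        if self.name == "relu":
            return np.maximum(0.0, Z)
        if self.name == "sigmoid":
            return 1.0 / (1.0 + np.exp(-Z))
        return np.tanh(Z)

    def back_prop(self, Z, dPhi):
        # Chain rule: upstream gradient times the activation derivative
        if self.name == "relu":
            return dPhi * (Z > 0.0)
        if self.name == "sigmoid":
            s = 1.0 / (1.0 + np.exp(-Z))
            return dPhi * s * (1.0 - s)
        return dPhi * (1.0 - np.tanh(Z) ** 2)


class SoftMax:
    """Softmax output layer paired with a cross-entropy loss."""

    def forward_prop(self, Z):
        e = np.exp(Z - Z.max(axis=1, keepdims=True))  # shift for numerical stability
        return e / e.sum(axis=1, keepdims=True)

    def back_prop(self, Z, Y):
        # Combined softmax + cross-entropy gradient: softmax(Z) - Y
        return self.forward_prop(Z) - Y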
import numpy as np


class MultiLayerPerceptron():

    def __init__(self, hidden_layer_sizes=(10,), activation="relu", random_state=0):
        self.init_state = True                            # flag: weights still need to be initialized
        self.dense = None                                 # list of fully connected layer instances
        self.act_func = None                              # list of activation function instances
        self.n_units = None                               # list of unit counts per layer
        self.loss = None                                  # training loss history
        self.val_loss = None                              # validation loss history
        self.acc = None                                   # training accuracy history
        self.val_acc = None                               # validation accuracy history
        self.hidden_layer_sizes = hidden_layer_sizes      # sizes of the hidden layers
        self.n_layers = len(self.hidden_layer_sizes) + 2  # total number of layers
        self.activation = activation                      # name of the activation function
        np.random.seed(random_state)                      # random seed
    def fit(self, X, Y, batch_size, epochs, mu, validation_data, verbose):
        """
        Run training.

        Arguments:
        - X: training samples
        - Y: their one-hot labels
        - batch_size: mini-batch size
        - epochs: number of epochs
        - mu: learning rate
        - validation_data: validation data, passed as a tuple like (X_test, Y_test)
        - verbose: whether to print the training log; 0 prints nothing, any other value prints it
        """
        # Numbers of samples, features, and classes
        n_samples, n_features = X.shape
        n_classes = Y.shape[1]

        # Validation data
        val_X = validation_data[0]
        val_Y = validation_data[1]

        # Initialize the weights and the training log
        if self.init_state:
            self.initialize_history()
            self.create_layers(n_features, n_classes)
            self.init_state = False

        # Epoch loop
        for i_epoch in range(epochs):
            # Record loss and accuracy on the training and validation data
            self.record_history(X, Y, val_X, val_Y)

            # Number of mini-batches per epoch
            n_batch = int(np.floor(n_samples / batch_size))

            # Mini-batch loop
            for i_batch in range(n_batch):
                # Slice one batch of training data (the last batch also takes the leftover samples)
                X_batch = X[batch_size*i_batch:batch_size*(i_batch+1+int(i_batch==n_batch-1)), :]
                Y_batch = Y[batch_size*i_batch:batch_size*(i_batch+1+int(i_batch==n_batch-1)), :]

                # Backpropagation
                self.back_prop(X_batch, Y_batch)

                # Number of samples in this batch
                n_samples_batch = X_batch.shape[0]

                # Update the weights and biases
                self.update_weights(mu, n_samples_batch)

            # Print the training log
            if verbose != 0:
                self.print_latest_history(i_epoch, epochs)
    def create_layers(self, n_features, n_classes):
        """Build the layers."""
        # Initialize the lists
        self.dense = []
        self.act_func = []
        self.n_units = []
        self.n_units.append(n_features)

        # Hidden layers
        for i_layer, units in enumerate(self.hidden_layer_sizes):
            self.dense.append(Dense(units=units, input_dim=self.n_units[i_layer]))
            self.act_func.append(Activation(name=self.activation))
            self.n_units.append(units)

        # Output layer
        self.dense.append(Dense(units=n_classes, input_dim=self.n_units[-1]))
        self.act_func.append(SoftMax())
        self.n_units.append(n_classes)
    def forward_prop(self, X):
        """Forward propagation."""
        Phi = [np.array([])] * self.n_layers
        Z = [np.array([])] * (self.n_layers - 1)
        for i_layer in range(self.n_layers - 1):
            if i_layer == 0:
                Phi[i_layer] = X
            Z[i_layer] = self.dense[i_layer].forward_prop(Phi[i_layer])
            Phi[i_layer+1] = self.act_func[i_layer].forward_prop(Z[i_layer])
        return Z, Phi  # Phi[-1] is needed when computing the loss, so return it as well
    def back_prop(self, X, Y):
        """Backpropagation."""
        Z, Phi = self.forward_prop(X)
        Delta = [np.array([])] * (self.n_layers - 1)
        dPhi = [np.array([])] * (self.n_layers - 1)
        for i_layer in range(self.n_layers - 2, -1, -1):
            if i_layer == self.n_layers - 2:
                Delta[i_layer] = self.act_func[i_layer].back_prop(Z[i_layer], Y)
            else:
                Delta[i_layer] = self.act_func[i_layer].back_prop(Z[i_layer], dPhi[i_layer+1])
            dPhi[i_layer] = self.dense[i_layer].back_prop(Phi[i_layer], Delta[i_layer])
    def update_weights(self, mu, n_samples):
        """Update the weights and biases."""
        for d in self.dense:
            d.update_weights(mu, n_samples)
    def cross_entropy(self, X, Y):
        """Cross-entropy loss."""
        Z, Phi = self.forward_prop(X)
        n_samples = Y.shape[0]
        return -np.sum(Y * np.log(Phi[-1])) / n_samples  # divide by the sample count to get the per-sample mean
    def initialize_history(self):
        """Initialize the training log."""
        self.loss = np.array([])
        self.val_loss = np.array([])
        self.acc = np.array([])
        self.val_acc = np.array([])
    def record_history(self, X, Y, val_X, val_Y):
        """Record loss and accuracy on the training and validation data."""
        self.loss = np.append(self.loss, self.cross_entropy(X, Y))
        self.val_loss = np.append(self.val_loss, self.cross_entropy(val_X, val_Y))
        self.acc = np.append(self.acc, self.accuracy_score(Y, self.predict(X)))
        self.val_acc = np.append(self.val_acc, self.accuracy_score(val_Y, self.predict(val_X)))
    def print_latest_history(self, i_epoch, epochs):
        """Print the training log (the latest values in the history)."""
        print("Epoch " + str(i_epoch+1) + "/" + str(epochs) +
              " loss: " + str(self.loss[-1]) +
              " acc: " + str(self.acc[-1]) +
              " val_loss: " + str(self.val_loss[-1]) +
              " val_acc: " + str(self.val_acc[-1]))
    def predict(self, X):
        """Predict class labels (returned as one-hot vectors)."""
        Z, Phi = self.forward_prop(X)
        class_label = np.argmax(Phi[-1], axis=1)
        Y_pred = np.zeros_like(Phi[-1])
        for i in range(len(Y_pred)):
            Y_pred[i, class_label[i]] = 1
        return Y_pred
    def accuracy_score(self, Y_true, Y_pred):
        """Compute the accuracy."""
        acc = np.array([np.sum(Y_true[i, :] == Y_pred[i, :]) == len(Y_true[i, :]) for i in range(len(Y_true))])
        return np.sum(acc) / len(acc)
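A minimal usage sketch, assuming the `Dense`, `Activation`, and `SoftMax` classes from the rest of the gist (or the sketch above) are importable. The random data, one-hot encoding, and hyperparameter values below are illustrative stand-ins, not part of the gist.

import numpy as np

# Illustrative only: random data standing in for a real classification dataset.
rng = np.random.RandomState(0)
X_train = rng.randn(200, 4)            # 200 samples, 4 features
y_train = rng.randint(0, 3, size=200)  # 3 classes
X_test = rng.randn(50, 4)
y_test = rng.randint(0, 3, size=50)

# One-hot encode the integer labels, since fit() expects Y with shape (n_samples, n_classes).
Y_train = np.eye(3)[y_train]
Y_test = np.eye(3)[y_test]

mlp = MultiLayerPerceptron(hidden_layer_sizes=(10, 10), activation="relu", random_state=0)
mlp.fit(X_train, Y_train, batch_size=20, epochs=50, mu=0.1,
        validation_data=(X_test, Y_test), verbose=1)

print("final training accuracy:", mlp.acc[-1])
print("final validation accuracy:", mlp.val_acc[-1])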