The very basics of writing TensorFlow code: a minimal multilayer perceptron for MNIST built on the TensorFlow 1.x low-level API, with hand-rolled mini-batch training, dropout, and early stopping.
import numpy as np
import tensorflow as tf
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import matplotlib.pyplot as plt

# Fix the random seeds for reproducibility.
# (tf.set_random_seed is the TensorFlow 1.x API; TF2 uses tf.random.set_seed.)
np.random.seed(0)
tf.set_random_seed(1234)

class DNN(object):
    """A plain feed-forward neural network."""

    def __init__(self, n_in, n_hiddens, n_out):
        """Constructor.

        Parameters
        ----------
        n_in : int
            Dimension of the input layer.
        n_hiddens : list
            Dimensions of the hidden layers,
            e.g. [4, 7, 3].
        n_out : int
            Dimension of the output layer.
        """
        self.n_in = n_in
        self.n_hiddens = n_hiddens
        self.n_out = n_out
        # The weights and biases of each layer are stored here.
        self.weights = []
        self.biases = []
        self._x = None
        self._y = None
        self._t = None
        # Dropout rate.
        self._rate = None
        self._sess = None
        self._history = {
            'accuracy': [],
            'loss': [],
            'val_loss': []
        }
    def _weight_variable(self, shape):
        """Initialize a weight variable.

        Parameters
        ----------
        shape : list
            Shape of the weight matrix,
            e.g. shape=[2, 2].

        Returns
        -------
        tf.Variable
            The initialized weight variable.
        """
        # Weights are initialized from some distribution,
        # here a truncated normal with a small standard deviation.
        initial = tf.truncated_normal(shape, stddev=0.01)
        return tf.Variable(initial)
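    # Note: a small truncated normal is one common initialization; a
    # Glorot/Xavier scheme (tf.glorot_uniform_initializer in TF 1.x) is a
    # frequently used alternative.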
    def _bias_variable(self, shape):
        """Initialize a bias variable.

        Parameters
        ----------
        shape : list
            Shape of the bias,
            e.g. shape=[10].

        Returns
        -------
        tf.Variable
            The bias variable.
        """
        # Biases are initialized to zero.
        initial = tf.zeros(shape)
        return tf.Variable(initial)
    def _inference(self, x, rate):
        """Inference (forward pass).

        Parameters
        ----------
        x : tf.Tensor
            Explanatory variables.
        rate : tf.Tensor
            Dropout rate.

        Returns
        -------
        tf.Tensor
            Output of the forward pass.
        """
        def batch_normalization(shape, x):
            eps = 1e-8
            beta = tf.Variable(tf.zeros(shape))
            gamma = tf.Variable(tf.ones(shape))
            mean, var = tf.nn.moments(x, [0])
            return gamma * (x - mean) / tf.sqrt(var + eps) + beta
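        # batch_normalization is defined above for reference but never
        # applied in the layers below; if it were, a typical (hypothetical)
        # use on the pre-activation would be:
        #   u = tf.matmul(input, self.weights[-1]) + self.biases[-1]
        #   h = tf.nn.relu(batch_normalization([n_hidden], u))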
        # Input layer -> hidden layer, and hidden layer -> hidden layer.
        for i, n_hidden in enumerate(self.n_hiddens):
            if i == 0:
                # The input layer.
                input = x
                input_dim = self.n_in
            else:
                input = output
                input_dim = self.n_hiddens[i - 1]
            # Store the weights and biases of each layer.
            self.weights.append(self._weight_variable([input_dim, n_hidden]))
            self.biases.append(self._bias_variable([n_hidden]))
            # Activation.
            h = tf.nn.relu(tf.matmul(
                input, self.weights[-1]) + self.biases[-1])
            # Dropout.
            output = tf.nn.dropout(h, rate=rate)

        # Last hidden layer -> output layer.
        self.weights.append(
            self._weight_variable([self.n_hiddens[-1], self.n_out]))
        self.biases.append(self._bias_variable([self.n_out]))
        # Output layer activation.
        y = tf.nn.softmax(tf.matmul(
            output, self.weights[-1]) + self.biases[-1])
        return y
    def _loss(self, y, t):
        """Loss.

        Parameters
        ----------
        y : tf.Tensor
            Output of the forward pass.
        t : tf.Tensor
            Target values.

        Returns
        -------
        tf.Tensor
            The loss.
        """
        # Use cross-entropy; clipping keeps log() away from zero.
        cross_entropy = tf.reduce_mean(-tf.reduce_sum(
            t * tf.log(tf.clip_by_value(y, 1e-10, 1.0)), axis=1))
        return cross_entropy
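    # Note: a numerically more stable alternative in TF 1.x is
    #   tf.losses.softmax_cross_entropy(onehot_labels=t, logits=logits)
    # which expects the pre-softmax logits rather than the softmax
    # probabilities computed in _inference.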
    def _training(self, loss):
        """Training.

        Parameters
        ----------
        loss : tf.Tensor
            The loss.

        Returns
        -------
        tf.Operation
            The training operation.
        """
        # Use plain SGD as the optimizer, with learning rate 0.01.
        optimizer = tf.train.GradientDescentOptimizer(0.01)
        # Create the training operation.
        train_step = optimizer.minimize(loss)
        return train_step
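    # Note: plain SGD with a fixed learning rate is kept for simplicity;
    # tf.train.AdamOptimizer(0.001) is a drop-in replacement that usually
    # converges faster on this kind of model.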
    def _accuracy(self, y, t):
        """Accuracy.

        Parameters
        ----------
        y : tf.Tensor
            Output of the forward pass.
        t : tf.Tensor
            Target values.

        Returns
        -------
        tf.Tensor
            The accuracy.
        """
        # Take the column index with the highest probability in each row;
        # a prediction counts as True when it matches the target's index.
        correct_prediction = tf.equal(
            tf.argmax(y, axis=1), tf.argmax(t, axis=1))
        # The accuracy is the fraction of True predictions.
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        return accuracy
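    # Worked example: with y = [[0.1, 0.9], [0.8, 0.2]] and
    # t = [[0, 1], [1, 0]], the row-wise argmaxes are (1, 0) vs. (1, 0),
    # so both predictions match and the accuracy is 1.0.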
    def fit(self, X_train, Y_train,
            nb_epoch=100, batch_size=100, drop_rate=0.5,
            verbose=1, validation_data=None, callbacks=None):
        """Run the training.

        Parameters
        ----------
        X_train : np.ndarray
            Explanatory variables.
        Y_train : np.ndarray
            Target variables.
        nb_epoch : int
            Number of epochs.
        batch_size : int
            Size of a mini-batch.
        drop_rate : float
            Dropout rate.
        verbose : int
            If truthy, log progress every epoch.
        validation_data : tuple, optional
            (X_valid, Y_valid) used to compute the validation loss.
        callbacks : EarlyStopping, optional
            Early-stopping callback consulted after every epoch.

        Returns
        -------
        dict
            Per-epoch loss and accuracy history.
        """
        # Placeholders for the inputs.
        x = tf.placeholder(tf.float32, shape=[None, self.n_in])
        t = tf.placeholder(tf.float32, shape=[None, self.n_out])
        rate = tf.placeholder(tf.float32)
        self._x = x
        self._t = t
        self._rate = rate

        # Forward pass on the input data.
        y = self._inference(x, rate)
        # Compute the loss.
        loss = self._loss(y, t)
        # Train on the loss.
        train_step = self._training(loss)
        # Compute the accuracy.
        accuracy = self._accuracy(y, t)

        # Build a session that evaluates the data-flow graph constructed above.
        init = tf.global_variables_initializer()
        sess = tf.Session()
        sess.run(init)
        self._y = y
        self._sess = sess

        N_train = len(X_train)
        n_batches = N_train // batch_size
        for epoch in range(nb_epoch):
            X_, Y_ = shuffle(X_train, Y_train)
            # Train in mini-batches.
            for i in range(n_batches):
                start = i * batch_size
                end = start + batch_size
                sess.run(train_step, feed_dict={
                    x: X_[start:end],
                    t: Y_[start:end],
                    rate: drop_rate
                })
            loss_ = loss.eval(session=sess, feed_dict={
                x: X_train,
                t: Y_train,
                rate: 0.0
            })
            accuracy_ = accuracy.eval(session=sess, feed_dict={
                x: X_train,
                t: Y_train,
                rate: 0.0
            })
            # Compute val_loss when validation data is given.
            val_loss_ = None
            if validation_data is not None:
                val_loss_ = loss.eval(session=sess, feed_dict={
                    x: validation_data[0],
                    t: validation_data[1],
                    rate: 0.0
                })
            # Store the per-epoch loss and accuracy.
            self._history['loss'].append(loss_)
            self._history['accuracy'].append(accuracy_)
            self._history['val_loss'].append(val_loss_)
            # Log output per epoch.
            if verbose:
                print('epoch:', epoch,
                      ' loss:', loss_,
                      ' accuracy:', accuracy_,
                      ' val_loss:', val_loss_)
            # Early stopping.
            if callbacks is not None:
                if callbacks.validate(val_loss_):
                    break
        return self._history
    def evaluate(self, X_test, Y_test):
        """Evaluation.

        Parameters
        ----------
        X_test : np.ndarray
            Explanatory variables.
        Y_test : np.ndarray
            Target variables.

        Returns
        -------
        float
            The accuracy on the test data.
        """
        accuracy = self._accuracy(self._y, self._t)
        return accuracy.eval(session=self._sess, feed_dict={
            self._x: X_test,
            self._t: Y_test,
            self._rate: 0.0
        })
class EarlyStopping(object):
    """Stop training when the loss has not decreased for several epochs in a row."""

    def __init__(self, patience=0, verbose=0):
        """Constructor.

        Parameters
        ----------
        patience : int
            Number of consecutive non-improving epochs to tolerate.
        verbose : int
            Log output level.
        """
        self._step = 0
        self._loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def validate(self, loss):
        """Check the loss.

        Parameters
        ----------
        loss : float
            The current loss.

        Returns
        -------
        bool
            True once the patience has been exceeded.
        """
        if self._loss < loss:
            self._step += 1
            if self._step > self.patience:
                if self.verbose:
                    print('early stopping')
                return True
        else:
            self._step = 0
            self._loss = loss
        return False
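# A quick sanity check of EarlyStopping (hypothetical loss values, for
# illustration only). With patience=2, the third consecutive
# non-improving epoch triggers the stop:
#   es = EarlyStopping(patience=2)
#   [es.validate(l) for l in [1.0, 0.9, 0.95, 0.96, 0.97]]
#   # -> [False, False, False, False, True]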
if __name__ == '__main__':
    """Generate the data."""
    # fetch_mldata was removed from scikit-learn; fetch_openml replaces it:
    # mnist = datasets.fetch_mldata('MNIST original', data_home='.')
    mnist = fetch_openml('mnist_784', version=1)
    # np.asarray guards against newer scikit-learn versions, where
    # fetch_openml returns pandas objects by default.
    data = np.asarray(mnist.data)
    target = np.asarray(mnist.target)
    n = len(data)
    N = 10000  # Use only part of MNIST.
    indices = np.random.permutation(range(n))[:N]  # Pick N images at random.
    X = data[indices]
    y = target[indices]
    Y = np.eye(10)[y.astype(int)]  # Convert to a 1-of-K (one-hot) encoding.

    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, train_size=0.8)
    X_train, X_valid, Y_train, Y_valid = train_test_split(
        X_train, Y_train, train_size=0.8)

    """Configure the model."""
    earlyStopping = EarlyStopping(patience=5)
    model = DNN(n_in=len(X[0]),
                n_hiddens=[200, 200, 200],
                n_out=len(Y[0]))

    """Train the model."""
    history = model.fit(X_train, Y_train,
                        nb_epoch=100,
                        batch_size=200,
                        drop_rate=0.5,
                        verbose=1,
                        callbacks=earlyStopping,
                        validation_data=(X_valid, Y_valid))

    """Evaluate the prediction accuracy."""
    accuracy = model.evaluate(X_test, Y_test)
    print('accuracy: ', accuracy)
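    # A minimal sketch of plotting the training curves with the matplotlib
    # import above, using the history returned by fit() (assumes fit() was
    # called with validation_data, as it is here):
    plt.plot(history['loss'], label='loss')
    if any(v is not None for v in history['val_loss']):
        plt.plot(history['val_loss'], label='val_loss')
    plt.xlabel('epoch')
    plt.ylabel('cross-entropy loss')
    plt.legend()
    plt.show()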