The absolute basics of writing TensorFlow code
import numpy as np
import tensorflow as tf
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
# Fix the random seeds for reproducibility
np.random.seed(0)
tf.set_random_seed(1234)
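# NOTE: this snippet targets the TensorFlow 1.x API (tf.placeholder,
# tf.Session, tf.set_random_seed, ...). Under TensorFlow 2.x it would need
# the tf.compat.v1 namespace plus tf.compat.v1.disable_eager_execution();
# that migration is an assumption and untested here.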
class DNN(object):
    """Feed-forward neural network."""

    def __init__(self, n_in, n_hiddens, n_out):
        """Constructor.

        Parameters
        ----------
        n_in : int
            Dimension of the input layer.
        n_hiddens : list
            Dimensions of the hidden layers,
            e.g. [4, 7, 3].
        n_out : int
            Dimension of the output layer.
        """
        self.n_in = n_in
        self.n_hiddens = n_hiddens
        self.n_out = n_out
        # Weights and biases of each layer
        self.weights = []
        self.biases = []
        self._x = None
        self._y = None
        self._t = None
        # Dropout rate
        self._rate = None
        self._sess = None
        self._history = {
            'accuracy': [],
            'loss': [],
            'val_loss': []
        }
    def _weight_variable(self, shape):
        """Initialize a weight variable.

        Parameters
        ----------
        shape : list
            Shape of the weight matrix,
            e.g. shape=[2, 2].

        Returns
        -------
        tf.Variable
            Initialized weight variable.
        """
        # Initialize the weights from a distribution (a truncated normal here)
        initial = tf.truncated_normal(shape, stddev=0.01)
        return tf.Variable(initial)
    def _bias_variable(self, shape):
        """Initialize a bias variable.

        Parameters
        ----------
        shape : list
            Shape of the bias vector,
            e.g. shape=[10].

        Returns
        -------
        tf.Variable
            Initialized bias variable.
        """
        # Initialize the biases to zero
        initial = tf.zeros(shape)
        return tf.Variable(initial)
    def _inference(self, x, rate):
        """Forward pass.

        Parameters
        ----------
        x : tf.Tensor
            Input features.
        rate : tf.Tensor
            Dropout rate.

        Returns
        -------
        tf.Tensor
            Output of the forward pass.
        """
        def batch_normalization(shape, x):
            eps = 1e-8
            beta = tf.Variable(tf.zeros(shape))
            gamma = tf.Variable(tf.ones(shape))
            mean, var = tf.nn.moments(x, [0])
            return gamma * (x - mean) / tf.sqrt(var + eps) + beta
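        # NOTE: batch_normalization above is defined but never called in the
        # forward pass below. To actually use it, one would normalize the
        # pre-activation of each layer, e.g. (an untested sketch):
        #   u = tf.matmul(input, self.weights[-1]) + self.biases[-1]
        #   h = tf.nn.relu(batch_normalization([n_hidden], u))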
        # Input layer -> hidden layer, hidden layer -> hidden layer
        for i, n_hidden in enumerate(self.n_hiddens):
            if i == 0:
                # The first hidden layer reads the network input
                input = x
                input_dim = self.n_in
            else:
                input = output
                input_dim = self.n_hiddens[i - 1]
            # Store the weights and biases of this layer
            self.weights.append(self._weight_variable([input_dim, n_hidden]))
            self.biases.append(self._bias_variable([n_hidden]))
            # Activation
            h = tf.nn.relu(tf.matmul(
                input, self.weights[-1]) + self.biases[-1])
            # Dropout
            output = tf.nn.dropout(h, rate=rate)
        # Last hidden layer -> output layer
        self.weights.append(
            self._weight_variable([self.n_hiddens[-1], self.n_out]))
        self.biases.append(self._bias_variable([self.n_out]))
        # Output layer activation
        y = tf.nn.softmax(tf.matmul(
            output, self.weights[-1]) + self.biases[-1])
        return y
    def _loss(self, y, t):
        """Loss.

        Parameters
        ----------
        y : tf.Tensor
            Output of the forward pass.
        t : tf.Tensor
            Ground-truth labels.

        Returns
        -------
        tf.Tensor
            Loss.
        """
        # Use the cross-entropy loss
        cross_entropy = tf.reduce_mean(
            -tf.reduce_sum(t * tf.log(tf.clip_by_value(y, 1e-10, 1.0)),
                           axis=1))
        return cross_entropy
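        # For one-hot targets t, the inner sum picks out -log(y) of the true
        # class only, so each sample contributes -log(predicted probability
        # of its label); clip_by_value guards against log(0).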
    def _training(self, loss):
        """Training step.

        Parameters
        ----------
        loss : tf.Tensor
            Loss.

        Returns
        -------
        tf.Operation
            Training operation.
        """
        # Use SGD as the optimizer
        optimizer = tf.train.GradientDescentOptimizer(0.01)
        # Build the training operation
        train_step = optimizer.minimize(loss)
        return train_step
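        # Plain SGD keeps the example minimal; any other TF1 optimizer should
        # drop in here unchanged, e.g. (the learning rate is an assumption):
        #   optimizer = tf.train.AdamOptimizer(learning_rate=0.001)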
    def _accuracy(self, y, t):
        """Accuracy.

        Parameters
        ----------
        y : tf.Tensor
            Output of the forward pass.
        t : tf.Tensor
            Ground-truth labels.

        Returns
        -------
        tf.Tensor
            Accuracy.
        """
        # Take the column index with the highest probability;
        # True where prediction and label agree
        correct_prediction = tf.equal(
            tf.argmax(y, axis=1), tf.argmax(t, axis=1))
        # Fraction of True entries
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        return accuracy
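        # Example: y = [0.1, 0.7, 0.2] and t = [0, 1, 0] both argmax to
        # index 1, so that sample counts as correct; the mean over the batch
        # is the accuracy.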
    def fit(self, X_train, Y_train,
            nb_epoch=100, batch_size=100, drop_rate=0.5,
            verbose=1, validation_data=None, callbacks=None):
        """Run the training loop.

        Parameters
        ----------
        X_train : np.ndarray
            Input features.
        Y_train : np.ndarray
            Target labels.
        nb_epoch : int
            Number of epochs.
        batch_size : int
            Mini-batch size.
        drop_rate : float
            Dropout rate.
        verbose : int
            If truthy, print a log line per epoch.
        validation_data : tuple, optional
            (X, Y) pair used to compute the validation loss.
        callbacks : EarlyStopping, optional
            Early-stopping callback.

        Returns
        -------
        dict
            Loss and accuracy per epoch.
        """
        # Placeholders for the inputs
        x = tf.placeholder(tf.float32, shape=[None, self.n_in])
        t = tf.placeholder(tf.float32, shape=[None, self.n_out])
        rate = tf.placeholder(tf.float32)
        self._x = x
        self._t = t
        self._rate = rate
        # Forward pass on the input data
        y = self._inference(x, rate)
        # Compute the loss
        loss = self._loss(y, t)
        # Build the training step from the loss
        train_step = self._training(loss)
        # Compute the accuracy
        accuracy = self._accuracy(y, t)
        # Set up a session to run the data-flow graph built above
        init = tf.global_variables_initializer()
        sess = tf.Session()
        sess.run(init)
        self._y = y
        self._sess = sess
        N_train = len(X_train)
        n_batches = N_train // batch_size
        for epoch in range(nb_epoch):
            X_, Y_ = shuffle(X_train, Y_train)
            # Train on mini-batches
            for i in range(n_batches):
                start = i * batch_size
                end = start + batch_size
                sess.run(train_step, feed_dict={
                    x: X_[start:end],
                    t: Y_[start:end],
                    rate: drop_rate
                })
            loss_ = loss.eval(session=sess, feed_dict={
                x: X_train,
                t: Y_train,
                rate: 0.0
            })
            accuracy_ = accuracy.eval(session=sess, feed_dict={
                x: X_train,
                t: Y_train,
                rate: 0.0
            })
            # Compute val_loss when validation data is provided
            val_loss_ = None
            if validation_data is not None:
                val_loss_ = loss.eval(session=sess, feed_dict={
                    x: validation_data[0],
                    t: validation_data[1],
                    rate: 0.0
                })
            # Record the loss and accuracy of this epoch
            self._history['loss'].append(loss_)
            self._history['accuracy'].append(accuracy_)
            self._history['val_loss'].append(val_loss_)
            # Per-epoch logging
            if verbose:
                print('epoch:', epoch,
                      ' loss:', loss_,
                      ' accuracy:', accuracy_,
                      ' val_loss:', val_loss_)
            # Early stopping
            if callbacks is not None:
                if callbacks.validate(val_loss_):
                    break
        return self._history
    def evaluate(self, X_test, Y_test):
        """Evaluate on held-out data.

        Parameters
        ----------
        X_test : np.ndarray
            Input features.
        Y_test : np.ndarray
            Target labels.

        Returns
        -------
        np.ndarray
            Accuracy.
        """
        accuracy = self._accuracy(self._y, self._t)
        return accuracy.eval(session=self._sess, feed_dict={
            self._x: X_test,
            self._t: Y_test,
            self._rate: 0.0
        })
class EarlyStopping():
    """Stop training when the loss has not improved for several epochs in a row."""

    def __init__(self, patience=0, verbose=0):
        """Constructor.

        Parameters
        ----------
        patience : int
            Number of consecutive non-improving epochs to tolerate.
        verbose : int
            Log output level.
        """
        self._step = 0
        self._loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def validate(self, loss):
        """Check whether the loss has stopped improving.

        Parameters
        ----------
        loss : float
            Current loss.

        Returns
        -------
        bool
            True once the patience threshold is exceeded.
        """
        if self._loss < loss:
            self._step += 1
            if self._step > self.patience:
                if self.verbose:
                    print('early stopping')
                return True
        else:
            self._step = 0
            self._loss = loss
        return False
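# Example: with patience=5, validate() returns True once the loss has failed
# to improve for six consecutive calls (the step counter exceeds patience);
# any improvement resets the counter and the best loss seen so far.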
if __name__ == '__main__':
    """Generate the data."""
    # mnist = datasets.fetch_mldata('MNIST original', data_home='.')
    from sklearn.datasets import fetch_openml
    mnist = fetch_openml('mnist_784', version=1)
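    # NOTE: on recent scikit-learn versions fetch_openml returns pandas
    # objects by default; passing as_frame=False (an assumption about newer
    # releases) keeps NumPy arrays so the integer indexing below works.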
    n = len(mnist.data)
    N = 10000  # Use only part of MNIST
    indices = np.random.permutation(range(n))[:N]  # Pick N samples at random
    X = mnist.data[indices]
    y = mnist.target[indices]
    Y = np.eye(10)[y.astype(int)]  # Convert to a 1-of-K (one-hot) encoding
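    # Example: np.eye(10)[3] is [0. 0. 0. 1. 0. 0. 0. 0. 0. 0.], so indexing
    # the identity matrix with the label array yields one one-hot row per
    # sample.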
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, train_size=0.8)
    X_train, X_valid, Y_train, Y_valid = train_test_split(
        X_train, Y_train, train_size=0.8)

    """Configure the model."""
    earlyStopping = EarlyStopping(patience=5)
    model = DNN(n_in=len(X[0]),
                n_hiddens=[200, 200, 200],
                n_out=len(Y[0]))

    """Train the model."""
    model.fit(X_train, Y_train,
              nb_epoch=100,
              batch_size=200,
              drop_rate=0.5,
              verbose=1,
              callbacks=earlyStopping,
              validation_data=(X_valid, Y_valid))

    """Evaluate predictive accuracy."""
    accuracy = model.evaluate(X_test, Y_test)
    print('accuracy: ', accuracy)