Forked from danijar/blog_tensorflow_sequence_labelling.py
Created
February 9, 2017 01:15
-
-
Save yuwen-yan/672da7edf90faa140ad8bf7adc57747f to your computer and use it in GitHub Desktop.
TensorFlow Sequence Labelling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example for my blog post at:
# http://danijar.com/introduction-to-recurrent-networks-in-tensorflow/
import functools | |
import sets | |
import tensorflow as tf | |
def lazy_property(function):
    """Turn a zero-argument method into a property computed only once.

    The first access calls ``function(self)`` and stores the result on the
    instance under the method's name prefixed with an underscore; every
    later access returns that stored value without calling ``function``.
    """
    cache_name = '_' + function.__name__

    def compute_once(self):
        already_built = hasattr(self, cache_name)
        if not already_built:
            result = function(self)
            setattr(self, cache_name, result)
        return getattr(self, cache_name)

    # Copy the wrapped method's name and docstring before making a property.
    return property(functools.wraps(function)(compute_once))
class SequenceLabelling:
    """Recurrent sequence-labelling model built as a TensorFlow graph.

    A stack of GRU cells with output dropout feeds a softmax layer whose
    weights are shared across time steps. ``prediction``, ``cost``,
    ``optimize`` and ``error`` are lazy properties, so each one adds its
    operations to the graph once and returns the cached result afterwards.
    """

    def __init__(self, data, target, dropout, num_hidden=200, num_layers=3):
        """Store the inputs and hyperparameters, then build the graph.

        Args:
            data: Input tensor handed to ``tf.nn.dynamic_rnn``.
            target: Target tensor. Axes 1 and 2 of its static shape must be
                known; they are read as sequence length and class count.
            dropout: Output keep probability of the dropout wrapper.
            num_hidden: Number of units in each GRU cell.
            num_layers: Number of stacked recurrent layers.
        """
        self.data = data
        self.target = target
        self.dropout = dropout
        self._num_hidden = num_hidden
        self._num_layers = num_layers
        # Touch the lazy properties so the whole graph is built at
        # construction time; ``cost`` is built as a dependency of
        # ``optimize``.
        self.prediction
        self.error
        self.optimize

    def _make_cell(self):
        """Return a fresh GRU cell wrapped with output dropout."""
        cell = tf.nn.rnn_cell.GRUCell(self._num_hidden)
        return tf.nn.rnn_cell.DropoutWrapper(
            cell, output_keep_prob=self.dropout)

    @lazy_property
    def prediction(self):
        """Softmax class probabilities for every time step."""
        # Recurrent network. Every layer gets its own cell object: repeating
        # a single instance with ``[cell] * n`` hands the same cell to all
        # layers, which later TensorFlow 1.x releases refuse to build.
        layers = [self._make_cell() for _ in range(self._num_layers)]
        network = tf.nn.rnn_cell.MultiRNNCell(layers)
        # Use the instance's input tensor rather than a module-level global
        # that happens to be called ``data``.
        output, _ = tf.nn.dynamic_rnn(network, self.data, dtype=tf.float32)
        # Softmax layer.
        max_length = int(self.target.get_shape()[1])
        num_classes = int(self.target.get_shape()[2])
        weight, bias = self._weight_and_bias(self._num_hidden, num_classes)
        # Flatten to apply same weights to all time steps.
        output = tf.reshape(output, [-1, self._num_hidden])
        prediction = tf.nn.softmax(tf.matmul(output, weight) + bias)
        prediction = tf.reshape(prediction, [-1, max_length, num_classes])
        return prediction

    @lazy_property
    def cost(self):
        """Cross entropy summed over axes 1 and 2, averaged over the rest."""
        # NOTE(review): tf.log is applied to the raw softmax output; if a
        # probability ever underflows to zero this can produce inf/NaN.
        # Left unchanged to keep the loss identical -- consider clipping.
        cross_entropy = -tf.reduce_sum(
            self.target * tf.log(self.prediction), [1, 2])
        cross_entropy = tf.reduce_mean(cross_entropy)
        return cross_entropy

    @lazy_property
    def optimize(self):
        """Training op: one RMSProp step minimizing ``cost``."""
        learning_rate = 0.003
        optimizer = tf.train.RMSPropOptimizer(learning_rate)
        return optimizer.minimize(self.cost)

    @lazy_property
    def error(self):
        """Fraction of positions where argmax of target and prediction differ."""
        mistakes = tf.not_equal(
            tf.argmax(self.target, 2), tf.argmax(self.prediction, 2))
        return tf.reduce_mean(tf.cast(mistakes, tf.float32))

    @staticmethod
    def _weight_and_bias(in_size, out_size):
        """Create the variables of a dense layer.

        Returns:
            A ``(weight, bias)`` pair of ``tf.Variable``s: the weight is
            initialized from a truncated normal with stddev 0.01 and shape
            ``[in_size, out_size]``; the bias is a constant 0.1 vector of
            length ``out_size``.
        """
        weight = tf.truncated_normal([in_size, out_size], stddev=0.01)
        bias = tf.constant(0.1, shape=[out_size])
        return tf.Variable(weight), tf.Variable(bias)
def read_dataset():
    """Load the OCR dataset and return it as a ``(train, test)`` pair.

    The ``target`` column is passed through ``sets.OneHot`` with depth 2,
    the last two axes of ``data`` are merged into a single axis and cast
    to float, and the result is divided by ``sets.Split(0.66)``
    (presumably 66% for training -- confirm against the ``sets`` docs).
    """
    ocr = sets.Ocr()
    encode_target = sets.OneHot(ocr.target, depth=2)
    ocr = encode_target(ocr, columns=['target'])
    # Keep all leading axes and collapse the trailing two into one.
    leading_dims = ocr.data.shape[:-2]
    flattened = ocr.data.reshape(leading_dims + (-1,))
    ocr['data'] = flattened.astype(float)
    splitter = sets.Split(0.66)
    train, test = splitter(ocr)
    return train, test
if __name__ == '__main__':
    # Train the model for 10 epochs of 100 mini-batches each and report the
    # test error after every epoch.
    train, test = read_dataset()
    _, length, image_size = train.data.shape
    num_classes = train.target.shape[2]
    # Placeholders stay at module level on purpose: they are fed by name in
    # the loops below.
    data = tf.placeholder(tf.float32, [None, length, image_size])
    target = tf.placeholder(tf.float32, [None, length, num_classes])
    dropout = tf.placeholder(tf.float32)
    model = SequenceLabelling(data, target, dropout)
    # The context manager closes the session (and frees its resources) even
    # if training raises.
    with tf.Session() as sess:
        # NOTE(review): initialize_all_variables is deprecated in later
        # TensorFlow 1.x in favour of global_variables_initializer; kept
        # for compatibility with the version this script targets.
        sess.run(tf.initialize_all_variables())
        for epoch in range(10):
            for _ in range(100):
                batch = train.sample(10)
                # ``dropout`` is fed as a keep probability: 0.5 in training.
                sess.run(model.optimize, {
                    data: batch.data, target: batch.target, dropout: 0.5})
            # Evaluate with dropout disabled (keep probability 1).
            error = sess.run(model.error, {
                data: test.data, target: test.target, dropout: 1})
            print('Epoch {:2d} error {:3.1f}%'.format(epoch + 1, 100 * error))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment