TensorFlow Variable-Length Sequence Labelling
# Working example for my blog post at:
# http://danijar.com/variable-sequence-lengths-in-tensorflow/
import functools
import sets
import tensorflow as tf
# Note: these module paths are from a pre-1.0 TensorFlow release.
from tensorflow.models.rnn import rnn_cell
from tensorflow.models.rnn import rnn

def lazy_property(function):
    # Cache the wrapped property so its graph nodes are built only once.
    attribute = '_' + function.__name__

    @property
    @functools.wraps(function)
    def wrapper(self):
        if not hasattr(self, attribute):
            setattr(self, attribute, function(self))
        return getattr(self, attribute)
    return wrapper

class VariableSequenceLabelling:

    def __init__(self, data, target, num_hidden=200, num_layers=3):
        self.data = data
        self.target = target
        self._num_hidden = num_hidden
        self._num_layers = num_layers
        # Accessing the lazy properties here builds the graph up front.
        self.prediction
        self.error
        self.optimize

    @lazy_property
    def length(self):
        # Padded frames are all-zero vectors, so a frame is in use exactly
        # when its maximum absolute feature value is nonzero.
        used = tf.sign(tf.reduce_max(tf.abs(self.data), reduction_indices=2))
        length = tf.reduce_sum(used, reduction_indices=1)
        length = tf.cast(length, tf.int32)
        return length

    @lazy_property
    def prediction(self):
        # Recurrent network. Passing sequence_length stops the recurrence
        # at each sequence's true end.
        output, _ = rnn.dynamic_rnn(
            rnn_cell.GRUCell(self._num_hidden),
            self.data,
            dtype=tf.float32,
            sequence_length=self.length,
        )
        # Softmax layer.
        max_length = int(self.target.get_shape()[1])
        num_classes = int(self.target.get_shape()[2])
        weight, bias = self._weight_and_bias(self._num_hidden, num_classes)
        # Flatten to apply same weights to all time steps.
        output = tf.reshape(output, [-1, self._num_hidden])
        prediction = tf.nn.softmax(tf.matmul(output, weight) + bias)
        prediction = tf.reshape(prediction, [-1, max_length, num_classes])
        return prediction

    @lazy_property
    def cost(self):
        # Compute cross entropy for each frame.
        cross_entropy = self.target * tf.log(self.prediction)
        cross_entropy = -tf.reduce_sum(cross_entropy, reduction_indices=2)
        # Mask out padding frames so they do not contribute to the loss.
        mask = tf.sign(tf.reduce_max(tf.abs(self.target), reduction_indices=2))
        cross_entropy *= mask
        # Average over actual sequence lengths.
        cross_entropy = tf.reduce_sum(cross_entropy, reduction_indices=1)
        cross_entropy /= tf.cast(self.length, tf.float32)
        return tf.reduce_mean(cross_entropy)

    @lazy_property
    def optimize(self):
        learning_rate = 0.0003
        optimizer = tf.train.AdamOptimizer(learning_rate)
        return optimizer.minimize(self.cost)

    @lazy_property
    def error(self):
        mistakes = tf.not_equal(
            tf.argmax(self.target, 2), tf.argmax(self.prediction, 2))
        mistakes = tf.cast(mistakes, tf.float32)
        mask = tf.sign(tf.reduce_max(tf.abs(self.target), reduction_indices=2))
        mistakes *= mask
        # Average over actual sequence lengths.
        mistakes = tf.reduce_sum(mistakes, reduction_indices=1)
        mistakes /= tf.cast(self.length, tf.float32)
        return tf.reduce_mean(mistakes)

    @staticmethod
    def _weight_and_bias(in_size, out_size):
        weight = tf.truncated_normal([in_size, out_size], stddev=0.01)
        bias = tf.constant(0.1, shape=[out_size])
        return tf.Variable(weight), tf.Variable(bias)

def get_dataset():
    """Read dataset and flatten images."""
    dataset = sets.Ocr()
    dataset = sets.OneHot(dataset.target, depth=2)(dataset, columns=['target'])
    dataset['data'] = dataset.data.reshape(
        dataset.data.shape[:-2] + (-1,)).astype(float)
    train, test = sets.Split(0.66)(dataset)
    return train, test

if __name__ == '__main__':
    train, test = get_dataset()
    _, length, image_size = train.data.shape
    num_classes = train.target.shape[2]
    data = tf.placeholder(tf.float32, [None, length, image_size])
    target = tf.placeholder(tf.float32, [None, length, num_classes])
    model = VariableSequenceLabelling(data, target)
    sess = tf.Session()
    sess.run(tf.initialize_all_variables())
    for epoch in range(10):
        for _ in range(100):
            batch = train.sample(10)
            sess.run(model.optimize, {data: batch.data, target: batch.target})
        error = sess.run(model.error, {data: test.data, target: test.target})
        print('Epoch {:2d} error {:3.1f}%'.format(epoch + 1, 100 * error))
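
The length computation above is the heart of the example: padding frames are all-zero vectors, so counting the nonzero frames recovers each sequence's true length. The gist targets a pre-1.0 TensorFlow API; as a minimal sketch, the same trick in current TensorFlow (2.x) could look like the following, where sequence_lengths and the toy batch are illustrative and not part of the original code.

import tensorflow as tf

def sequence_lengths(batch):
    # batch: float tensor of shape [batch_size, max_length, features].
    # A frame counts as used when its largest absolute feature is nonzero.
    used = tf.sign(tf.reduce_max(tf.abs(batch), axis=2))
    return tf.cast(tf.reduce_sum(used, axis=1), tf.int32)

# Two sequences padded to length 3, with true lengths 2 and 1.
batch = tf.constant([
    [[1.0, 0.5], [0.2, 0.0], [0.0, 0.0]],
    [[0.3, 0.3], [0.0, 0.0], [0.0, 0.0]],
])
print(sequence_lengths(batch).numpy())  # [2 1]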
Does max_length in line 50 have the same value as length in line 108?