# TestLSTMType.py
# Forked from amitoengg/TestLSTMType.py by @vgoklani (February 27, 2018)
from __future__ import print_function
import math
import random
import time
import numpy as np
import tensorflow as tf
from tensorflow.python.ops import rnn
flags = tf.flags
logging = tf.logging
# flags.DEFINE_string("save_path", None,
# "Model output directory.")
# flags.DEFINE_string("rnn_mode", None,
# "The low level implementation of lstm cell: one of CUDNN, "
# "BASIC, and BLOCK, representing cudnn_lstm, basic_lstm, "
# "and lstm_block_cell classes.")
# FLAGS = flags.FLAGS
BASIC = "basic"
CUDNN = "cudnn"
BLOCK = "block"
# ====================
# TOY DATA GENERATOR
# ====================
class ToySequenceData(object):
""" Generate sequence of data with dynamic length.
This class generate samples for training:
- Class 0: linear sequences (i.e. [0, 1, 2, 3,...])
- Class 1: random sequences (i.e. [1, 3, 10, 7,...])
NOTICE:
We have to pad each sequence to reach 'max_seq_len' for TensorFlow
consistency (we cannot feed a numpy array with inconsistent
dimensions). The dynamic calculation will then be perform thanks to
'seqlen' attribute that records every actual sequence length.
"""
def __init__(self, n_samples=1000, max_seq_len=20, min_seq_len=3,
max_value=1000, batch_size=128, varyLength=True):
self.data = []
self.labels = []
self.seqlen = []
for i in range(n_samples):
# Random sequence length
if varyLength:
length = random.randint(min_seq_len, max_seq_len)
else:
length = (min_seq_len + max_seq_len) // 2
# Monitor sequence length for TensorFlow dynamic calculation
self.seqlen.append(length)
# Add a random or linear int sequence (50% prob)
if random.random() < .5:
# Generate a linear sequence
rand_start = random.randint(0, max_value - length)
s = [[float(i) / max_value] for i in
range(rand_start, rand_start + length)]
# # Pad sequence for dimension consistency
# s += [[0.] for i in range(max_seq_len - length)]
self.data.append(s)
self.labels.append([1., 0.])
else:
# Generate a random sequence
s = [[float(random.randint(0, max_value)) / max_value]
for i in range(length)]
# Pad sequence for dimension consistency
# s += [[0.] for i in range(max_seq_len - length)]
self.data.append(s)
self.labels.append([0., 1.])
self.batch_id = 0
self.data, self.labels, self.seqlen = (list(t) for t in zip(
*sorted(zip(self.data, self.labels, self.seqlen), key=lambda x: x[2])))
print('sequence length', self.seqlen)
        # Static batches: data is sorted by sequence length, and all sequences
        # in the same batch are padded to the same (per-batch) length.
self.batches = []
total_elements = len(self.data)
if (batch_size == -1):
batch_size = total_elements
num_batches = 1
else:
            # float() guards against Python 2 integer division truncating before ceil.
            num_batches = int(math.ceil(total_elements / float(batch_size)))
for _ in range(num_batches):
b = self.nextBatch(batch_size)
# print ('batch length ',b[2][0])
self.batches.append(b)
self.cur_pointer = 0
self.num_batch = len(self.batches)
self.index_array = np.arange(self.num_batch)
print('ToySequenceData num_batch', self.num_batch)
    # Called during training to fetch the next batch; reshuffles the batch
    # order once a full pass over the data has been made.
def next(self):
if self.cur_pointer >= self.num_batch:
self.cur_pointer = 0
np.random.shuffle(self.index_array)
# print('{} '.format(self.index_array[self.cur_pointer]))
cur_batch = self.batches[self.index_array[self.cur_pointer]]
self.cur_pointer += 1
return cur_batch
    # Build one batch; used only during __init__ to pre-materialize the static batches.
def nextBatch(self, batch_size):
""" Return a batch of data. When dataset end is reached, start over.
"""
if self.batch_id == len(self.data):
self.batch_id = 0
batch_data = (self.data[self.batch_id:min(self.batch_id +
batch_size, len(self.data))])
batch_labels = (self.labels[self.batch_id:min(self.batch_id +
batch_size, len(self.data))])
batch_seqlen = (self.seqlen[self.batch_id:min(self.batch_id +
batch_size, len(self.data))])
max_seq_len = max(batch_seqlen)
print('batch id', self.batch_id, ' max_seq_len', max_seq_len)
for i, item in enumerate(batch_data):
item.extend([[0.] for i in range(max_seq_len - batch_seqlen[i])])
# for i, item in enumerate(batch_data):
# print(i, '\t', (max_seq_len - batch_seqlen[i]), "\t", batch_seqlen[i], "\t", batch_labels[i], "\t", item)
# batch_data = pad_sequences(batch_data, maxlen=max_seq_len)
self.batch_id = min(self.batch_id + batch_size, len(self.data))
# print(len(batch_data), len(batch_labels), len(batch_seqlen))
return batch_data, batch_labels, batch_seqlen
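
# A minimal usage sketch (hypothetical helper; not called by main). It shows the
# per-batch padding contract: because the data is sorted by length before being
# batched, every row of a batch is padded only up to that batch's own maximum.
def _demo_toy_batches():
    toy = ToySequenceData(n_samples=8, max_seq_len=6, min_seq_len=3, batch_size=4)
    batch_x, batch_y, batch_seqlen = toy.next()
    # All rows share the batch's max length; [0.] entries are padding.
    assert all(len(row) == max(batch_seqlen) for row in batch_x)
    return batch_x, batch_y, batch_seqlen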
#
# def _get_lstm_cell(self, config, is_training):
# if config.rnn_mode == BASIC:
# return tf.contrib.rnn.BasicLSTMCell(
# config.hidden_size, forget_bias=0.0, state_is_tuple=True,
# reuse=not is_training)  # reuse lets the eval graph share the training graph's variables
# if config.rnn_mode == BLOCK:
# return tf.contrib.rnn.LSTMBlockCell(
# config.hidden_size, forget_bias=0.0)
# raise ValueError("rnn_mode %s not supported" % config.rnn_mode)
#
# def _build_rnn_graph(self, inputs, config, is_training):
# if config.rnn_mode == CUDNN:
# return self._build_rnn_graph_cudnn(inputs, config, is_training)
# else:
# return self._build_rnn_graph_lstm(inputs, config, is_training)
def collect_final_step_of_lstm(lstm_representation, lengths):
# lstm_representation: [batch_size, seq_length, dim]
# lengths: [batch_size]
batch_size = tf.shape(lengths)[0]
batch_nums = tf.range(0, limit=batch_size) # shape (batch_size)
indices = tf.stack((batch_nums, lengths), axis=1) # shape (batch_size, 2)
    result = tf.gather_nd(lstm_representation, indices, name='last-forward-lstm')
return result # [batch_size, dim]
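
# A small illustration (hypothetical helper; not called by main): for two
# sequences of lengths [3, 5], passing lengths - 1 selects reps[0, 2, :] and
# reps[1, 4, :], i.e. the last *valid* timestep of each row, via the stacked
# (batch_index, time_index) pairs fed to tf.gather_nd.
def _demo_collect_final_step():
    reps = tf.reshape(tf.range(2 * 5 * 4, dtype=tf.float32), [2, 5, 4])
    lengths = tf.constant([3, 5], dtype=tf.int32)
    return collect_final_step_of_lstm(reps, lengths - 1)  # shape [2, 4]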
def build_rnn_graph_cudnn(inputs, sequence_length, options, is_training):
    size = 1
    if options.direction == 'bidirectional':
        size = 2
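    # CudnnLSTM consumes time-major input: [time, batch, dim].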
inputs = tf.transpose(inputs, [1, 0, 2])
    cell = tf.contrib.cudnn_rnn.CudnnLSTM(direction=options.direction,
                                          num_layers=options.num_layers,
                                          num_units=options.n_hidden,
                                          input_size=1,  # feature dim of x, which is [batch, time, 1]
                                          dropout=options.dropout_rate if is_training else 0)
params_size_t = cell.params_size()
rnn_params = tf.get_variable(
"lstm_params",
initializer=tf.random_uniform(
[params_size_t], -options.init_scale, options.init_scale),
validate_shape=False)
    # Initial states: CudnnLSTM expects [num_layers * num_dirs, batch, num_units].
    c = tf.zeros([options.num_layers * size, options.batch_size, options.n_hidden],
                 tf.float32)
    h = tf.zeros([options.num_layers * size, options.batch_size, options.n_hidden],
                 tf.float32)
# initial_state = (tf.contrib.rnn.LSTMStateTuple(h=h, c=c),)
outputs, h, c = cell(input_data=inputs, input_h=h, input_c=c, params=rnn_params, is_training=is_training)
in_text_repres = tf.transpose(outputs, [1, 0, 2])
# outputs = tf.transpose(outputs, [1, 0, 2])
# in_text_repres = tf.reshape(in_text_repres, [-1, options.n_hidden])
in_text_repres = collect_final_step_of_lstm(in_text_repres, sequence_length - 1)
# in_text_repres = in_text_repres[:, 0, :]
# in_text_repres = tf.reshape(outputs, [-1, options.n_hidden])
# in_text_repres = tf.reshape(tf.concat(values=tf.transpose(outputs, [1, 0, 2]), axis=1), [-1, options.n_hidden])
    # Dropout layer. tf.nn.dropout is inverted dropout (it rescales by
    # 1/keep_prob during training), so no extra scaling is needed at inference.
    if is_training:
        in_text_repres = tf.nn.dropout(in_text_repres, (1 - options.dropout_rate))
# ========Prediction Layer=========
w_0 = tf.get_variable("w_0", [options.n_hidden*size, options.n_classes], dtype=tf.float32)
b_0 = tf.get_variable("b_0", [options.n_classes], dtype=tf.float32)
logits = tf.nn.xw_plus_b(in_text_repres, w_0, b_0, name="logits")
prob = tf.nn.softmax(logits)
return logits, prob
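
# Shape walk-through for the cudnn path (assuming batch B, time T, n_hidden H,
# direction='bidirectional', so size=2): inputs [B, T, 1] -> time-major
# [T, B, 1] -> CudnnLSTM outputs [T, B, size*H] -> batch-major [B, T, size*H]
# -> last valid step [B, size*H] -> logits [B, n_classes].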
def build_rnn_graph_lstm_new(inputs, sequence_length, options, is_training):
# [batch_size, text_len, word_dim]
    # n_hidden is split across the two directions; use // so num_units stays an int.
    lstm_cell_fw = tf.nn.rnn_cell.BasicLSTMCell(options.n_hidden // 2)
    lstm_cell_bw = tf.nn.rnn_cell.BasicLSTMCell(options.n_hidden // 2)
if is_training:
lstm_cell_fw = tf.nn.rnn_cell.DropoutWrapper(lstm_cell_fw, output_keep_prob=(1 - options.dropout_rate))
lstm_cell_bw = tf.nn.rnn_cell.DropoutWrapper(lstm_cell_bw, output_keep_prob=(1 - options.dropout_rate))
lstm_cell_fw = tf.nn.rnn_cell.MultiRNNCell([lstm_cell_fw])
lstm_cell_bw = tf.nn.rnn_cell.MultiRNNCell([lstm_cell_bw])
cur_representation, _ = rnn.bidirectional_dynamic_rnn(
lstm_cell_fw, lstm_cell_bw, inputs,
dtype=tf.float32, sequence_length=sequence_length)
forward_rep = collect_final_step_of_lstm(cur_representation[0], sequence_length - 1)
backward_rep = cur_representation[1][:, 0, :]
in_text_repres = tf.concat([forward_rep, backward_rep], axis=1)
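    # The forward output is read at its last valid timestep; with
    # sequence_length set, dynamic_rnn places the backward pass's summary of
    # the whole sequence at t=0, hence the [:, 0, :] read above.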
    # Dropout layer (inverted dropout, so no rescaling at inference).
    if is_training:
        in_text_repres = tf.nn.dropout(in_text_repres, (1 - options.dropout_rate))
# ========Prediction Layer=========
w_0 = tf.get_variable("w_0", [options.n_hidden, options.n_classes], dtype=tf.float32)
b_0 = tf.get_variable("b_0", [options.n_classes], dtype=tf.float32)
logits = tf.nn.xw_plus_b(in_text_repres, w_0, b_0, name="logits")
prob = tf.nn.softmax(logits)
return logits, prob
def main(_):
config = get_config()
trainset = ToySequenceData(n_samples=1000, max_seq_len=config.seq_max_len, batch_size=config.batch_size,
varyLength=config.varyLength)
testset = ToySequenceData(n_samples=500, max_seq_len=config.seq_max_len, batch_size=config.batch_size,
varyLength=config.varyLength)
# tf Graph input
x = tf.placeholder("float", [None, None, 1])
y = tf.placeholder("float", [None, config.n_classes])
# A placeholder for indicating each sequence length
seqlen = tf.placeholder(tf.int32, [None])
if config.rnn_mode == BASIC:
logits, prob = build_rnn_graph_lstm_new(x, seqlen, options=config, is_training=True)
else:
logits, prob = build_rnn_graph_cudnn(x, seqlen, options=config, is_training=True)
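    # NOTE: the graph is built once with is_training=True, so dropout stays
    # active even when the same graph is reused for test evaluation below.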
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=y))
correct_pred = tf.equal(tf.argmax(prob, 1), tf.argmax(y, 1))
correct_pred_sum = tf.reduce_sum(tf.cast(correct_pred, tf.float32))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
clipper = 50
if (config.optimizer == "adam"):
optimizer = tf.train.AdamOptimizer(learning_rate=config.learning_rate)
else:
optimizer = tf.train.GradientDescentOptimizer(learning_rate=config.learning_rate)
tvars = tf.trainable_variables()
if config.lambda_l2 > 0.0:
l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in tvars if v.get_shape().ndims > 1])
loss = loss + config.lambda_l2 * l2_loss
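    # Clip gradients to a global norm of `clipper` to guard against exploding
    # gradients in the recurrent layers, then apply them.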
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, tvars), clipper)
train_op = optimizer.apply_gradients(zip(grads, tvars))
extra_train_ops = []
train_ops = [train_op] + extra_train_ops
train_op = tf.group(*train_ops)
# Initialize the variables (i.e. assign their default value)
init = tf.global_variables_initializer()
num_examples = len(trainset.data)
# num_batches = (num_examples // config.batch_size) + 1
    num_batches = int(math.ceil(num_examples / float(config.batch_size)))
test_num_examples = len(testset.data)
    test_num_batches = int(math.ceil(test_num_examples / float(config.batch_size)))
print('FLAGS.batch_size ', config.batch_size)
print('num_batches ', num_batches)
# Start training
with tf.Session() as sess:
# Run the initializer
sess.run(init)
gstep = 0
for epoch in range(config.num_epochs):
total_loss = 0
total_correct = 0
test_total_correct = 0
estart_time = time.time()
for step in range(num_batches):
gstep = gstep + 1
batch_x, batch_y, batch_seqlen = trainset.next()
if config.debug:
print('gstep', gstep, len(batch_seqlen), batch_seqlen)
# Run optimization op (backprop)
_, loss_val, correct_pred_sum_val = sess.run([train_op, loss, correct_pred_sum],
feed_dict={x: batch_x, y: batch_y,
seqlen: batch_seqlen})
total_loss = total_loss + loss_val
total_correct = total_correct + correct_pred_sum_val
# Calculate accuracy
# test_data = testset.data
# test_label = testset.labels
# test_seqlen = testset.seqlen
tduration = time.time() - estart_time
            if config.eval:
for step in range(test_num_batches):
test_data, test_label, test_seqlen = testset.next()
if config.debug:
print('test gstep', gstep, len(test_data), test_seqlen)
test_correct_pred_sum_val = sess.run(correct_pred_sum,
feed_dict={x: test_data, y: test_label, seqlen: test_seqlen})
#print (test_correct_pred_sum_val)
test_total_correct = test_total_correct + test_correct_pred_sum_val
#test_acc = 0;
# test_acc = sess.run(accuracy, feed_dict={x: test_data, y: test_label,
# seqlen: test_seqlen})
print("epoch=%5d\ttime=\t%.2f\tloss=%.6f\ttrain_acc=%.4f\ttest_acc=%.4f" % (
epoch,tduration, total_loss / num_examples, total_correct / num_examples, test_total_correct/test_num_examples))
class TestConfig(object):
debug = False
#rnn_mode = BASIC
rnn_mode = CUDNN
varyLength = True
optimizer = "sgd" # "adam" # "sgd"
learning_rate = 0.001
if optimizer == "sgd":
learning_rate = 0.5
num_epochs = 1000
batch_size = 100
display_step = 50
# Network Parameters
seq_max_len = 20 # Sequence max length
n_hidden = 64 # hidden layer num of features
n_classes = 2 # linear sequence or not
dropout_rate = 0.5
lambda_l2 = 0.0
num_layers = 1
init_scale = 0.1
    eval = True
    direction = 'bidirectional'  # or 'unidirectional'
def get_config():
config = TestConfig()
return config
if __name__ == "__main__":
tf.app.run()