# Siamese Twin Eager-Mode TensorFlow Bidirectional LSTM
#
# Example training data (Embedding1, Embedding2, Label):
#   1  Jurong, Jurng,  1
#   2  Jurong, Jrung,  1
#   3  Jurong, Bishan, 0
#
# Purpose: build a measure of how similar two input sequences are to each other.
import os
import random
import string
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.contrib.eager as tfe
import contextlib
from tensorflow.python.eager import tape
from tensorflow.python.eager import imperative_grad
from tensorflow.python.eager.backprop import _default_vspace
from tensorflow.python.util import nest
tfe.enable_eager_execution()
# Model parameters
batch_size = 200
n_batches = 200
max_address_len = 30
margin = 0.25
num_features = 50
dropout_keep_prob = 0.8
street_names = ['Jurong East','Bukit Batok','Bukit Gombak','Choa Chu Kang','Yew Tee','Kranji','Marsiling','Woodlands','Admiralty','Sembawang','Yishun','Khatib','Yio Chu Kang','Ang Mo Kio','Bishan','Braddell','Toa Payoh','Novena','Newton','Orchard','Somerset','Dhoby Ghaut','City Hall','Raffles Place','Marina Bay','Marina South Pier','Pasir Ris','Tampines','Simei','Tanah Merah','Bedok','Kembangan','Eunos','Paya Lebar','Aljunied','Kallang','Lavender','Bugis','City Hall','Raffles Place','Tanjong Pagar','Outram Park','Tiong Bahru','Redhill','Queenstown','Commonwealth','Buona Vista','Dover','Clementi','Jurong East','Chinese Garden','Lakeside','Boon Lay','Pioneer','Joo Koon','Gul Circle','Tuas Crescent','Tuas West Road','Tuas Link','Expo','Changi Airport','HarbourFront','Outram Park','Chinatown','Clarke Quay','Dhoby Ghaut','Little India','Farrer Park','Boon Keng','Potong Pasir','Woodleigh','Serangoon','Kovan','Hougang','Buangkok','Sengkang','Punggol','Dhoby Ghaut','Bras Basah','Esplanade','Promenade','Nicoll Highway','Stadium','Mountbatten','Dakota','Paya Lebar','MacPherson','Tai Seng','Bartley','Serangoon','Lorong Chuan','Bishan','Marymount','Caldecott','Botanic Gardens','Farrer Road','Holland Village','Buona Vista','one-north','Kent Ridge','Haw Par Villa','Pasir Panjang','Labrador Park','Telok Blangah','HarbourFront','Bayfront','Marina Bay','Bukit Panjang','Cashew','Hillview','Beauty World','King Albert Park','Sixth Avenue','Tan Kah Kee','Botanic Gardens','Stevens','Newton','Little India','Rochor','Bugis','Promenade','Bayfront','Downtown','Telok Ayer','Chinatown','Fort Canning','Bencoolen','Jalan Besar','Bendemeer','Geylang Bahru','Mattar','MacPherson','Ubi','Kaki Bukit','Bedok North','Bedok Reservoir','Tampines West','Tampines','Tampines East','Upper Changi','Expo','Choa Chu Kang','South View','Keat Hong','Teck Whye','Phoenix','Bukit Panjang','Petir','Pending','Bangkit','Fajar','Segar','Jelapang','Senja','Ten Mile Junction','Sengkang','Compassvale','Rumbia','Bakau','Kangkar','Ranggung','Cheng Lim','Farmway','Kupang','Thanggam','Fernvale','Layar','Tongkang','Renjong','Punggol','Cove','Meridian','Coral Edge','Riviera','Kadaloor','Oasis','Damai','Sam Kee','Teck Lee','Punggol Point','Samudera','Nibong','Sumang','Soo Teck']
street_types = ['rd', 'st', 'ln', 'ave', 'cir','dr', 'jct']
test_queries = ['111 Jurong East ln', '271 Bukit Batok',
                '314 Bishan avenue', 'tensorflow is fun']
test_references = ['111 Jurongg East ln', '271 Bukt Batk',
                   '314 Bishn avenue', 'tensorflow is so fun']
def create_typo(s):
    # Replace one random character with a random lowercase letter or digit
    rand_ind = random.choice(range(len(s)))
    s_list = list(s)
    s_list[rand_ind] = random.choice(string.ascii_lowercase + '0123456789')
    return ''.join(s_list)
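# Quick sanity check (illustrative, not part of the original gist; the exact
# output is random, e.g. 'Jurong East' might become 'Jur0ng East'):
assert len(create_typo('Jurong East')) == len('Jurong East')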
# Get a batch of size n, half of which are similar address pairs, half are not
def get_batch(n):
    # Generate a list of reference addresses paired with similar addresses
    # that contain a typo
    numbers = [random.randint(1, 9999) for i in range(n)]
    streets = [random.choice(street_names) for i in range(n)]
    street_suffs = [random.choice(street_types) for i in range(n)]
    full_streets = [str(w) + ' ' + x + ' ' + y for w, x, y in zip(numbers, streets, street_suffs)]
    typo_streets = [create_typo(x) for x in full_streets]
    reference = [list(x) for x in zip(full_streets, typo_streets)]
    # Shuffle the last half of the pairs to create dissimilar addresses
    half_ix = int(n / 2)
    bottom_half = reference[half_ix:]
    true_address = [x[0] for x in bottom_half]
    typo_address = [x[1] for x in bottom_half]
    typo_address = list(np.roll(typo_address, 1))
    bottom_half = [[x, y] for x, y in zip(true_address, typo_address)]
    reference[half_ix:] = bottom_half
    # Target similarities: +1 for similar pairs, -1 for dissimilar pairs
    target = [1] * (n - half_ix) + [-1] * half_ix
    reference = [[x, y] for x, y in zip(reference, target)]
    return reference
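# What one batch element looks like (illustrative values, not actual output):
#   [['4821 Bishan rd', '4821 Bishqn rd'], 1]     similar pair (typo only)
#   [['307 Kranji ave', '91x5 Dover dr'], -1]     dissimilar pair (rolled)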
# Define vocabulary dictionary (reserve index 0 for padding)
vocab_chars = string.ascii_lowercase + '0123456789 ' + string.punctuation
vocab2ix_dict = {char: (ix + 1) for ix, char in enumerate(vocab_chars)}
vocab_length = len(vocab_chars) + 1
# Map an address string to a fixed-length sequence of character indices
# (the actual one-hot encoding happens later via tf.nn.embedding_lookup)
def address2onehot(address,
                   vocab2ix_dict=vocab2ix_dict,
                   max_address_len=max_address_len):
    # Translate the address string into indices
    address = address.lower()
    address_ix = [vocab2ix_dict[x] for x in list(address)]
    # Pad or crop to max_address_len
    address_ix = (address_ix + [0] * max_address_len)[0:max_address_len]
    return address_ix
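# Worked example, derived from the vocabulary above: 'a'-'z' map to 1-26,
# '0'-'9' to 27-36, and ' ' to 37, so:
assert address2onehot('1 jurong') == [28, 37, 10, 21, 18, 15, 14, 7] + [0] * 22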
class siamese_nn(tfe.Network):
    def __init__(self, num_hidden, vocab_size, num_features, input_length):
        super(siamese_nn, self).__init__(name="")
        self.vocab_size = vocab_size
        self.num_features = num_features
        self.input_length = input_length
        cell_unit = tf.contrib.rnn.BasicLSTMCell
        self.forward_cell = self.track_layer(cell_unit(num_hidden, forget_bias=1.0))
        self.backward_cell = self.track_layer(cell_unit(num_hidden, forget_bias=1.0))
        self.dense = self.track_layer(tf.layers.Dense(10))

    def call(self, input_vector, dropout_keep_prob):
        lstm_forward_cell = tf.contrib.rnn.DropoutWrapper(self.forward_cell, output_keep_prob=dropout_keep_prob)
        lstm_backward_cell = tf.contrib.rnn.DropoutWrapper(self.backward_cell, output_keep_prob=dropout_keep_prob)
        # Split the (batch, time, features) tensor into a per-timestep list
        input_embed_split = tf.split(axis=1, num_or_size_splits=self.input_length, value=input_vector)
        input_embed_split = [tf.squeeze(x, axis=[1]) for x in input_embed_split]
        # Create the bidirectional layer; the except branch is a fallback for
        # API versions where the call returns only the outputs list instead
        # of an (outputs, fw_state, bw_state) tuple
        try:
            outputs, _, _ = tf.contrib.rnn.static_bidirectional_rnn(lstm_forward_cell,
                                                                    lstm_backward_cell,
                                                                    input_embed_split,
                                                                    dtype=tf.float32)
        except Exception:
            outputs = tf.contrib.rnn.static_bidirectional_rnn(lstm_forward_cell,
                                                              lstm_backward_cell,
                                                              input_embed_split,
                                                              dtype=tf.float32)
        # Average the outputs over the sequence
        temporal_mean = tf.add_n(outputs) / self.input_length
        # Fully connected layer (10 output features) followed by dropout
        final_output = tf.nn.dropout(self.dense(temporal_mean), dropout_keep_prob)
        return final_output
def snn(model, address1, address2, dropout_keep_prob):
    # One-hot encode the character indices via an identity-matrix lookup
    identity_mat = tf.diag(tf.ones(shape=[vocab_length]))
    address1 = tf.nn.embedding_lookup(identity_mat, address1)
    address2 = tf.nn.embedding_lookup(identity_mat, address2)
    output1 = model(address1, dropout_keep_prob)
    # Reuse the same variables on the second string (the shared tfe.Network
    # already guarantees this in eager mode)
    with tf.variable_scope(tf.get_variable_scope(), reuse=True):
        output2 = model(address2, dropout_keep_prob)
    # Unit-normalize the outputs
    output1 = tf.nn.l2_normalize(output1, 1)
    output2 = tf.nn.l2_normalize(output2, 1)
    # Return the cosine similarity: for unit vectors, it is just the dot product
    dot_prod = tf.reduce_sum(tf.multiply(output1, output2), 1)
    return dot_prod
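# A minimal numpy sketch of the same similarity computation (illustrative):
#   v1, v2 = np.array([3., 4.]), np.array([4., 3.])
#   v1, v2 = v1 / np.linalg.norm(v1), v2 / np.linalg.norm(v2)
#   np.dot(v1, v2)  # 0.96, the cosine similarity of the original vectors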
def get_predictions(scores):
    # Map cosine scores to class predictions in {-1, +1}
    predictions = tf.sign(scores, name="predictions")
    return predictions
def loss_fn(scores, y_target, margin):
    # Calculate the positive losses
    pos_loss_term = 0.25 * tf.square(tf.subtract(1., scores))
    # Targets in this script are -1/+1 (see get_batch), so map them to 0/1
    pos_mult = tf.add(tf.multiply(0.5, tf.cast(y_target, tf.float32)), 0.5)
    # (If the targets were already 0/1, use instead:
    #  pos_mult = tf.cast(y_target, tf.float32))
    # Make sure positive losses apply only to similar strings
    positive_loss = tf.multiply(pos_mult, pos_loss_term)
    # Calculate negative losses, applied only to dissimilar strings
    neg_mult = tf.add(tf.multiply(-0.5, tf.cast(y_target, tf.float32)), 0.5)
    # (If the targets were 0/1, use instead:
    #  neg_mult = tf.subtract(1., tf.cast(y_target, tf.float32)))
    negative_loss = neg_mult * tf.square(scores)
    # Combine similar and dissimilar losses
    loss = tf.add(positive_loss, negative_loss)
    # Create the margin term: when a pair is dissimilar and its score is
    # already below the margin, zero out its loss.
    # Check if the target marks a dissimilar pair (-1 with these labels)
    target_dissimilar = tf.equal(tf.cast(y_target, tf.float32), -1.)
    # Check if the cosine score is smaller than the margin
    less_than_margin = tf.less(scores, margin)
    # Check if both are true
    both_logical = tf.logical_and(target_dissimilar, less_than_margin)
    both_logical = tf.cast(both_logical, tf.float32)
    # If both are true, multiply the loss by (1 - 1) = 0
    multiplicative_factor = tf.cast(1. - both_logical, tf.float32)
    total_loss = tf.multiply(loss, multiplicative_factor)
    # Average the loss over the batch
    avg_loss = tf.reduce_mean(total_loss)
    return avg_loss
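# Worked per-pair examples of the loss above (before the batch average):
#   similar pair    (y = +1, score = 0.9):  0.25 * (1 - 0.9)^2 = 0.0025
#   dissimilar pair (y = -1, score = 0.6):  0.6^2              = 0.36
#   dissimilar pair (y = -1, score = 0.1):  below margin 0.25, so loss = 0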
def accuracy(scores, y_target):
    predictions = get_predictions(scores)
    # Cast into integers (targets here are -1 or +1)
    y_target_int = tf.cast(y_target, tf.int32)
    # If targets were 0/1, map them to (-1, +1) via (2 * x - 1):
    # y_target_int = tf.subtract(tf.multiply(y_target_int, 2), 1)
    predictions_int = tf.cast(predictions, tf.int32)
    correct_predictions = tf.equal(predictions_int, y_target_int)
    accuracy = tf.reduce_mean(tf.cast(correct_predictions, tf.float32))
    return accuracy
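# Illustrative example: scores = [0.9, -0.3] with targets = [1, -1] gives
# accuracy 1.0 (both signs match); scores = [0.9, 0.3] would give 0.5.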
# https://gist.github.com/traveller59/8849c5aa7b2db6ec705e9dc9b1fb3591
# Useful tricks in TensorFlow eager (tested in tensorflow 1.5.0dev20171229)
class RecordInfo(object):
    def __init__(self):
        self.end_node = None
        self.popped_tape = None

    def backward(self, end_node):
        if end_node is None:
            raise ValueError(
                "Cannot differentiate a function that returns None")
        if self.popped_tape is None:
            raise RuntimeError("Must call this after the with record() scope ends")
        variables = list(
            sorted(
                self.popped_tape.watched_variables(),
                key=lambda v: v.handle._id))  # pylint: disable=protected-access
        sources = [x.handle for x in variables]
        if not sources:
            raise ValueError("No trainable variables were accessed while the "
                             "function was being computed.")
        grad = imperative_grad.imperative_grad(_default_vspace,
                                               self.popped_tape,
                                               nest.flatten(end_node), sources)
        self.popped_tape = None
        return list(zip(grad, variables))
@contextlib.contextmanager
def record():
    # Push a fresh gradient tape, yield a RecordInfo handle, and hand the
    # popped tape to it when the with-block exits so backward() can consume it
    record_info = RecordInfo()
    tape.push_new_tape()
    try:
        yield record_info
    finally:
        record_info.popped_tape = tape.pop_tape()
model = siamese_nn(num_features, vocab_length, num_features, max_address_len)
optimizer = tf.train.AdamOptimizer(0.01)
loss_vec = []
accuracy_vec = []
for b in range(n_batches):
    batch_data = get_batch(batch_size)
    # Shuffle data
    np.random.shuffle(batch_data)
    # Parse addresses and targets
    input_addresses = [x[0] for x in batch_data]
    target_similarity = np.array([x[1] for x in batch_data])
    address1 = np.array([address2onehot(x[0]) for x in input_addresses])
    address2 = np.array([address2onehot(x[1]) for x in input_addresses])
    # Record the forward pass on a tape, then backpropagate manually
    with record() as record_:
        text_snn = snn(model, address1, address2, dropout_keep_prob)
        loss = loss_fn(text_snn, target_similarity, margin)
    grads = record_.backward(loss)
    optimizer.apply_gradients(grads)
    acc = accuracy(text_snn, target_similarity)
    accuracy_vec.append(acc.numpy())
    loss_vec.append(loss.numpy())
    print("Accuracy: {} Loss: {}".format(acc, loss))
# Calculate the nearest reference address for each test query
# First encode the test_queries and test_references
test_queries_ix = np.array([address2onehot(x) for x in test_queries])
test_references_ix = np.array([address2onehot(x) for x in test_references])
num_refs = test_references_ix.shape[0]
best_fit_refs = []
for query in test_queries_ix:
    # Compare one query against every reference (dropout off: keep_prob = 1.0)
    test_query = np.repeat(np.array([query]), num_refs, axis=0)
    test_out = snn(model, test_query, test_references_ix, 1.0)
    best_fit = test_references[np.argmax(test_out)]
    best_fit_refs.append(best_fit)
print('Query Addresses: {}'.format(test_queries))
print('Model Found Matches: {}'.format(best_fit_refs))
# Plot the loss and accuracy
plt.plot(loss_vec, 'k-', lw=2, label='Batch Loss')
plt.plot(accuracy_vec, 'r:', label='Batch Accuracy')
plt.xlabel('Iterations')
plt.ylabel('Accuracy and Loss')
plt.title('Accuracy and Loss of Siamese RNN')
plt.grid()
plt.legend(loc='lower right')
plt.show()