Created August 11, 2016 05:32
Keras bidirectional LSTM NER tagger
# Keras==1.0.6
import numpy as np
from keras.models import Sequential
from keras.layers.recurrent import LSTM
from keras.layers.core import TimeDistributedDense, Activation
from keras.preprocessing.sequence import pad_sequences
from keras.layers.embeddings import Embedding
from keras.layers import Merge
# TensorFlow handle exposed by the Keras backend (requires the TF backend).
from keras.backend import tf
from sklearn.cross_validation import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
# Custom Lambda layer that also passes the mask through a function
# (separate helper module, not part of Keras).
from lambdawithmask import Lambda as MaskLambda
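# Read the wikigold corpus: one 'token label' pair per line, with blank
# lines separating sentences.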
raw = open('wikigold.conll.txt', 'r').readlines()

all_x = []
point = []
for line in raw:
    stripped_line = line.strip().split(' ')
    point.append(stripped_line)
    if line == '\n':
        all_x.append(point[:-1])
        point = []
all_x = all_x[:-1]
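# Keep sentences shorter than 64 tokens and split each one into its
# token sequence (X) and label sequence (y).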
lengths = [len(x) for x in all_x]
short_x = [x for x in all_x if len(x) < 64]
X = [[c[0] for c in x] for x in short_x]
y = [[c[1] for c in x] for x in short_x]
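# Build lookup tables between words/labels and integer indices. Indices
# start at 1 so that 0 is free to act as the padding (mask) value.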
all_text = [c for x in X for c in x]
words = list(set(all_text))
word2ind = {word: index for index, word in enumerate(words, 1)}
ind2word = {index: word for index, word in enumerate(words, 1)}
labels = list(set([c for x in y for c in x]))
label2ind = {label: index for index, label in enumerate(labels, 1)}
ind2label = {index: label for index, label in enumerate(labels, 1)}
print 'Input sequence length range:', max(lengths), min(lengths)
maxlen = max([len(x) for x in X])
print 'Maximum sequence length:', maxlen
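# One-hot encode an integer class index as a length-n vector.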
def encode(x, n):
    result = np.zeros(n)
    result[x] = 1
    return result
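# Convert words to indices, make a reversed copy of each sentence for the
# backward LSTM, one-hot encode the labels, and left-pad everything to maxlen.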
X_enc = [[word2ind[c] for c in x] for x in X]
X_enc_reverse = [list(reversed(x)) for x in X_enc]

max_label = max(label2ind.values()) + 1
y_enc = [[0] * (maxlen - len(ey)) + [label2ind[c] for c in ey] for ey in y]
y_enc = [[encode(c, max_label) for c in ey] for ey in y_enc]

X_enc_f = pad_sequences(X_enc, maxlen=maxlen)
X_enc_b = pad_sequences(X_enc_reverse, maxlen=maxlen)
y_enc = pad_sequences(y_enc, maxlen=maxlen)
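# Fixed-size train/test split: 45 and 11 whole batches of 32 sequences.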
(X_train_f, X_test_f, X_train_b,
 X_test_b, y_train, y_test) = train_test_split(X_enc_f, X_enc_b, y_enc,
                                               test_size=11 * 32,
                                               train_size=45 * 32,
                                               random_state=42)
print 'Training and testing tensor shapes:'
print X_train_f.shape, X_test_f.shape, X_train_b.shape, X_test_b.shape, y_train.shape, y_test.shape
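# Network dimensions. The embedding input_dim must be one more than the
# largest word index; the output layer gets one extra class for padding.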
max_features = len(word2ind) + 1
embedding_size = 128
hidden_size = 32
out_size = len(label2ind) + 1

def reverse_func(x, mask=None):
    # Reverse along the time axis (dimension 1); this is the old
    # boolean-flags signature of tf.reverse used by TensorFlow at the time.
    return tf.reverse(x, [False, True, False])
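# Forward branch: embeddings (index 0 is masked as padding) feeding an LSTM
# that returns its output at every time step.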
model_forward = Sequential()
model_forward.add(Embedding(max_features, embedding_size, input_length=maxlen, mask_zero=True))
model_forward.add(LSTM(hidden_size, return_sequences=True))
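# Backward branch: the same architecture fed the reversed sequences; its
# output (and mask) are flipped back so the two branches align in time.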
model_backward = Sequential()
model_backward.add(Embedding(max_features, embedding_size, input_length=maxlen, mask_zero=True))
model_backward.add(LSTM(hidden_size, return_sequences=True))
model_backward.add(MaskLambda(function=reverse_func, mask_function=reverse_func))
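# Concatenate the two branches at each time step and classify every token
# with a softmax over the label set.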
model = Sequential()
model.add(Merge([model_forward, model_backward], mode='concat'))
model.add(TimeDistributedDense(out_size))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
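# Train, passing the forward and reversed inputs as a pair.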
batch_size = 32
model.fit([X_train_f, X_train_b], y_train, batch_size=batch_size, nb_epoch=40,
          validation_data=([X_test_f, X_test_b], y_test))
score = model.evaluate([X_test_f, X_test_b], y_test, batch_size=batch_size)
print 'Raw test score:', score
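# Strip the padded positions (where the true label is 0) from each sequence,
# then flatten the per-sentence label sequences for sklearn's metrics.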
def score(yh, pr):
    coords = [np.where(yhh > 0)[0][0] for yhh in yh]
    yh = [yhh[co:] for yhh, co in zip(yh, coords)]
    ypr = [prr[co:] for prr, co in zip(pr, coords)]
    fyh = [c for row in yh for c in row]
    fpr = [c for row in ypr for c in row]
    return fyh, fpr
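# Per-token accuracy and confusion matrix on the training set.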
pr = model.predict_classes([X_train_f, X_train_b])
yh = y_train.argmax(2)
fyh, fpr = score(yh, pr)
print 'Training accuracy:', accuracy_score(fyh, fpr)
print 'Training confusion matrix:'
print confusion_matrix(fyh, fpr)
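# The same evaluation on the held-out test set.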
pr = model.predict_classes([X_test_f, X_test_b])
yh = y_test.argmax(2)
fyh, fpr = score(yh, pr)
print 'Testing accuracy:', accuracy_score(fyh, fpr)
print 'Testing confusion matrix:'
print confusion_matrix(fyh, fpr)
Hello, I am trying to train my NER model, and I found an example script showing how to do it with Keras: http://pythonexample.com/code/keras%20unidirectional%20tagger/
I was surprised to see that there are no comments in the code, and after some Google searching I found the same thing in your Git repo.
Would you please suggest a tutorial where the code is well explained?
Thank you.
Thank you for sharing. Amazing code.
I am a beginner with RNNs and LSTMs, so my question might be very basic: why are we including X_enc_reverse (or X_train_b) in the model?
And could you please suggest some decent documentation to read about it?