Dialogue act recognition Keras model
Main training and evaluation script (it imports the dasio and globvars modules listed further below):
import numpy as np
from keras.preprocessing import sequence
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten, TimeDistributedDense
from keras.layers.recurrent import LSTM
from keras.layers.embeddings import Embedding
from keras.utils import np_utils
from keras.preprocessing.text import Tokenizer
from keras.models import Graph
from keras.utils.np_utils import accuracy
import re
import copy
import sys

import globvars

if globvars.plotWeights:
    import matplotlib.pyplot as plt

from dasio import *

def normalized(a, f=0.5, axis=-1, order=2):
    l2 = np.atleast_1d(np.linalg.norm(a, order, axis))
    l2[l2==0] = 1
    b = a / np.expand_dims(l2, axis)
    return f * b
# dont do x-val, but rather take the train/test list from Stolcke
for xv in range(1):
    print("Loading data...")
    (X_train, Y_train0, Z_train), (X_test, Y_test0, Z_test) = load_data(nb_words=globvars.max_words,xval=xv)

    if globvars.addPrevInputs:
        # note: this BOW is also interesting because it includes all words, without truncation. TODO: add such a vector also for current sentence
        tokenizer = Tokenizer(nb_words=globvars.max_words)
        PX_tr = tokenizer.sequences_to_matrix(X_train, mode='binary')
        PX_te = tokenizer.sequences_to_matrix(X_test, mode='binary')
        # shift the rows by one utterance so that example i gets the BOW of utterance i-1; the first row has no predecessor and is zeroed
        PX_tr = np.roll(PX_tr,1,axis=0)
        PX_tr[0][:]=0
        PX_te = np.roll(PX_te,1,axis=0)
        PX_te[0][:]=0

    print("#sentences train: "+str(len(X_train))+" test: "+str(len(X_test)))
    print("out nb_classes= "+str(globvars.nb_classes))

    # here we still have the full sequences
    if globvars.bigram:
        BX_tr = copy.deepcopy(X_train)
        BZ_tr = copy.deepcopy(Z_train)
        BX_te = copy.deepcopy(X_test)
        BZ_te = copy.deepcopy(Z_test)
        # now replace single word with bigram
        for i in xrange(len(X_train)):
            BX_tr[i][0]=globvars.voc.get('STARTUTT')
            BZ_tr[i][0]=globvars.vocpos.get('PADDING')
            for j in xrange(len(X_train[i])-1): BX_tr[i][j+1]=X_train[i][j]
            for j in xrange(len(Z_train[i])-1): BZ_tr[i][j+1]=Z_train[i][j]
        for i in xrange(len(X_test)):
            BX_te[i][0]=globvars.voc.get('STARTUTT')
            BZ_te[i][0]=globvars.vocpos.get('PADDING')
            for j in xrange(len(X_test[i])-1): BX_te[i][j+1]=X_test[i][j]
            for j in xrange(len(Z_test[i])-1): BZ_te[i][j+1]=Z_test[i][j]
        BX_tr= sequence.pad_sequences(BX_tr, maxlen=globvars.maxlen, padding='post', truncating='post')
        BX_te= sequence.pad_sequences(BX_te, maxlen=globvars.maxlen, padding='post', truncating='post')
        BZ_tr= sequence.pad_sequences(BZ_tr, maxlen=globvars.maxlen, padding='post', truncating='post')
        BZ_te= sequence.pad_sequences(BZ_te, maxlen=globvars.maxlen, padding='post', truncating='post')

    X_train = sequence.pad_sequences(X_train, maxlen=globvars.maxlen, padding='post', truncating='post')
    X_test = sequence.pad_sequences(X_test, maxlen=globvars.maxlen, padding='post', truncating='post')
    Z_test = sequence.pad_sequences(Z_test, maxlen=globvars.maxlen, padding='post', truncating='post')
    Z_train = sequence.pad_sequences(Z_train, maxlen=globvars.maxlen, padding='post', truncating='post')
    if True:
        Y_train = np.zeros((len(X_train),globvars.nb_classes))#,dtype=np.float32)
        nw=0
        co={}
        for t in range(globvars.nb_classes):
            co[t]=0
        for t in range(len(X_train)):
            Y_train[t][Y_train0[t]]=1
            co[Y_train0[t]]=co.get(Y_train0[t])+1
        print("counts labels: "+str(co))
        Y_test = np.zeros((len(Y_test0),globvars.nb_classes))#,dtype=np.float32)
        for t in range(len(Y_test)):
            Y_test[t][Y_test0[t]]=1

    # optional: add noise to the words to increase corpus size
    ncopy=0
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(X_train)
        for i in xrange(len(tmpdbl)): tmpdbl[i][np.random.randint(0,globvars.maxlen)]=np.random.randint(0,len(globvars.voc))
        arrs=arrs+(tmpdbl,)
    arrs=arrs+(X_train,)
    X_train=np.concatenate(arrs,axis=0)
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(Z_train)
        arrs=arrs+(tmpdbl,)
    arrs=arrs+(Z_train,)
    Z_train=np.concatenate(arrs,axis=0)
    if globvars.bigram:
        arrs = ()
        for j in xrange(ncopy):
            tmpdbl = np.copy(BX_tr)
            arrs=arrs+(tmpdbl,)
        arrs=arrs+(BX_tr,)
        BX_tr=np.concatenate(arrs,axis=0)
        arrs = ()
        for j in xrange(ncopy):
            tmpdbl = np.copy(BZ_tr)
            arrs=arrs+(tmpdbl,)
        arrs=arrs+(BZ_tr,)
        BZ_tr=np.concatenate(arrs,axis=0)
    if globvars.addPrevInputs:
        arrs = ()
        for j in xrange(ncopy):
            tmpdbl = np.copy(PX_tr)
            arrs=arrs+(tmpdbl,)
        arrs=arrs+(PX_tr,)
        PX_tr=np.concatenate(arrs,axis=0)
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(Y_train)
        arrs=arrs+(tmpdbl,)
    arrs=arrs+(Y_train,)
    Y_train = np.concatenate(arrs,axis=0)
    arrs = ()
    for j in xrange(ncopy):
        tmpdbl = np.copy(Y_train0)
        arrs=arrs+(tmpdbl,)
    arrs=arrs+(Y_train0,)
    Y_train0 = np.concatenate(arrs,axis=0)
    # build the model with the old Keras Graph API (removed in later Keras versions)
    model = Graph()
    model.add_input(name='input', input_shape=(globvars.maxlen,), dtype=int)
    model.add_input(name='inputpos', input_shape=(globvars.maxlen,), dtype=int)
    wembed = Embedding(globvars.max_words, globvars.embedsize, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
    pembed = Embedding(len(globvars.vocpos), len(globvars.vocpos)*3/4, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
    model.add_node(wembed, name='embed', input='input')
    model.add_node(pembed, name='embedpos', input='inputpos')
    if globvars.bigram:
        model.add_input(name='binput', input_shape=(globvars.maxlen,), dtype=int)
        model.add_input(name='binputpos', input_shape=(globvars.maxlen,), dtype=int)
        bwembed = Embedding(globvars.max_words, globvars.embedsize, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
        bpembed = Embedding(len(globvars.vocpos), len(globvars.vocpos)*3/4, input_length=globvars.maxlen, trainable=globvars.trainEmbed)
        model.add_node(bwembed, name='bembed', input='binput')
        model.add_node(bpembed, name='bembedpos', input='binputpos')
        hidlay = TimeDistributedDense(globvars.nhids[0], activation='tanh',trainable=True)
        model.add_node(hidlay, name='dense1', inputs=['embed','embedpos','bembed','bembedpos'])
        out = 'dense1'
        for idx, n in enumerate(globvars.nhids[1:]):
            i=idx+2
            out = 'dense'+str(i)
            model.add_node(TimeDistributedDense(n, activation='tanh'), name=out, input='dense'+str(i-1))
        model.add_node(LSTM(globvars.hidden,return_sequences=False,go_backwards=True), name='lstm', input=out)
    else:
        model.add_node(LSTM(globvars.hidden,return_sequences=False,go_backwards=True), name='lstm', inputs=['embed','embedpos'])
    model.add_node(Dropout(globvars.dropout), name='dropout', input='lstm')
    if globvars.addPrevInputs:
        model.add_input(name='previnput', input_shape=(globvars.max_words,))
        model.add_node(Dense(globvars.nfin), name='merge', inputs=['dropout','previnput'])
        model.add_node(Activation('tanh'), name='mergeact', input='merge')
        model.add_node(Dense(globvars.nb_classes), name='dense', input='mergeact')
    else:
        model.add_node(Dense(globvars.nb_classes), name='dense', input='dropout')
    model.add_node(Activation('softmax'), name='softmax', input='dense')
    model.add_output(name='output', input='softmax')
    model.compile('adam', {'output': 'categorical_crossentropy'})
    if globvars.initembed:
        print("load init embeddings")
        mots,wemb = load_embeddings()
        nset=0
        ws = wembed.get_weights()  # fetch the embedding matrix before overwriting rows (ws was used uninitialised in the original)
        for i in xrange(len(wemb)):
            wi = globvars.voc.get(mots[i])
            if wi:
                ws[0][wi][:]=wemb[i][:]
                nset=nset+1
        wembed.set_weights(ws)
        if globvars.bigram:
            # just to be safe, I don't use np.copy ... ?
            bws = bwembed.get_weights()
            for i in xrange(len(wemb)):
                wi = globvars.voc.get(mots[i])
                if wi:
                    bws[0][wi][:]=wemb[i][:]
            bwembed.set_weights(bws)
        print("loaded and set nembeddings "+str(nset))

    # normalisation of weights:
    if globvars.normweights:
        ws = wembed.get_weights()  # re-read the (possibly initialised) weights; ws is otherwise undefined when initembed=False
        ws[0]=normalized(ws[0]) # [0] or not ??
        wembed.set_weights(ws)
        printWeightsNorm(ws)  # helper not included in this gist
        if globvars.bigram:
            bws = bwembed.get_weights()
            bws[0]=normalized(bws[0])
            bwembed.set_weights(bws)  # was wembed.set_weights(bws), which overwrote the wrong embedding
    # test one by one; more flexible, but could be much faster by batch
    rec = np.zeros((len(X_test),globvars.nb_classes))
    # dont need to reallocate these arrays for every sentence, because all sentences have the same length
    x = np.zeros((1,len(X_test[0])))
    z = np.zeros((1,len(X_test[0])))
    bx = np.zeros((1,len(X_test[0])))
    bz = np.zeros((1,len(X_test[0])))
    px = np.zeros((1,len(PX_te[0]))) # idem: PX_te[*] are of length = vocab size
    for i in xrange(len(X_test)):
        x[0][:] = X_test[i][:]
        z[0][:] = Z_test[i][:]
        ar={'input':x,'inputpos':z}
        if globvars.addPrevInputs:
            px[0][:] = PX_te[i][:]
            ar['previnput']=px
        if globvars.bigram:
            bx[0][:] = BX_te[i][:]
            bz[0][:] = BZ_te[i][:]
            ar['binput']=bx
            ar['binputpos']=bz
        rec[i] = model.predict(ar).get('output')[0]
    nok=0
    for i in range(len(X_test)):
        if Y_test0[i]==rec[i].argmax():
            nok=nok+1
    print("test init "+str(nok)+" "+str(len(X_test))+" "+str(float(nok)/float(len(X_test))))
    print("conf matrix")
    ref=np.zeros((globvars.nb_classes,globvars.nb_classes),dtype=np.int32)
    for z in range(len(rec)):
        ref[Y_test0[z],rec[z].argmax()]=ref[Y_test0[z],rec[z].argmax()]+1
    for z in range(globvars.nb_classes):
        print("classGold "+str(z)+": "+str(ref[z].sum())+" post: "+str(ref[:,z].sum()))
    for epo in xrange(globvars.nb_epoch):
        ar={'input':X_train,'inputpos':Z_train,'output':Y_train}
        if globvars.addPrevInputs: ar['previnput']=PX_tr
        if globvars.bigram:
            ar['binput']=BX_tr
            ar['binputpos']=BZ_tr
        model.fit(ar, batch_size=globvars.batch_size, nb_epoch=1, shuffle=globvars.shuf, verbose=1)

        # EVAL on the train set
        ar={'input':X_train,'inputpos':Z_train}
        if globvars.addPrevInputs: ar['previnput']=PX_tr
        if globvars.bigram:
            ar['binput']=BX_tr
            ar['binputpos']=BZ_tr
        # leftover from another experiment: requires a 'cheat' input in the graph and a PY_train array, neither of which is defined here
        if globvars.addPrevClass: ar['cheat']=PY_train
        rec = model.predict(ar).get('output')
        nok=0
        for i in range(len(X_train)):
            if Y_train0[i]==rec[i].argmax():
                nok=nok+1
        print("train epoch "+str(epo)+" "+str(nok)+" "+str(len(X_train))+" "+str(float(nok)/float(len(X_train))))

        # EVAL on the test set
        rec = np.zeros((len(X_test),globvars.nb_classes))
        x = np.zeros((1,len(X_test[0])))
        z = np.zeros((1,len(X_test[0])))
        bx = np.zeros((1,len(X_test[0])))
        bz = np.zeros((1,len(X_test[0])))
        px = np.zeros((1,len(PX_te[0]))) # idem: PX_te[*] are of length = vocab size
        for i in xrange(len(X_test)):
            x[0][:] = X_test[i][:]
            z[0][:] = Z_test[i][:]
            ar={'input':x,'inputpos':z}
            if globvars.addPrevInputs:
                px[0][:] = PX_te[i][:]
                ar['previnput']=px
            if globvars.bigram:
                bx[0][:] = BX_te[i][:]
                bz[0][:] = BZ_te[i][:]
                ar['binput']=bx
                ar['binputpos']=bz
            recr = model.predict(ar).get('output')
            rec[i] = recr[0]
        nok=0
        for i in range(len(X_test)):
            if Y_test0[i]==rec[i].argmax():
                nok=nok+1
        print("test epoch "+str(epo)+" "+str(nok)+" "+str(len(X_test))+" "+str(float(nok)/float(len(X_test))))
        np.save("lstm",model.get_weights())
dasio.py (data loading helpers used by the script above):
import numpy as np
import globvars
import re

def load_embeddings():
    w2vfile = "voc.w2v"
    print("read w2v embeddings")
    f = open(w2vfile,'rb')
    nw=0
    while True:
        s=f.readline()
        if not s:
            break
        nw=nw+1
    f.close()
    wemb = np.zeros((nw,300))
    mots = []
    f = open(w2vfile,'rb')
    for w in xrange(nw):
        s=f.readline()
        s=s.strip()
        cols=s.split()
        mots.append(cols[0])
        for i in xrange(1,len(cols)):
            wemb[w][i-1]=float(cols[i])
    f.close()
    return (mots,wemb)
def load_data(nb_words=1000,seed=123,xval=0):
    '''loads a conll corpus and outputs a list (per sentence) of lists (per word) of word indices
    '''
    # Note that the following code assumes the existence of a column for "POS-tags"
    # but in our experiments, this column always contains the same uninformative POSTAG: "WORD"
    # This column is kept just in case POS-tags would be required in future experiments
    corpus = "train.conll"
    testfile = "test.conll"
    vocfile = "voc.txt"
    # the .voc file contains the voc obtained with
    # cut -f1 das.conll | sort | uniq -c | sort -nr > das.voc
    # plus PADDING and UNKNOWN
    print("read the voc "+vocfile+" "+str(nb_words))
    f = open(vocfile,'rb')
    for i in range(nb_words):
        s=f.readline()
        if not s:
            break
        s = s.strip()
        globvars.voc[s]=i
    f.close()
    globvars.padd=globvars.voc.get('PADDING')
    print("padd in load "+str(globvars.padd))

    print("read the train conll "+str(len(globvars.voc)))
    X,Y,Z = [], [], []
    f = open(corpus,'rb')
    motsinutt, postinutt = [], []
    nl,nutt=0,0
    # sequence.pad will later add 0 for padding, so reserve this postag index
    globvars.vocpos['PADDING']=0
    while (True):
        s=f.readline()
        if not s: break
        s=s.strip()
        cols=s.split()
        nl=nl+1
        if len(cols)>0:
            post=globvars.vocpos.get(cols[1])
            if post==None:
                post=len(globvars.vocpos)
                globvars.vocpos[cols[1]]=post
            postinutt.append(post)
            lidx=globvars.voclab.get(cols[2])
            if lidx==None:
                lidx=len(globvars.voclab)
                globvars.voclab[cols[2]]=lidx
            widx=globvars.voc.get(cols[0])
            if widx==None:
                widx=globvars.voc.get('UNKNOWN')
            motsinutt.append(widx)
        else:
            # blank line = end of utterance
            nutt=nutt+1
            X.append(motsinutt)
            Y.append(lidx)
            Z.append(postinutt)
            motsinutt, postinutt = [], []
    f.close()
    print("nvoclab "+str(len(globvars.voclab)))
    print("nb of examples in corpus= "+str(len(X))+" "+str(len(Y))+" "+str(nl)+" "+str(nutt))

    print("read the test conll")
    Xtest,Ytest,Ztest = [], [], []
    f = open(testfile,'rb')
    motsinutt,postinutt=[],[]
    while (True):
        s=f.readline()
        if (not s):
            break
        cols=s.split()
        if (len(cols)>0):
            post=globvars.vocpos.get(cols[1])
            if post==None:
                post=len(globvars.vocpos)
                globvars.vocpos[cols[1]]=post
            postinutt.append(post)
            lidx=globvars.voclab.get(cols[2])
            if lidx==None:
                lidx=len(globvars.voclab)
                globvars.voclab[cols[2]]=lidx
            widx=globvars.voc.get(cols[0])
            if widx==None:
                widx=globvars.voc.get('UNKNOWN')
            motsinutt.append(widx)
        else:
            Xtest.append(motsinutt)
            Ytest.append(lidx)
            Ztest.append(postinutt)
            motsinutt,postinutt=[],[]
    f.close()
    print("nb of examples in corpus= "+str(len(Xtest))+" "+str(len(Ytest)))

    globvars.nb_classes = len(globvars.voclab)
    print("inner nb classes= "+str(globvars.nb_classes)+" "+str(max(globvars.voclab.values())))
    print("classes: "+str(sorted(globvars.voclab.values())))  # was list(...).sort(), which returns None
    print("data loaded: input voc layer= "+str(len(globvars.voc))+" "+str(len(globvars.vocpos))+" output labels layer= "+str(len(globvars.voclab)))
    print(globvars.voclab)
    print(globvars.vocpos)
    # np.random.seed(seed)
    # np.random.shuffle(X)
    # np.random.seed(seed)
    # np.random.shuffle(Y)
    # np.random.seed(seed)
    # np.random.shuffle(Z)
    return (X, Y, Z), (Xtest, Ytest, Ztest)
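
For reference, load_data expects whitespace-separated CoNLL-style files with one token per line and a blank line between utterances: column 1 is the word, column 2 a POS tag (always the dummy tag WORD in these experiments, as the comment above notes), and column 3 the dialogue-act label, repeated on every line of the utterance. The tokens and labels below are made up, purely to illustrate the layout:

so      WORD    sd
are     WORD    sd
you     WORD    sd
going   WORD    sd

okay    WORD    b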
globvars.py (shared configuration imported by both files above):
maxlen = 15 # cut sentences after maxlen words
max_words = 1000
# TODO check whether this batch_size has been tuned for GPU computation ?
batch_size = 2048
embedsize = 300
padd = -1 # to be set in load_data
# POStags are supported but not used in our experiments
voc, voclab, vocpos = {}, {}, {}
nb_classes = -1
dropout = 0.5
hidden = 50
nfin = 200
# not used if bigram=False:
nhids = [300]
nb_epoch = 30
trainEmbed = True
shuf = True
bigram = False
addPrevInputs = True
# referenced by the training script; keep False unless a 'cheat' input and a PY_train array are added
addPrevClass = False
# must be false if bigram=False:
riskmin = False
normweights = False
initembed = False
plotWeights = False
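
The training script ends with np.save("lstm", model.get_weights()), which writes the list of per-layer weight arrays to lstm.npy as an object array. A minimal sketch for reloading it (the rebuilt model is assumed to be constructed with the exact same Graph definition before set_weights is called):

import numpy as np

# the saved list of differently-shaped arrays is stored as an object array, so allow_pickle is required
weights = np.load("lstm.npy", allow_pickle=True)
# model.set_weights(list(weights))   # re-apply to an identically built model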