Skip to content

Instantly share code, notes, and snippets.

@cbaziotis
Last active October 22, 2024 08:31
Show Gist options
  • Save cbaziotis/6428df359af27d58078ca5ed9792bd6d to your computer and use it in GitHub Desktop.
Save cbaziotis/6428df359af27d58078ca5ed9792bd6d to your computer and use it in GitHub Desktop.
Keras Layer that implements an Attention mechanism for temporal data. Supports Masking. Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
from keras import backend as K, initializers, regularizers, constraints
from keras.engine.topology import Layer
def dot_product(x, kernel):
"""
Wrapper for dot product operation, in order to be compatible with both
Theano and Tensorflow
Args:
x (): input
kernel (): weights
Returns:
"""
if K.backend() == 'tensorflow':
# todo: check that this is correct
return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)
else:
return K.dot(x, kernel)
class Attention(Layer):
def __init__(self,
W_regularizer=None, b_regularizer=None,
W_constraint=None, b_constraint=None,
bias=True,
return_attention=False,
**kwargs):
"""
Keras Layer that implements an Attention mechanism for temporal data.
Supports Masking.
Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
# Input shape
3D tensor with shape: `(samples, steps, features)`.
# Output shape
2D tensor with shape: `(samples, features)`.
:param kwargs:
Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
The dimensions are inferred based on the output shape of the RNN.
Note: The layer has been tested with Keras 1.x
Example:
# 1
model.add(LSTM(64, return_sequences=True))
model.add(Attention())
# next add a Dense layer (for classification/regression) or whatever...
# 2 - Get the attention scores
hidden = LSTM(64, return_sequences=True)(words)
sentence, word_scores = Attention(return_attention=True)(hidden)
"""
self.supports_masking = True
self.return_attention = return_attention
self.init = initializers.get('glorot_uniform')
self.W_regularizer = regularizers.get(W_regularizer)
self.b_regularizer = regularizers.get(b_regularizer)
self.W_constraint = constraints.get(W_constraint)
self.b_constraint = constraints.get(b_constraint)
self.bias = bias
super(Attention, self).__init__(**kwargs)
def build(self, input_shape):
assert len(input_shape) == 3
self.W = self.add_weight((input_shape[-1],),
initializer=self.init,
name='{}_W'.format(self.name),
regularizer=self.W_regularizer,
constraint=self.W_constraint)
if self.bias:
self.b = self.add_weight((input_shape[1],),
initializer='zero',
name='{}_b'.format(self.name),
regularizer=self.b_regularizer,
constraint=self.b_constraint)
else:
self.b = None
self.built = True
def compute_mask(self, input, input_mask=None):
# do not pass the mask to the next layers
return None
def call(self, x, mask=None):
eij = dot_product(x, self.W)
if self.bias:
eij += self.b
eij = K.tanh(eij)
a = K.exp(eij)
# apply mask after the exp. will be re-normalized next
if mask is not None:
# Cast the mask to floatX to avoid float64 upcasting in theano
a *= K.cast(mask, K.floatx())
# in some cases especially in the early stages of training the sum may be almost zero
# and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
# a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())
weighted_input = x * K.expand_dims(a)
result = K.sum(weighted_input, axis=1)
if self.return_attention:
return [result, a]
return result
def compute_output_shape(self, input_shape):
if self.return_attention:
return [(input_shape[0], input_shape[-1]),
(input_shape[0], input_shape[1])]
else:
return input_shape[0], input_shape[-1]
@karthik1290
Copy link

@cdj0311
Copy link

cdj0311 commented Jul 7, 2017

I get an error: IndexError: pop index out of range with tensorflow backend, how to solve it?

@hardikmeisheri
Copy link

If you are on keras 2.0.4 then change
initializations to initializers
and
get_output_shape_for to compute_output_shape

@xiaoleihuang
Copy link

@cdj0311
You need to switch to Theano as backend.

@cbaziotis
Copy link
Author

cbaziotis commented Jul 29, 2017

@cdj0311 this has to do with Tensorflow.
The new version of the gist must work also with Tensorflow.

Edit: Also, sorry for not replying sooner, but notifications for gist comments apparently don't work.
isaacs/github#21

@prernakhurana2
Copy link

Getting error
TypeError: rank mismatch between coding and true distributions
in theano as backend

@v1nc3nt27
Copy link

v1nc3nt27 commented Sep 12, 2017

This works for me on TF 1.0.1 and Keras 2.0.6, thank you. Did someone test this and tried using/dropping the bias? For me, it doesn't change the results at all. If I initialize the bias with e.g. glorot uniform, the result changes. It seems the bias is not trained and stays all 0's. Any ideas why this might be happening?

@cbaziotis
Copy link
Author

Updated for Keras 2.

@DeliaX
Copy link

DeliaX commented Sep 20, 2017

If I want to stack more than one LSTM layers into one model, shall I add attention layer to each of the LSTM layer? If yes, how can I modify this code if I want to put another LSTM layer on the top of this attention layer... Thank you! @cbaziotis

@DeliaX
Copy link

DeliaX commented Sep 20, 2017

In order to realize the above-mentioned function, I have tried to modify this code by adding an attribute 'return_sequences = False' and rewriting the 'init', 'call', 'compute_mask' and 'compute_output_shape' functions of the original attention layer class, but I am not sure whether the modifications are right or not... The modified codes are as follows:

```
class AttLayer(Layer):
def __init__(self,
             W_regularizer=None, b_regularizer=None,
             W_constraint=None, b_constraint=None,
             bias=True,
             return_sequences = False,**kwargs):
    self.return_sequences = return_sequences
    ……
 def compute_mask(self, input, input_mask=None):
     output_mask = input_mask if self.return_sequences else None
    # do not pass the mask to the next layers
    return output_mask

def call(self, x, mask=None):
    eij = dot_product(x, self.W)

    if self.bias:
        eij += self.b

    eij = K.tanh(eij)

    a = K.exp(eij)

    # apply mask after the exp. will be re-normalized next
    if mask is not None:
        # Cast the mask to floatX to avoid float64 upcasting in theano
        a *= K.cast(mask, K.floatx())

    # in some cases especially in the early stages of training the sum may be almost zero
    # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
    # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
    a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

    a = K.expand_dims(a)
    weighted_input = x * a
    if self.return_sequences:            
        return weighted_input
    else:
        return K.sum(weighted_input, axis=1)

def compute_output_shape(self, input_shape):
    if self.return_sequences:
        return input_shape[0], input_shape[1],input_shape[-1]
    else:
        return input_shape[0],input_shape[-1]

@DeliaX
Copy link

DeliaX commented Oct 7, 2017

Anybody can help to answer my questions? Thank you so much...

@WellDone2094
Copy link

I think you should apply attention only to the outputs of the last layer

@WladimirSidorenko
Copy link

@cbaziotis

Hello Christos,

Two quick questions:

  1. Does the bias term work for you with unbound input lengths? Because if I don't restrict the lengths of the input sequences in the very first layer of the model, shape[1] becomes None, and the initializer in build() throws an error:
Traceback (most recent call last):
  File "/home/sidorenko/Projects/CGSA/venv/bin/cgsa_sentiment", line 6, in <module>
    exec(compile(open(__file__).read(), __file__, 'exec'))
  File "/home/sidorenko/Projects/CGSA/scripts/cgsa_sentiment", line 185, in <module>
    main(sys.argv[1:])
  File "/home/sidorenko/Projects/CGSA/scripts/cgsa_sentiment", line 163, in main
    a_grid_search=args.grid_search
  File "/home/sidorenko/Projects/CGSA/cgsa/cgsa.py", line 166, in train
    a_grid_search=a_grid_search)
  File "/home/sidorenko/Projects/CGSA/cgsa/dl/base.py", line 117, in train
    self._init_nn()
  File "/home/sidorenko/Projects/CGSA/cgsa/dl/baziotis.py", line 63, in _init_nn
    self._model.add(Attention(bias=True))
  File "/home/sidorenko/Projects/CGSA/venv/lib/python3.5/site-packages/keras/models.py", line 469, in add
    output_tensor = layer(self.outputs[0])
  File "/home/sidorenko/Projects/CGSA/venv/lib/python3.5/site-packages/keras/engine/topology.py", line 572, in __call__
    self.build(input_shapes[0])
  File "/home/sidorenko/Projects/CGSA/cgsa/dl/layers/attention.py", line 117, in build
    constraint=self.b_constraint)
  File "/home/sidorenko/Projects/CGSA/venv/lib/python3.5/site-packages/keras/legacy/interfaces.py", line 87, in wrapper
    return func(*args, **kwargs)
  File "/home/sidorenko/Projects/CGSA/venv/lib/python3.5/site-packages/keras/engine/topology.py", line 393, in add_weight
    print("initializer(shape): ", repr(initializer(shape)))
  File "/home/sidorenko/Projects/CGSA/venv/lib/python3.5/site-packages/keras/initializers.py", line 29, in __call__
    return K.constant(0, shape=shape, dtype=dtype)
  File "/home/sidorenko/Projects/CGSA/venv/lib/python3.5/site-packages/keras/backend/theano_backend.py", line 159, in constant
    np_value = value * np.ones(shape)
  File "/home/sidorenko/Projects/CGSA/venv/lib/python3.5/site-packages/numpy/core/numeric.py", line 192, in ones
    a = empty(shape, dtype, order)
TypeError: 'NoneType' object cannot be interpreted as an integer
  1. is self.W meant to be a vector? because this notation is typically reserved for matrices, but judging by the code it appears to have only one dimension.

@jiexiongseeker
Copy link

jiexiongseeker commented Feb 12, 2018

Hi, Thanks for your implementation. However, comparing to the original paper, your code on "bias" is implemented differently.

@gregkoytiger
Copy link

gregkoytiger commented Apr 30, 2018

To fully support saving / loading, I believe the Attention layer requires the following:

    def get_config(self):
        config = {
            'return_attention': self.return_attention,
            'W_regularizer': regularizers.serialize(self.W_regularizer),
            'b_regularizer': regularizers.serialize(self.b_regularizer),
            'W_constraint': constraints.serialize(self.W_constraint),
            'b_constraint': constraints.serialize(self.b_constraint),
            'bias': self.bias
        }
   
        base_config = super(Attention, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

@ronggong
Copy link

ronggong commented May 30, 2018

https://gist.github.com/cbaziotis/6428df359af27d58078ca5ed9792bd6d#gistcomment-2343639

@WladimirSidorenko have you found the solution for variable length input? I think the problem is at the bias, if you set bias=False, it will be fine.

Update: I don't understand why the bias vector is of size the second dimension of the input shape, which is the time steps...
The output of eij = dot_product(x, self.W) should have dimension (samples, steps), I doubt if we need to learn the bias for each time steps. Should we just need to set the bias as:

self.b = self.add_weight((1,),
initializer='zero',
name='{}_b'.format(self.name),
regularizer=self.b_regularizer,
constraint=self.b_constraint)

@ant1pink
Copy link

@cbaziotis
hidden = LSTM(64, return_sequences=True)(words)
sentence, word_scores = Attention(return_attention=True)(hidden)
output = Dense(1, activation='sigmoid')(sentence)
in this case, when I train it with a binary classification problem. How do I catch 'word_scores'?
When I do this:
attention_model = Model(input= model.input, output= model.layers[-2].output)
I got the 'sentence' rather than 'word_scores '

@barbarasilveiraf
Copy link

@ant1pink,

Did you get the "word_scores"?

Thanks.

@EricRiveraLopez
Copy link

Hello! I use your code. But I have a problem when load a model with Attetion layer. This problem is:

ValueError: Unknown layer: AttentionDecoder

Previously, I training a Neuronal Network R LSTM

adam=keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
#aquí se empieza a construir la gráfica de la red
model=Sequential()
#se empieza a ñadir capas al modelo
model.add(LSTM(output_dim=300,
input_shape=x_train.shape[1:],
return_sequences=True,
activation='hard_sigmoid',
dropout=0.2))
#segunda capa
model.add(LSTM(output_dim=300,
input_shape=x_train.shape[1:],
return_sequences=True,
activation='hard_sigmoid',
dropout=0.2))
model.add(AttentionDecoder(300, 300))
model.compile(loss='mean_squared_error', optimizer=adam, metrics=['mean_absolute_percentage_error'])
model.summary()

May somebody help me?

@williamgilpin
Copy link

Hello, I tried using the current version, and I kept getting this error:

TypeError: add_weight() got multiple values for argument 'name'

It turns out that this results from using eager execution in the latest version of tensorflow. The solution was to modify the argument of the add_weights function, such that the first argument is explicitly named shape


    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 #shape=(input_shape[-1], input_shape[1]),
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     #shape=(input_shape[-1],),
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True

@modestprophet
Copy link

Some folks in this thread asked about extracting the attention vector during inference. I believe I finally got that bit of functionality to work and have described the process here: https://stackoverflow.com/a/59276694/11133810

@rimchiha-fseg
Copy link

rimchiha-fseg commented Apr 28, 2020

i work on named entity recognition domain
i tried to implement the attention layer proposed in

https://bmcmedinformdecismak.biomedcentral.com/articles/10.1186/s12911-019-0933-6
the code of attention layer
`from keras.engine.topology import Layer
from keras import backend as K, initializers, regularizers, constraints
def dot_product(x, kernel):

    if K.backend() == 'tensorflow':
        # todo: check that this is correct
        return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)
    else:
        return K.dot(x, kernel)

class Attention(Layer):
def init(self,
W_regularizer=None, b_regularizer=None,
W_constraint=None, b_constraint=None,
bias=True,return_attention=False, **kwargs):

    self.supports_masking = True
    self.init = initializers.get('glorot_uniform')

    self.W_regularizer = regularizers.get(W_regularizer)
    self.b_regularizer = regularizers.get(b_regularizer)

    self.W_constraint = constraints.get(W_constraint)
    self.b_constraint = constraints.get(b_constraint)

    self.bias = bias
    self.return_attention = return_attention
    super(Attention, self).__init__(**kwargs)

def build(self, input_shape):
    assert len(input_shape) == 3
    print()
    self.W = self.add_weight(shape=(input_shape[-1],),
                             initializer=self.init,
                             name='{}_W'.format(self.name),
                             regularizer=self.W_regularizer,
                             #shape=(input_shape[-1], input_shape[1]),
                             constraint=self.W_constraint)
    if self.bias:
        self.b = self.add_weight(shape=(input_shape[1],),
                                 initializer='zero',
                                 name='{}_b'.format(self.name),
                                 regularizer=self.b_regularizer,
                                 #shape=(input_shape[-1],),
                                 constraint=self.b_constraint)
    else:
        self.b = None

    self.built = True


def compute_mask(self, input, input_mask=None):
    # do not pass the mask to the next layers
    return None

def call(self, x, mask=None):
    eij = dot_product(x, self.W)
    print("x:",x)
    print("intiale eij", eij)
    if self.bias:
        eij += self.b
    print("first eij:", eij)
    eij = K.tanh(eij)
    print("eij:", eij)
    a = K.exp(eij)

    # apply mask after the exp. will be re-normalized next
    if mask is not None:
        # Cast the mask to floatX to avoid float64 upcasting in theano
        a *= K.cast(mask, K.floatx())

    # in some cases especially in the early stages of training the sum may be almost zero
    # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
    # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
    a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

    a = K.expand_dims(a)
    print("alpha", a.shape)
    print(K.expand_dims(a))
    #weighted_input =dot_product(x,a)
    c=K.sum(x * K.expand_dims(a), axis=1)
    print("global vector", c.shape)
    new_output = tf.concat([x,c], axis=2)
    print("new_output", new_output.shape)
    #z=K.tanh(new_output)
   
    #print(z.shape)
    #return K.sum(weighted_input, axis=1)
    return new_output
`

the model is

from keras.models import Sequential from keras import backend as K from keras.models import Model from keras.optimizers import Adam from keras import initializers import numpy as np from keras.layers import Dense, Input, TimeDistributed, Embedding, Activation, Bidirectional return_attention = True inp1=Input(shape=(MAX_LENGTH,)) emb1=Embedding(len(word2index), 128)(inp1) bilstm2=Bidirectional(LSTM(256, return_sequences=True))(emb1) x=Attention(return_attention=True)(bilstm2) dense2=TimeDistributed(Dense(len(tag2index_U)))(x) out2=Activation('softmax')(dense2) model = Model(inputs=inp1, outputs= out2) model.compile(loss='categorical_crossentropy', optimizer=Adam(0.001),metrics=['accuracy']) model.summary()

model

the fit and evaluate run correctly with batch_size=1
model.fit(train_sentences_X, train_sentences_Y ,batch_size=1, epochs=20)
score = model.evaluate(test_sentences_X, train_sentences_Y , batch_size=1 )
but the predict
test_samples=i love paris the result should be O O B-LOC
predictions = model.predict(test_samples_X, batch_size=1, verbose=1)
return the following error
`~\Anaconda3\lib\site-packages\keras\engine\training.py in predict(self, x, batch_size, verbose, steps, callbacks, max_queue_size, workers, use_multiprocessing)
1460 verbose=verbose,
1461 steps=steps,
-> 1462 callbacks=callbacks)
1463
1464 def train_on_batch(self, x, y,

~\Anaconda3\lib\site-packages\keras\engine\training_arrays.py in predict_loop(model, f, ins, batch_size, verbose, steps, callbacks)
330 outs.append(np.zeros(shape, dtype=batch_out.dtype))
331 for i, batch_out in enumerate(batch_outs):
--> 332 outs[i][batch_start:batch_end] = batch_out
333
334 batch_logs['outputs'] = batch_outs

ValueError: could not broadcast input array from shape (2,75,14) into shape (1,75,14)
`

@anoopkdcs
Copy link

Hai,
How to change the attention code to get - an attention distribution is frozen to uniform weights.

@visheshaylani
Copy link

image
I am getting this error while load the model. How can it solve it? Please help.

@sofimukhtar
Copy link

image
I am getting this error while load the model. How can it solve it? Please help.

replace W-regularizer by Kernel_regularizer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment