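# NOTE: the original gist starts at the class definition, so the imports below
# are an assumption based on the names used in the code (Layer, init,
# nonlinearities, theano, T). They target the pre-1.0 Lasagne API of this era,
# in which Layer.create_param() and Layer.get_output_shape() exist.
import theano
import theano.tensor as T
from lasagne import init, nonlinearities
from lasagne.layers import Layer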
class BidirectionalLSTMLayer(Layer):
    '''
    A bidirectional long short-term memory (LSTM) layer. Includes "peephole
    connections" and a forget gate. Based on the definition in
    [#graves2014generating]_, which is the current common definition. Gate
    names are taken from [#zaremba2014]_, figure 1.

    :references:
        .. [#graves2014generating] Alex Graves, "Generating Sequences With
           Recurrent Neural Networks".
        .. [#zaremba2014] Zaremba, W. et al., "Recurrent Neural Network
           Regularization". (http://arxiv.org/abs/1409.2329)
    '''
    def __init__(self, input_layer, num_units,
                 W_in_to_gates=init.Normal(0.1),
                 W_hid_to_gates=init.Normal(0.1),
                 W_cell_to_gates=init.Normal(0.1),
                 b_gates=init.Normal(0.1),
                 nonlinearity_ingate=nonlinearities.sigmoid,
                 nonlinearity_forgetgate=nonlinearities.sigmoid,
                 nonlinearity_modulationgate=nonlinearities.tanh,
                 nonlinearity_outgate=nonlinearities.sigmoid,
                 nonlinearity_out=nonlinearities.tanh,
                 cell_init=init.Constant(0.),
                 hid_init=init.Constant(0.),
                 learn_init=False,
                 peepholes=True):
        '''
        Initialize a bidirectional LSTM layer. For details on what the
        parameters mean, see (7-11) from [#graves2014generating]_.

        The gate weights and biases are passed stacked, in the fixed order
        [ingate, forgetgate, modulationgate, outgate]; the peephole weights
        omit the modulation gate.

        :parameters:
            - input_layer : layers.Layer
                Input to this recurrent layer
            - num_units : int
                Number of hidden units
            - W_in_to_gates : function or np.ndarray or theano.shared
                Stacked input-to-gate weights
                :math:`[W_{xi}, W_{xf}, W_{xc}, W_{xo}]`
            - W_hid_to_gates : function or np.ndarray or theano.shared
                Stacked hidden-to-gate weights
                :math:`[W_{hi}, W_{hf}, W_{hc}, W_{ho}]`
            - W_cell_to_gates : function or np.ndarray or theano.shared
                Stacked peephole weights
                :math:`[W_{ci}, W_{cf}, W_{co}]`
            - b_gates : function or np.ndarray or theano.shared
                Stacked gate biases :math:`[b_i, b_f, b_c, b_o]`
            - nonlinearity_ingate : function
                :math:`\sigma`
            - nonlinearity_forgetgate : function
                :math:`\sigma`
            - nonlinearity_modulationgate : function
                :math:`\tanh`
            - nonlinearity_outgate : function
                :math:`\sigma`
            - nonlinearity_out : function
                :math:`\tanh`
            - cell_init : function or np.ndarray or theano.shared
                :math:`c_0`
            - hid_init : function or np.ndarray or theano.shared
                :math:`h_0`
            - learn_init : boolean
                If True, initial hidden values are learned
            - peepholes : boolean
                If True, the LSTM uses peephole connections.
                When False, W_cell_to_gates is ignored.
        '''
        # Initialize parent layer
        super(BidirectionalLSTMLayer, self).__init__(input_layer)

        # For any of the nonlinearities, if None is supplied, use identity
        if nonlinearity_ingate is None:
            self.nonlinearity_ingate = nonlinearities.identity
        else:
            self.nonlinearity_ingate = nonlinearity_ingate

        if nonlinearity_forgetgate is None:
            self.nonlinearity_forgetgate = nonlinearities.identity
        else:
            self.nonlinearity_forgetgate = nonlinearity_forgetgate

        if nonlinearity_modulationgate is None:
            self.nonlinearity_modulationgate = nonlinearities.identity
        else:
            self.nonlinearity_modulationgate = nonlinearity_modulationgate

        if nonlinearity_outgate is None:
            self.nonlinearity_outgate = nonlinearities.identity
        else:
            self.nonlinearity_outgate = nonlinearity_outgate

        if nonlinearity_out is None:
            self.nonlinearity_out = nonlinearities.identity
        else:
            self.nonlinearity_out = nonlinearity_out

        self.learn_init = learn_init
        self.num_units = num_units
        self.peepholes = peepholes

        # Input dimensionality is the output dimensionality of the input layer
        (num_batch, _, num_inputs) = self.input_layer.get_output_shape()
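        # Gate parameters are stored stacked along their last axis in the
        # fixed order [ingate, forgetgate, modulationgate, outgate]; the
        # peephole weights use [ingate, forgetgate, outgate]. The slice_w and
        # slice_c helpers in get_output_for rely on this ordering.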
        # FORWARD WEIGHTS
        # Input-to-gate weights are stacked into a (num_inputs, 4*num_units)
        # matrix, hidden-to-gate weights into (num_units, 4*num_units), and
        # peephole weights into a (3*num_units,) vector.
        if self.peepholes:
            self.W_cell_to_gates_fwd = self.create_param(
                W_cell_to_gates, (3*num_units,))
        self.b_gates_fwd = self.create_param(b_gates, (4*num_units,))
        self.W_hid_to_gates_fwd = self.create_param(
            W_hid_to_gates, (num_units, 4*num_units))
        self.W_in_to_gates_fwd = self.create_param(
            W_in_to_gates, (num_inputs, 4*num_units))

        # Setup initial values for the cell and the lstm hidden units
        self.cell_init_fwd = self.create_param(cell_init, (num_batch, num_units))
        self.hid_init_fwd = self.create_param(hid_init, (num_batch, num_units))

        # BACKWARD WEIGHTS
        if self.peepholes:
            self.W_cell_to_gates_bck = self.create_param(
                W_cell_to_gates, (3*num_units,))
        self.b_gates_bck = self.create_param(b_gates, (4*num_units,))
        self.W_hid_to_gates_bck = self.create_param(
            W_hid_to_gates, (num_units, 4*num_units))
        self.W_in_to_gates_bck = self.create_param(
            W_in_to_gates, (num_inputs, 4*num_units))

        # Setup initial values for the cell and the lstm hidden units
        self.cell_init_bck = self.create_param(cell_init, (num_batch, num_units))
        self.hid_init_bck = self.create_param(hid_init, (num_batch, num_units))

        # Name the shared variables for easier debugging; the peephole
        # parameters only exist when peepholes are enabled.
        if self.peepholes:
            self.W_cell_to_gates_fwd.name = "W_cell_to_gates_fwd"
            self.W_cell_to_gates_bck.name = "W_cell_to_gates_bck"
        self.b_gates_fwd.name = "b_gates_fwd"
        self.W_hid_to_gates_fwd.name = "W_hid_to_gates_fwd"
        self.W_in_to_gates_fwd.name = "W_in_to_gates_fwd"
        self.cell_init_fwd.name = "cell_init_fwd"
        self.hid_init_fwd.name = "hid_init_fwd"
        self.b_gates_bck.name = "b_gates_bck"
        self.W_hid_to_gates_bck.name = "W_hid_to_gates_bck"
        self.W_in_to_gates_bck.name = "W_in_to_gates_bck"
        self.cell_init_bck.name = "cell_init_bck"
        self.hid_init_bck.name = "hid_init_bck"
    def get_params(self):
        '''
        Get all parameters of this layer.

        :returns:
            - params : list of theano.shared
                List of all parameters
        '''
        params = self.get_weight_params() + self.get_bias_params()
        if self.peepholes:
            params.extend(self.get_peephole_params())
        if self.learn_init:
            params.extend(self.get_init_params())
        return params

    def get_weight_params(self):
        '''
        Get all weights of this layer.

        :returns:
            - weight_params : list of theano.shared
                List of all weight parameters
        '''
        return [self.W_in_to_gates_fwd, self.W_hid_to_gates_fwd,
                self.W_in_to_gates_bck, self.W_hid_to_gates_bck]

    def get_peephole_params(self):
        '''
        Get all peephole parameters of this layer.

        :returns:
            - peephole_params : list of theano.shared
                List of all peephole parameters
        '''
        return [self.W_cell_to_gates_fwd, self.W_cell_to_gates_bck]

    def get_init_params(self):
        '''
        Get all initial state parameters of this layer.

        :returns:
            - init_params : list of theano.shared
                List of all initial state parameters
        '''
        return [self.hid_init_fwd, self.cell_init_fwd,
                self.hid_init_bck, self.cell_init_bck]

    def get_bias_params(self):
        '''
        Get all bias parameters of this layer.

        :returns:
            - bias_params : list of theano.shared
                List of all bias parameters
        '''
        return [self.b_gates_fwd, self.b_gates_bck]
    def get_output_shape_for(self, input_shape):
        '''
        Compute the expected output shape given the input.

        :parameters:
            - input_shape : tuple
                Dimensionality of expected input

        :returns:
            - output_shape : tuple
                Dimensionality of expected outputs given input_shape
        '''
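        # The forward and backward hidden states are concatenated along the
        # feature axis, so each time step has 2*num_units output features.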
        return (input_shape[0], input_shape[1], 2*self.num_units)
    def get_output_for(self, input_fwd, mask=None, *args, **kwargs):
        '''
        Compute this layer's output function given a symbolic input variable.

        :parameters:
            - input_fwd : theano.TensorType
                Symbolic input variable of shape
                (n_batch, n_time_steps, n_features)
            - mask : theano.TensorType
                Theano variable of shape (n_batch, n_time_steps) denoting
                whether each time step in each sequence in the batch is part
                of the sequence or not. This is needed when scanning
                backwards. If all sequences are of the same length, it should
                be all 1s.

        :returns:
            - layer_output : theano.TensorType
                Symbolic output variable
        '''
        # A mask is required because the backward pass depends on it
        assert mask is not None

        # Treat all dimensions after the second as flattened feature dimensions
        if input_fwd.ndim > 3:
            input_fwd = input_fwd.reshape((input_fwd.shape[0],
                                           input_fwd.shape[1],
                                           T.prod(input_fwd.shape[2:])))

        # Precompute input*W and dimshuffle.
        # Input is provided as (n_batch, n_time_steps, n_features).
        # W_in_to_gates is (n_features, 4*num_units), so input dot W is
        # (n_batch, n_time_steps, 4*num_units). Because scan iterates over the
        # first dimension we dimshuffle to (n_time_steps, n_batch, 4*num_units).
        # Flip the input and the mask for the backward pass.
        input_bck = input_fwd[:, ::-1, :]
        mask_bck = mask[:, ::-1]
        input_dot_W_fwd = T.dot(input_fwd, self.W_in_to_gates_fwd).dimshuffle(1, 0, 2)
        input_dot_W_bck = T.dot(input_bck, self.W_in_to_gates_bck).dimshuffle(1, 0, 2)
        input_dot_W_fwd += self.b_gates_fwd
        input_dot_W_bck += self.b_gates_bck

        # The mask is given as (n_batch, n_time_steps). Because scan iterates
        # over the first dimension we dimshuffle to (n_time_steps, n_batch)
        # and add a broadcastable dimension.
        mask_bck = mask_bck.dimshuffle(1, 0, 'x')

        # After the dimshuffle, input_dot_W_* is
        # (n_time_steps, n_batch, 4*num_units). We define a slicing function
        # that extracts the input to each LSTM gate; slice_c is similar but
        # for the peephole weights.
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]

        def slice_c(x, n):
            return x[n*self.num_units:(n+1)*self.num_units]
        # Create the single recurrent computation step function.
        # input_dot_W_n is the n'th row of the input dot W multiplication.
        # The step function calculates the following:
        #
        # i_t = \sigma(W_{xi}x_t + W_{hi}h_{t-1} + W_{ci}c_{t-1} + b_i)
        # f_t = \sigma(W_{xf}x_t + W_{hf}h_{t-1} + W_{cf}c_{t-1} + b_f)
        # c_t = f_t c_{t-1} + i_t \tanh(W_{xc}x_t + W_{hc}h_{t-1} + b_c)
        # o_t = \sigma(W_{xo}x_t + W_{ho}h_{t-1} + W_{co}c_t + b_o)
        # h_t = o_t \tanh(c_t)
        #
        # Gate names are taken from http://arxiv.org/abs/1409.2329, figure 1.
        def dostep(input_dot_W_n, cell_previous, hid_previous,
                   W_hid_to_gates, W_cell_to_gates):
            # Calculate the gate pre-activations and slice them per gate
            gates = input_dot_W_n + T.dot(hid_previous, W_hid_to_gates)
            ingate = slice_w(gates, 0)
            forgetgate = slice_w(gates, 1)
            modulationgate = slice_w(gates, 2)
            outgate = slice_w(gates, 3)

            # Peephole connections: the input and forget gates see the
            # previous cell state, the output gate sees the new cell state,
            # matching the equations above.
            if self.peepholes:
                ingate += cell_previous*slice_c(W_cell_to_gates, 0)
                forgetgate += cell_previous*slice_c(W_cell_to_gates, 1)

            ingate = self.nonlinearity_ingate(ingate)
            forgetgate = self.nonlinearity_forgetgate(forgetgate)
            modulationgate = self.nonlinearity_modulationgate(modulationgate)
            cell = forgetgate*cell_previous + ingate*modulationgate

            if self.peepholes:
                outgate += cell*slice_c(W_cell_to_gates, 2)
            outgate = self.nonlinearity_outgate(outgate)
            hid = outgate*self.nonlinearity_out(cell)
            return cell, hid

        # Pass None for the peephole weights when peepholes are disabled, so
        # the step function never touches the missing shared variables.
        W_cell_fwd = self.W_cell_to_gates_fwd if self.peepholes else None
        W_cell_bck = self.W_cell_to_gates_bck if self.peepholes else None

        def step(input_dot_W_fwd_n, input_dot_W_bck_n, mask_bck_n,
                 cell_previous_fwd, hid_previous_fwd,
                 cell_previous_bck, hid_previous_bck):
            # Forward direction
            cell_fwd, hid_fwd = dostep(
                input_dot_W_fwd_n, cell_previous_fwd, hid_previous_fwd,
                self.W_hid_to_gates_fwd, W_cell_fwd)
            # Backward direction
            cell_bck, hid_bck = dostep(
                input_dot_W_bck_n, cell_previous_bck, hid_previous_bck,
                self.W_hid_to_gates_bck, W_cell_bck)

            # If mask is 0, use the previous state until mask = 1 is found.
            # This propagates the layer initial state when moving backwards
            # until the end of the sequence is found.
            not_mask_bck = 1 - mask_bck_n
            cell_bck = cell_bck*mask_bck_n + cell_previous_bck*not_mask_bck
            hid_bck = hid_bck*mask_bck_n + hid_previous_bck*not_mask_bck
            return [cell_fwd, hid_fwd, cell_bck, hid_bck]

        sequences = [input_dot_W_fwd, input_dot_W_bck, mask_bck]
        init = [self.cell_init_fwd, self.hid_init_fwd,
                self.cell_init_bck, self.hid_init_bck]

        # The scan op iterates over the first dimension of the sequences and
        # repeatedly applies the step function.
        scan_out = theano.scan(step, sequences=sequences, outputs_info=init)

        # Each output is (n_time_steps, n_batch, num_units). The step function
        # returns [cell_fwd, hid_fwd, cell_bck, hid_bck], so the hidden states
        # are outputs 1 and 3.
        output_fwd = scan_out[0][1]
        output_bck = scan_out[0][3]

        # Reverse the backward output so it is aligned with the forward output
        output_bck = output_bck[::-1, :, :]

        # Concatenate the forward and backward outputs along the feature axis
        output = T.concatenate([output_fwd, output_bck], axis=2)

        # Now, dimshuffle back to (n_batch, n_time_steps, 2*num_units)
        output = output.dimshuffle(1, 0, 2)
        return output
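

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original gist): builds the layer on top of a
# Lasagne InputLayer and compiles a forward function. The InputLayer import,
# the era of the Lasagne API, and the toy shapes are assumptions; the layer
# only requires an input layer whose output shape is
# (n_batch, n_time_steps, n_features).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import numpy as np
    from lasagne.layers import InputLayer

    n_batch, n_time_steps, n_features, n_units = 10, 20, 30, 50

    l_in = InputLayer((n_batch, n_time_steps, n_features))
    l_lstm = BidirectionalLSTMLayer(l_in, num_units=n_units)

    # Symbolic input and mask; the mask is all ones here because every
    # sequence in this toy batch has full length.
    x_sym = T.tensor3('x')
    mask_sym = T.matrix('mask')
    output_sym = l_lstm.get_output_for(x_sym, mask=mask_sym)
    f = theano.function([x_sym, mask_sym], output_sym)

    x_val = np.random.randn(n_batch, n_time_steps, n_features).astype(
        theano.config.floatX)
    mask_val = np.ones((n_batch, n_time_steps), dtype=theano.config.floatX)

    # Forward and backward states are concatenated: expected (10, 20, 100)
    print(f(x_val, mask_val).shape)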