# Note: the imports below are assumed from the Lasagne/Theano setup this gist
# was written against; adjust the paths if your package layout differs.
import theano
import theano.tensor as T

from lasagne import init
from lasagne import nonlinearities
from lasagne.layers import Layer


class LSTMLayer(Layer):
    '''
    A long short-term memory (LSTM) layer. Includes "peephole connections" and
    a forget gate. Based on the definition in [#graves2014generating]_, which is
    the current common definition. Gate names are taken from [#zaremba2014]_,
    figure 1. A usage sketch is included at the bottom of this file.

    :references:
        .. [#graves2014generating] Alex Graves, "Generating Sequences With
            Recurrent Neural Networks".
        .. [#zaremba2014] Zaremba, W. et al., "Recurrent Neural Network
            Regularization". (http://arxiv.org/abs/1409.2329)
    '''
    def __init__(self, input_layer, num_units,
                 W_in_to_ingate=init.Normal(0.1),
                 W_hid_to_ingate=init.Normal(0.1),
                 W_cell_to_ingate=init.Normal(0.1),
                 b_ingate=init.Normal(0.1),
                 nonlinearity_ingate=nonlinearities.sigmoid,
                 W_in_to_forgetgate=init.Normal(0.1),
                 W_hid_to_forgetgate=init.Normal(0.1),
                 W_cell_to_forgetgate=init.Normal(0.1),
                 b_forgetgate=init.Normal(0.1),
                 nonlinearity_forgetgate=nonlinearities.sigmoid,
                 W_in_to_modulationgate=init.Normal(0.1),
                 W_hid_to_modulationgate=init.Normal(0.1),
                 b_modulationgate=init.Normal(0.1),
                 nonlinearity_modulationgate=nonlinearities.tanh,
                 W_in_to_outgate=init.Normal(0.1),
                 W_hid_to_outgate=init.Normal(0.1),
                 W_cell_to_outgate=init.Normal(0.1),
                 b_outgate=init.Normal(0.1),
                 nonlinearity_outgate=nonlinearities.sigmoid,
                 nonlinearity_out=nonlinearities.tanh,
                 cell_init=init.Constant(0.),
                 hid_init=init.Constant(0.),
                 backwards=False,
                 learn_init=False,
                 peepholes=True):
        '''
        Initialize an LSTM layer. For details on what the parameters mean, see
        (7-11) from [#graves2014generating]_.

        :parameters:
            - input_layer : layers.Layer
                Input to this recurrent layer
            - num_units : int
                Number of hidden units
            - W_in_to_ingate : function or np.ndarray or theano.shared
                :math:`W_{xi}`
            - W_hid_to_ingate : function or np.ndarray or theano.shared
                :math:`W_{hi}`
            - W_cell_to_ingate : function or np.ndarray or theano.shared
                :math:`W_{ci}`
            - b_ingate : function or np.ndarray or theano.shared
                :math:`b_i`
            - nonlinearity_ingate : function
                :math:`\sigma`
            - W_in_to_forgetgate : function or np.ndarray or theano.shared
                :math:`W_{xf}`
            - W_hid_to_forgetgate : function or np.ndarray or theano.shared
                :math:`W_{hf}`
            - W_cell_to_forgetgate : function or np.ndarray or theano.shared
                :math:`W_{cf}`
            - b_forgetgate : function or np.ndarray or theano.shared
                :math:`b_f`
            - nonlinearity_forgetgate : function
                :math:`\sigma`
            - W_in_to_modulationgate : function or np.ndarray or theano.shared
                :math:`W_{xc}`
            - W_hid_to_modulationgate : function or np.ndarray or theano.shared
                :math:`W_{hc}`
            - b_modulationgate : function or np.ndarray or theano.shared
                :math:`b_c`
            - nonlinearity_modulationgate : function
                :math:`\tanh`
            - W_in_to_outgate : function or np.ndarray or theano.shared
                :math:`W_{xo}`
            - W_hid_to_outgate : function or np.ndarray or theano.shared
                :math:`W_{ho}`
            - W_cell_to_outgate : function or np.ndarray or theano.shared
                :math:`W_{co}`
            - b_outgate : function or np.ndarray or theano.shared
                :math:`b_o`
            - nonlinearity_outgate : function
                :math:`\sigma`
            - nonlinearity_out : function
                :math:`\tanh`
            - cell_init : function or np.ndarray or theano.shared
                :math:`c_0`
            - hid_init : function or np.ndarray or theano.shared
                :math:`h_0`
            - backwards : boolean
                If True, process the sequence backwards
            - learn_init : boolean
                If True, the initial hidden and cell values are learned
            - peepholes : boolean
                If True, the LSTM uses peephole connections.
                When False, W_cell_to_ingate, W_cell_to_forgetgate and
                W_cell_to_outgate are ignored.
        '''
        # Initialize parent layer
        super(LSTMLayer, self).__init__(input_layer)

        # For any of the nonlinearities, if None is supplied, use identity
        if nonlinearity_ingate is None:
            self.nonlinearity_ingate = nonlinearities.identity
        else:
            self.nonlinearity_ingate = nonlinearity_ingate

        if nonlinearity_forgetgate is None:
            self.nonlinearity_forgetgate = nonlinearities.identity
        else:
            self.nonlinearity_forgetgate = nonlinearity_forgetgate

        if nonlinearity_modulationgate is None:
            self.nonlinearity_modulationgate = nonlinearities.identity
        else:
            self.nonlinearity_modulationgate = nonlinearity_modulationgate

        if nonlinearity_outgate is None:
            self.nonlinearity_outgate = nonlinearities.identity
        else:
            self.nonlinearity_outgate = nonlinearity_outgate

        if nonlinearity_out is None:
            self.nonlinearity_out = nonlinearities.identity
        else:
            self.nonlinearity_out = nonlinearity_out

        self.learn_init = learn_init
        self.num_units = num_units
        self.backwards = backwards
        self.peepholes = peepholes

        # Input dimensionality is the output dimensionality of the input layer
        (num_batch, _, num_inputs) = self.input_layer.get_output_shape()

        # Initialize parameters using the supplied args
        self.W_in_to_ingate = self.create_param(
            W_in_to_ingate, (num_inputs, num_units))
        self.W_hid_to_ingate = self.create_param(
            W_hid_to_ingate, (num_units, num_units))
        self.b_ingate = self.create_param(b_ingate, (num_units,))

        self.W_in_to_forgetgate = self.create_param(
            W_in_to_forgetgate, (num_inputs, num_units))
        self.W_hid_to_forgetgate = self.create_param(
            W_hid_to_forgetgate, (num_units, num_units))
        self.b_forgetgate = self.create_param(b_forgetgate, (num_units,))

        self.W_in_to_modulationgate = self.create_param(
            W_in_to_modulationgate, (num_inputs, num_units))
        self.W_hid_to_modulationgate = self.create_param(
            W_hid_to_modulationgate, (num_units, num_units))
        self.b_modulationgate = self.create_param(
            b_modulationgate, (num_units,))

        self.W_in_to_outgate = self.create_param(
            W_in_to_outgate, (num_inputs, num_units))
        self.W_hid_to_outgate = self.create_param(
            W_hid_to_outgate, (num_units, num_units))
        self.b_outgate = self.create_param(b_outgate, (num_units,))

        # Stack input-to-gate weights into a (num_inputs, 4*num_units) matrix
        self.W_in_to_gates = T.concatenate(
            [self.W_in_to_ingate, self.W_in_to_forgetgate,
             self.W_in_to_modulationgate, self.W_in_to_outgate], axis=1)

        # Stack hidden-to-gate weights into a (num_units, 4*num_units) matrix
        self.W_hid_to_gates = T.concatenate(
            [self.W_hid_to_ingate, self.W_hid_to_forgetgate,
             self.W_hid_to_modulationgate, self.W_hid_to_outgate], axis=1)

        # Stack gate biases into a (4*num_units,) vector
        self.b_gates = T.concatenate(
            [self.b_ingate, self.b_forgetgate,
             self.b_modulationgate, self.b_outgate], axis=0)

        # Initialize peephole (cell-to-gate) connections
        if self.peepholes:
            self.W_cell_to_ingate = self.create_param(
                W_cell_to_ingate, (num_units,))
            self.W_cell_to_forgetgate = self.create_param(
                W_cell_to_forgetgate, (num_units,))
            self.W_cell_to_outgate = self.create_param(
                W_cell_to_outgate, (num_units,))
            # Concatenate peephole weights into a (3*num_units,) vector
            self.W_cell_to_gates = T.concatenate(
                [self.W_cell_to_ingate, self.W_cell_to_forgetgate,
                 self.W_cell_to_outgate], axis=0)

        # Set up initial values for the cell and the LSTM hidden units
        self.cell_init = self.create_param(cell_init, (num_batch, num_units))
        self.hid_init = self.create_param(hid_init, (num_batch, num_units))
    def get_params(self):
        '''
        Get all parameters of this layer.

        :returns:
            - params : list of theano.shared
                List of all parameters
        '''
        params = self.get_weight_params() + self.get_bias_params()
        if self.peepholes:
            params.extend(self.get_peephole_params())
        if self.learn_init:
            params.extend(self.get_init_params())
        return params

    def get_weight_params(self):
        '''
        Get all weights of this layer.

        :returns:
            - weight_params : list of theano.shared
                List of all weight parameters
        '''
        return [self.W_in_to_ingate,
                self.W_hid_to_ingate,
                self.W_in_to_forgetgate,
                self.W_hid_to_forgetgate,
                self.W_in_to_modulationgate,
                self.W_hid_to_modulationgate,
                self.W_in_to_outgate,
                self.W_hid_to_outgate]

    def get_peephole_params(self):
        '''
        Get all peephole parameters of this layer.

        :returns:
            - peephole_params : list of theano.shared
                List of all peephole parameters
        '''
        return [self.W_cell_to_ingate,
                self.W_cell_to_forgetgate,
                self.W_cell_to_outgate]

    def get_init_params(self):
        '''
        Get all initial state parameters of this layer.

        :returns:
            - init_params : list of theano.shared
                List of all initial state parameters
        '''
        return [self.hid_init, self.cell_init]

    def get_bias_params(self):
        '''
        Get all bias parameters of this layer.

        :returns:
            - bias_params : list of theano.shared
                List of all bias parameters
        '''
        return [self.b_ingate, self.b_forgetgate,
                self.b_modulationgate, self.b_outgate]

    def get_output_shape_for(self, input_shape):
        '''
        Compute the expected output shape given the input shape.

        :parameters:
            - input_shape : tuple
                Dimensionality of expected input

        :returns:
            - output_shape : tuple
                Dimensionality of expected output given input_shape
        '''
        return (input_shape[0], input_shape[1], self.num_units)
    def get_output_for(self, input, mask=None, *args, **kwargs):
        '''
        Compute this layer's output function given a symbolic input variable.

        :parameters:
            - input : theano.TensorType
                Symbolic input variable
            - mask : theano.TensorType
                Theano variable denoting whether each time step in each
                sequence in the batch is part of the sequence or not. This is
                needed when scanning backwards. If all sequences are of the
                same length, it should be all 1s.

        :returns:
            - layer_output : theano.TensorType
                Symbolic output variable
        '''
        if self.backwards:
            assert mask is not None, ("Mask must be given to get_output_for"
                                      " when backwards is true")

        # Treat all dimensions after the second as flattened feature dimensions
        if input.ndim > 3:
            input = input.reshape((input.shape[0], input.shape[1],
                                   T.prod(input.shape[2:])))

        # Precompute input*W and dimshuffle.
        # Input is provided as (n_batch, n_time_steps, n_features).
        # W_in_to_gates is (n_features, 4*num_units), so input dot W is
        # (n_batch, n_time_steps, 4*num_units). Because scan iterates over the
        # first dimension, we dimshuffle to (n_time_steps, n_batch, 4*num_units)
        if self.backwards:
            input = input[:, ::-1, :]
        input_dot_W = T.dot(input, self.W_in_to_gates).dimshuffle(1, 0, 2)
        input_dot_W += self.b_gates

        # input_dot_W is now (n_time_steps, n_batch, 4*num_units). We define a
        # slicing function that extracts the pre-activation for each LSTM gate;
        # slice_c is similar but for the concatenated peephole weights.
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]

        def slice_c(x, n):
            return x[n*self.num_units:(n+1)*self.num_units]
        # Create the single recurrent computation step function.
        # input_dot_W_n is the n-th time step slice of the input dot W product.
        # The step function calculates the following:
        #
        # i_t = \sigma(W_{xi}x_t + W_{hi}h_{t-1} + W_{ci}c_{t-1} + b_i)
        # f_t = \sigma(W_{xf}x_t + W_{hf}h_{t-1} + W_{cf}c_{t-1} + b_f)
        # c_t = f_t c_{t-1} + i_t \tanh(W_{xc}x_t + W_{hc}h_{t-1} + b_c)
        # o_t = \sigma(W_{xo}x_t + W_{ho}h_{t-1} + W_{co}c_t + b_o)
        # h_t = o_t \tanh(c_t)
        #
        # Gate names are taken from http://arxiv.org/abs/1409.2329, figure 1.
        def step(input_dot_W_n, cell_previous, hid_previous):
            # Calculate the gate pre-activations and slice out each gate
            gates = input_dot_W_n + T.dot(hid_previous, self.W_hid_to_gates)
            ingate = slice_w(gates, 0)
            forgetgate = slice_w(gates, 1)
            modulationgate = slice_w(gates, 2)
            outgate = slice_w(gates, 3)

            if self.peepholes:
                # Input and forget gates peek at the previous cell state
                ingate += cell_previous*slice_c(self.W_cell_to_gates, 0)
                forgetgate += cell_previous*slice_c(self.W_cell_to_gates, 1)

            ingate = self.nonlinearity_ingate(ingate)
            forgetgate = self.nonlinearity_forgetgate(forgetgate)
            modulationgate = self.nonlinearity_modulationgate(modulationgate)
            cell = forgetgate*cell_previous + ingate*modulationgate

            if self.peepholes:
                # The output gate peeks at the updated cell state c_t
                outgate += cell*slice_c(self.W_cell_to_gates, 2)
            outgate = self.nonlinearity_outgate(outgate)

            hid = outgate*self.nonlinearity_out(cell)
            return [cell, hid]
        def step_back(input_dot_W_n, mask, cell_previous, hid_previous):
            cell, hid = step(input_dot_W_n, cell_previous, hid_previous)

            # If mask is 0, use previous state until mask = 1 is found.
            # This propagates the layer initial state when moving backwards
            # until the end of the sequence is found.
            not_mask = 1 - mask
            cell = cell*mask + cell_previous*not_mask
            hid = hid*mask + hid_previous*not_mask

            return [cell, hid]
        # When scanning backwards, the mask must be reversed to match the
        # reversed input, and the masked step function is used.
        if self.backwards:
            # mask is given as (batch_size, seq_len). Because scan iterates
            # over the first dimension, we dimshuffle to (seq_len, batch_size)
            # and add a broadcastable dimension.
            mask = mask[:, ::-1]
            mask = mask.dimshuffle(1, 0, 'x')
            sequences = [input_dot_W, mask]
            step_fun = step_back
        else:
            sequences = input_dot_W
            step_fun = step

        # The scan op iterates over the first dimension of input and
        # repeatedly applies the step function.
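        # theano.scan returns (outputs, updates); outputs here is the
        # [cell, hid] pair of sequences, so [0][1] below selects the hidden
        # state for every time step.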
        output = theano.scan(step_fun, sequences=sequences,
                             outputs_info=[self.cell_init, self.hid_init])[0][1]

        # Now, dimshuffle back to (n_batch, n_time_steps, num_units)
        output = output.dimshuffle(1, 0, 2)
        return output
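

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the layer itself). It assumes the imports
# at the top of this file and Lasagne's InputLayer; the shapes, variable names
# and the all-ones mask below are illustrative only.
if __name__ == '__main__':
    import numpy as np
    from lasagne.layers import InputLayer

    n_batch, n_time_steps, n_features, n_units = 2, 5, 3, 4

    # Symbolic (n_batch, n_time_steps, n_features) input and a
    # (n_batch, n_time_steps) mask marking which time steps are real data.
    x_sym = T.tensor3('x')
    mask_sym = T.matrix('mask')

    l_in = InputLayer(shape=(n_batch, n_time_steps, n_features))
    l_lstm = LSTMLayer(l_in, num_units=n_units, backwards=True)

    # Compile a function returning the hidden state at every time step.
    hid = l_lstm.get_output_for(x_sym, mask=mask_sym)
    f_lstm = theano.function([x_sym, mask_sym], hid)

    x = np.random.randn(
        n_batch, n_time_steps, n_features).astype(theano.config.floatX)
    # All sequences have full length here, so the mask is all ones.
    m = np.ones((n_batch, n_time_steps), dtype=theano.config.floatX)

    # Expected output shape: (n_batch, n_time_steps, n_units)
    print(f_lstm(x, m).shape)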