import numpy as np
import theano
import theano.tensor as T
from lasagne import nonlinearities
from lasagne import init
from lasagne.utils import unroll_scan
from lasagne.layers import MergeLayer, Layer, InputLayer, DenseLayer
from lasagne.layers import helper
from lasagne.layers import Gate
class GRUCell(MergeLayer):
    r"""
    Gated Recurrent Unit (GRU) Cell

    Implements a single step of the gated recurrent unit proposed in [1]_,
    which computes the new hidden state :math:`h_t` from the current input
    :math:`x_t` and the previous hidden state :math:`h_{t - 1}` by

    .. math ::
        r_t &= \sigma_r(x_t W_{xr} + h_{t - 1} W_{hr} + b_r)\\
        u_t &= \sigma_u(x_t W_{xu} + h_{t - 1} W_{hu} + b_u)\\
        c_t &= \sigma_c(x_t W_{xc} + r_t \odot (h_{t - 1} W_{hc}) + b_c)\\
        h_t &= (1 - u_t) \odot h_{t - 1} + u_t \odot c_t

    Parameters
    ----------
    x : a :class:`lasagne.layers.Layer` instance
        The layer feeding the current input :math:`x_t` into this cell.
    hid_previous : a :class:`lasagne.layers.Layer` instance
        The layer feeding the previous hidden state :math:`h_{t - 1}` into
        this cell.
    num_units : int
        Number of hidden units in the cell.
    resetgate : Gate
        Parameters for the reset gate (:math:`r_t`): :math:`W_{xr}`,
        :math:`W_{hr}`, :math:`b_r`, and :math:`\sigma_r`.
    updategate : Gate
        Parameters for the update gate (:math:`u_t`): :math:`W_{xu}`,
        :math:`W_{hu}`, :math:`b_u`, and :math:`\sigma_u`.
    hidden_update : Gate
        Parameters for the hidden update (:math:`c_t`): :math:`W_{xc}`,
        :math:`W_{hc}`, :math:`b_c`, and :math:`\sigma_c`.
    hid_init : callable, np.ndarray or theano.shared
        Initializer for the initial hidden state (:math:`h_0`).
    learn_init : bool
        If True, the initial hidden state is learned.
    grad_clipping : float
        If nonzero, the gradient messages are clipped to the given value
        during the backward pass. See [3]_ (p. 6) for further explanation.

    References
    ----------
    .. [1] Cho, Kyunghyun, et al.: On the properties of neural
       machine translation: Encoder-decoder approaches.
       arXiv preprint arXiv:1409.1259 (2014).
    .. [2] Chung, Junyoung, et al.: Empirical Evaluation of Gated
       Recurrent Neural Networks on Sequence Modeling.
       arXiv preprint arXiv:1412.3555 (2014).
    .. [3] Graves, Alex: Generating sequences with recurrent neural networks.
       arXiv preprint arXiv:1308.0850 (2013).

    Notes
    -----
    An alternate update for the candidate hidden state is proposed in [2]_:

    .. math::
        c_t &= \sigma_c(x_t W_{xc} + (r_t \odot h_{t - 1}) W_{hc} + b_c)\\

    We use the formulation from [1]_ because it allows us to do all matrix
    operations in a single dot product.
    """
    def __init__(self, x, hid_previous, num_units,
                 resetgate=Gate(W_cell=None),
                 updategate=Gate(W_cell=None),
                 hidden_update=Gate(W_cell=None,
                                    nonlinearity=nonlinearities.tanh),
                 hid_init=init.Constant(0.),
                 learn_init=False,
                 grad_clipping=0,
                 **kwargs):
        if hid_previous.output_shape[-1] != num_units:
            raise ValueError('Number of hid_previous inputs should be the '
                             'same as num_units')
        if x.output_shape[0] != hid_previous.output_shape[0]:
            raise ValueError('The first (batch) dimension of the outputs of '
                             'x and hid_previous should be equal')

        # Initialize parent layer
        super(GRUCell, self).__init__([x, hid_previous], **kwargs)
        self.learn_init = learn_init
        self.num_units = num_units  # this could also be inferred?
        self.grad_clipping = grad_clipping

        # Retrieve the dimensionality of the incoming layers
        input_shape_x = self.input_shapes[0]
        input_shape_h = self.input_shapes[1]

        # Input dimensionality is the output dimensionality of the input layer
        num_inputs_x = np.prod(input_shape_x[1:])

        def add_gate_params(gate, gate_name):
            """ Convenience function for adding layer parameters from a Gate
            instance. """
            return (self.add_param(gate.W_in, (num_inputs_x, num_units),
                                   name="W_in_to_{}".format(gate_name)),
                    self.add_param(gate.W_hid, (num_units, num_units),
                                   name="W_hid_to_{}".format(gate_name)),
                    self.add_param(gate.b, (num_units,),
                                   name="b_{}".format(gate_name),
                                   regularizable=False),
                    gate.nonlinearity)

        # Add in all parameters from gates
        (self.W_in_to_updategate, self.W_hid_to_updategate, self.b_updategate,
         self.nonlinearity_updategate) = add_gate_params(updategate,
                                                         'updategate')
        (self.W_in_to_resetgate, self.W_hid_to_resetgate, self.b_resetgate,
         self.nonlinearity_resetgate) = add_gate_params(resetgate, 'resetgate')
        (self.W_in_to_hidden_update, self.W_hid_to_hidden_update,
         self.b_hidden_update, self.nonlinearity_hid) = add_gate_params(
             hidden_update, 'hidden_update')

        # Initialize hidden state
        self.hid_init = self.add_param(
            hid_init, (1, self.num_units), name="hid_init",
            trainable=learn_init, regularizable=False)

        # Stack input weight matrices into a (num_inputs, 3*num_units)
        # matrix, which speeds up computation
        self.W_in_stacked = T.concatenate(
            [self.W_in_to_resetgate, self.W_in_to_updategate,
             self.W_in_to_hidden_update], axis=1)

        # Same for hidden weight matrices
        self.W_hid_stacked = T.concatenate(
            [self.W_hid_to_resetgate, self.W_hid_to_updategate,
             self.W_hid_to_hidden_update], axis=1)

        # Stack gate biases into a (3*num_units,) vector
        self.b_stacked = T.concatenate(
            [self.b_resetgate, self.b_updategate,
             self.b_hidden_update], axis=0)

    def get_hid_init(self, num_batch):
        # Tile the (1, num_units) initial state across the batch dimension
        # so every example starts from the same (possibly learned) h_0
        return T.dot(T.ones((num_batch, 1)), self.hid_init)

    def get_output_shape_for(self, input_shapes):
        # The first element of input_shapes is the shape of the x input;
        # the output is one hidden vector of size num_units per example
        input_shape = input_shapes[0]
        return input_shape[0], self.num_units

    def get_output_for(self, inputs, **kwargs):
        """
        Compute a single GRU step.

        inputs is the list [x_t, h_previous]: the current input and the
        hidden state from the previous step.
        """
        # Retrieve the layer inputs
        input_n, hid_previous = inputs

        # Treat all dimensions after the second as flattened feature dimensions
        if input_n.ndim > 2:
            input_n = T.flatten(input_n, 2)

        # After the projection below, input_n will be (num_batch, 3*num_units).
        # slice_w extracts the part belonging to each GRU gate.
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]

        # Single recurrent computation step; input_n is the current input
        # x_t and hid_previous is the hidden state h_{t - 1}
        def step(input_n, hid_previous, *args):
            # Compute W_{hr} h_{t - 1}, W_{hu} h_{t - 1}, and W_{hc} h_{t - 1}
            hid_input = T.dot(hid_previous, self.W_hid_stacked)

            if self.grad_clipping:
                input_n = theano.gradient.grad_clip(
                    input_n, -self.grad_clipping, self.grad_clipping)
                hid_input = theano.gradient.grad_clip(
                    hid_input, -self.grad_clipping, self.grad_clipping)

            # Compute W_{xr} x_t, W_{xu} x_t, and W_{xc} x_t plus the biases
            input_n = T.dot(input_n, self.W_in_stacked) + self.b_stacked

            # Reset and update gates
            resetgate = slice_w(hid_input, 0) + slice_w(input_n, 0)
            updategate = slice_w(hid_input, 1) + slice_w(input_n, 1)
            resetgate = self.nonlinearity_resetgate(resetgate)
            updategate = self.nonlinearity_updategate(updategate)

            # Compute W_{xc} x_t + r_t \odot (W_{hc} h_{t - 1})
            hidden_update_in = slice_w(input_n, 2)
            hidden_update_hid = slice_w(hid_input, 2)
            hidden_update = hidden_update_in + resetgate*hidden_update_hid
            if self.grad_clipping:
                hidden_update = theano.gradient.grad_clip(
                    hidden_update, -self.grad_clipping, self.grad_clipping)
            hidden_update = self.nonlinearity_hid(hidden_update)

            # Compute (1 - u_t) h_{t - 1} + u_t c_t
            hid = (1 - updategate)*hid_previous + updategate*hidden_update
            return hid

        return step(input_n, hid_previous)
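

# Usage sketch: a minimal example of one way the cell above could be driven
# for a single step, using the standard Lasagne InputLayer and
# helper.get_output API. The shapes and variable names below are illustrative
# assumptions, not something fixed by GRUCell itself.
if __name__ == '__main__':
    num_batch, num_inputs, num_units = 16, 40, 30

    # Symbolic inputs: the current input x_t and the previous hidden state
    x_in = InputLayer((num_batch, num_inputs), name='x')
    h_in = InputLayer((num_batch, num_units), name='hid_previous')

    # One GRU step: maps (x_t, h_{t-1}) -> h_t
    cell = GRUCell(x_in, h_in, num_units)

    x_sym = T.matrix('x')
    h_sym = T.matrix('h')
    h_next = helper.get_output(cell, {x_in: x_sym, h_in: h_sym})
    step_fn = theano.function([x_sym, h_sym], h_next)

    x_val = np.random.randn(num_batch, num_inputs).astype(theano.config.floatX)
    h0 = np.zeros((num_batch, num_units), dtype=theano.config.floatX)
    print(step_fn(x_val, h0).shape)  # expected: (num_batch, num_units)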