import lasagne as nn
import numpy as np
import theano
import theano.tensor as T
import math
from lasagne.utils import unroll_scan
from lasagne.layers import LSTMLayer
from lasagne.layers import Gate
from lasagne.layers import Layer
# 01/04/2016
# demo of a way to sample on-line from an LSTM in Lasagne by treating hidden/cell states
# as theano variables (instead of reconstructing the sequence each time)

# extension of lasagne.layers.LSTMLayer which re-implements get_output to return both cell
# and hidden state instead of just hidden state
class LSTMSampleableLayer(LSTMLayer):
    def __init__(self, *args, **kwargs):
        super(LSTMSampleableLayer, self).__init__(*args, **kwargs)

    def get_output_shape_for(self, input_shapes):
        ret = super(LSTMSampleableLayer, self).get_output_shape_for(input_shapes)
        return ret[:-1] + (ret[-1]*2,) if len(ret) > 1 else ret
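    # the layer output is cell and hidden states concatenated along the last
    # axis, hence the doubled shape above: e.g. with num_units=5 and input
    # shape (batch, seq_len, num_inputs), get_output returns
    # (batch, seq_len, 10), cell states in the first 5 channels and hidden
    # states in the last 5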
    def get_output_for(self, inputs, **kwargs):
        """
        Have to re-write LSTMLayer's output construction because we need
        cell_out, which is not stored in the original.
        This is exactly the same except that we return cell_out and hid_out
        concatenated together, instead of just hid_out.
        """
        # Retrieve the layer input
        input = inputs[0]
        # Retrieve the mask when it is supplied
        mask = None
        hid_init = None
        cell_init = None
        if self.mask_incoming_index > 0:
            mask = inputs[self.mask_incoming_index]
        if self.hid_init_incoming_index > 0:
            hid_init = inputs[self.hid_init_incoming_index]
        if self.cell_init_incoming_index > 0:
            cell_init = inputs[self.cell_init_incoming_index]

        # Treat all dimensions after the second as flattened feature dimensions
        if input.ndim > 3:
            input = T.flatten(input, 3)

        # Because scan iterates over the first dimension we dimshuffle to
        # (n_time_steps, n_batch, n_features)
        input = input.dimshuffle(1, 0, 2)
        seq_len, num_batch, _ = input.shape

        # Stack input weight matrices into a (num_inputs, 4*num_units)
        # matrix, which speeds up computation
        W_in_stacked = T.concatenate(
            [self.W_in_to_ingate, self.W_in_to_forgetgate,
             self.W_in_to_cell, self.W_in_to_outgate], axis=1)

        # Same for hidden weight matrices
        W_hid_stacked = T.concatenate(
            [self.W_hid_to_ingate, self.W_hid_to_forgetgate,
             self.W_hid_to_cell, self.W_hid_to_outgate], axis=1)

        # Stack biases into a (4*num_units) vector
        b_stacked = T.concatenate(
            [self.b_ingate, self.b_forgetgate,
             self.b_cell, self.b_outgate], axis=0)

        if self.precompute_input:
            # Because the input is given for all time steps, we can
            # precompute the input dot the weight matrices before scanning.
            # W_in_stacked is (n_features, 4*num_units), so input becomes
            # (n_time_steps, n_batch, 4*num_units).
            input = T.dot(input, W_in_stacked) + b_stacked

        # At each call to scan, input_n will be a single time slice of shape
        # (n_batch, 4*num_units) (or (n_batch, n_features) if the input was
        # not precomputed). We define a slicing function that extracts the
        # input to each LSTM gate.
        def slice_w(x, n):
            return x[:, n*self.num_units:(n+1)*self.num_units]

        # Create single recurrent computation step function
        # input_n is the n-th time slice of the input
        def step(input_n, cell_previous, hid_previous, *args):
            if not self.precompute_input:
                input_n = T.dot(input_n, W_in_stacked) + b_stacked

            # Calculate gate pre-activations and slice
            gates = input_n + T.dot(hid_previous, W_hid_stacked)

            # Clip gradients
            if self.grad_clipping:
                gates = theano.gradient.grad_clip(
                    gates, -self.grad_clipping, self.grad_clipping)

            # Extract the pre-activation gate values
            ingate = slice_w(gates, 0)
            forgetgate = slice_w(gates, 1)
            cell_input = slice_w(gates, 2)
            outgate = slice_w(gates, 3)

            if self.peepholes:
                # Compute peephole connections
                ingate += cell_previous*self.W_cell_to_ingate
                forgetgate += cell_previous*self.W_cell_to_forgetgate

            # Apply nonlinearities
            ingate = self.nonlinearity_ingate(ingate)
            forgetgate = self.nonlinearity_forgetgate(forgetgate)
            cell_input = self.nonlinearity_cell(cell_input)

            # Compute new cell value
            cell = forgetgate*cell_previous + ingate*cell_input

            if self.peepholes:
                outgate += cell*self.W_cell_to_outgate
            outgate = self.nonlinearity_outgate(outgate)

            # Compute new hidden unit activation
            hid = outgate*self.nonlinearity(cell)
            return [cell, hid]

        def step_masked(input_n, mask_n, cell_previous, hid_previous, *args):
            cell, hid = step(input_n, cell_previous, hid_previous, *args)

            # Skip over any input with mask 0 by copying the previous
            # hidden state; proceed normally for any input with mask 1.
            not_mask = 1 - mask_n
            cell = cell*mask_n + cell_previous*not_mask
            hid = hid*mask_n + hid_previous*not_mask
            return [cell, hid]

        if mask is not None:
            # mask is given as (batch_size, seq_len). Because scan iterates
            # over first dimension, we dimshuffle to (seq_len, batch_size) and
            # add a broadcastable dimension
            mask = mask.dimshuffle(1, 0, 'x')
            sequences = [input, mask]
            step_fun = step_masked
        else:
            sequences = input
            step_fun = step

        ones = T.ones((num_batch, 1))
        if isinstance(self.cell_init, Layer):
            pass
        elif isinstance(self.cell_init, T.TensorVariable):
            cell_init = self.cell_init
        else:
            # Dot against a 1s vector to repeat to shape (num_batch, num_units)
            cell_init = T.dot(ones, self.cell_init)

        if isinstance(self.hid_init, Layer):
            pass
        elif isinstance(self.hid_init, T.TensorVariable):
            hid_init = self.hid_init
        else:
            # Dot against a 1s vector to repeat to shape (num_batch, num_units)
            hid_init = T.dot(ones, self.hid_init)

        # The hidden-to-hidden weight matrix is always used in step
        non_seqs = [W_hid_stacked]
        # The "peephole" weight matrices are only used when self.peepholes=True
        if self.peepholes:
            non_seqs += [self.W_cell_to_ingate,
                         self.W_cell_to_forgetgate,
                         self.W_cell_to_outgate]

        # When we aren't precomputing the input outside of scan, we need to
        # provide the input weights and biases to the step function
        if not self.precompute_input:
            non_seqs += [W_in_stacked, b_stacked]

        if self.unroll_scan:
            # Retrieve the dimensionality of the incoming layer
            input_shape = self.input_shapes[0]
            # Explicitly unroll the recurrence instead of using scan
            cell_out, hid_out = unroll_scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                non_sequences=non_seqs,
                n_steps=input_shape[1])
        else:
            # Scan op iterates over first dimension of input and repeatedly
            # applies the step function
            cell_out, hid_out = theano.scan(
                fn=step_fun,
                sequences=sequences,
                outputs_info=[cell_init, hid_init],
                go_backwards=self.backwards,
                truncate_gradient=self.gradient_steps,
                non_sequences=non_seqs,
                strict=True)[0]

        # When it is requested that we only return the final sequence step,
        # we need to slice it out immediately after scan is applied
        if self.only_return_final:
            hid_out = hid_out[-1]
            cell_out = cell_out[-1]
        else:
            # dimshuffle back to (n_batch, n_time_steps, n_features)
            hid_out = hid_out.dimshuffle(1, 0, 2)
            cell_out = cell_out.dimshuffle(1, 0, 2)

            # if scan is backward, reverse the output
            if self.backwards:
                hid_out = hid_out[:, ::-1]
                cell_out = cell_out[:, ::-1]

        return T.concatenate([cell_out, hid_out], axis=2)
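
# demo: build a tiny one-layer LSTM, compile a function that advances it by one
# step given explicit cell/hidden states, and check the result against a plain
# numpy re-implementation of the same update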
nhid = 5
batch_size = 1
zdim = 2
cellvar = T.matrix('cell_state')
hidvar = T.matrix('hid_state')
inputvar = T.tensor3('input')

# create simple one-layer lstm
l_in = nn.layers.InputLayer(input_var=inputvar, shape=(None, None, zdim))
lstm = LSTMSampleableLayer(l_in, nhid, cell_init=cellvar, hid_init=hidvar)
l_c = nn.layers.SliceLayer(lstm, axis=2, indices=slice(None, nhid))
l_h = nn.layers.SliceLayer(lstm, axis=2, indices=slice(nhid, None))

# compile function to run it forward one step
fwd = theano.function([inputvar, cellvar, hidvar],
                      nn.layers.get_output([l_c, l_h], deterministic=True))
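# fwd takes an input of shape (batch, seq_len, zdim) plus the current cell and
# hidden states, each of shape (batch, nhid), and returns the new cell and
# hidden state sequences, each of shape (batch, seq_len, nhid)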
# pull out params and calculate same outputs in numpy
pd = dict((str(p), p.get_value()) for p in nn.layers.get_all_params(lstm))
sigm = np.vectorize(lambda y: 1/(1+math.exp(-y)))
tanh = np.vectorize(lambda y: math.tanh(y))

def slice_w(x, n, n_units):
    return x[:, n*n_units:(n+1)*n_units]

W_in_stacked = np.concatenate([pd['W_in_to_ingate'],
                               pd['W_in_to_forgetgate'],
                               pd['W_in_to_cell'],
                               pd['W_in_to_outgate']], axis=1)
W_hid_stacked = np.concatenate([pd['W_hid_to_ingate'],
                                pd['W_hid_to_forgetgate'],
                                pd['W_hid_to_cell'],
                                pd['W_hid_to_outgate']], axis=1)
b_stacked = np.concatenate([pd['b_ingate'],
                            pd['b_forgetgate'],
                            pd['b_cell'],
                            pd['b_outgate']], axis=0)
# function to step lstm forward one step using only numpy
def np_fwd(x, c, h):
    input_n = np.dot(x[0], W_in_stacked) + b_stacked
    gates = input_n + np.dot(h, W_hid_stacked)
    ingate = slice_w(gates, 0, nhid)
    forgetgate = slice_w(gates, 1, nhid)
    cell_input = slice_w(gates, 2, nhid)
    outgate = slice_w(gates, 3, nhid)
    ingate += c*pd['W_cell_to_ingate']
    forgetgate += c*pd['W_cell_to_forgetgate']
    ingate = sigm(ingate)
    forgetgate = sigm(forgetgate)
    cell_input = tanh(cell_input)
    ct = forgetgate*c + ingate*cell_input
    outgate += ct*pd['W_cell_to_outgate']
    outgate = sigm(outgate)
    ht = outgate*tanh(ct)
    return [ct], [ht]
# random initial state
c_np = c_net = [np.random.rand(1, nhid).astype(theano.config.floatX)]
h_np = h_net = [np.random.rand(1, nhid).astype(theano.config.floatX)]

# run forward 3 steps using random input vectors
for step_num in xrange(3):
    x = np.random.rand(1, 1, zdim).astype(theano.config.floatX)
    c_net, h_net = fwd(x, c_net[-1], h_net[-1])
    c_np, h_np = np_fwd(x, c_np[-1], h_np[-1])
    print 'step {}, cell, numpy : '.format(step_num), c_np[-1]
    print 'step {}, cell, theano: '.format(step_num), c_net[-1]
    print 'step {}, hid, numpy  : '.format(step_num), h_np[-1]
    print 'step {}, hid, theano : '.format(step_num), h_net[-1]
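
# optional sanity check: the numpy and theano paths should agree up to
# float32 rounding error (the 1e-4 tolerance here is a loose, assumed bound)
assert np.allclose(c_np[-1], c_net[-1], atol=1e-4)
assert np.allclose(h_np[-1], h_net[-1], atol=1e-4)
print 'numpy and theano single-step outputs match'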