import time
import pdb
import numpy as np
import theano
import theano.tensor as T
import h5py
class LSTMLayer(object):

    def __init__(self, X, dim, **kwargs):
        """
        Set up the weight matrices for a long short-term memory (LSTM) layer.
        I use the notation from Graves.
        args:
            - X: A symbolic Theano tensor holding the input sequences, shaped
              (sequence length, batch size, input dimension).
            - dim: A dictionary containing the dimensions of the units inside
              the LSTM, under the keys 'in_dim', 'hid_dim' and 'out_dim'.
        kwargs:
            - bptt_truncate: How many steps back to truncate backpropagation
              through time; -1 (the default) means no truncation.
        """
        uni = np.random.uniform

        def diag_constructor(limit, size, n):
            """
            Build a stack of random diagonal matrices.
            args:
                - limit: A list whose two elements are the bounds passed to
                  the numpy uniform function.
                - size: (Int) one dimension of the square matrix.
                - n: The number of these matrices to create.
            """
            diag_ind = np.diag_indices(size)
            mat = np.zeros((n, size, size))
            for i in range(n):
                diag_val = uni(limit[0], limit[1], size)
                # Index mat[i] first so that diag_ind addresses the two
                # trailing axes; mat[i, diag_ind] would not hit the diagonal.
                mat[i][diag_ind] = diag_val
            return mat.astype(theano.config.floatX)
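        # For example (illustrative values, not from the original gist):
        # diag_constructor([-0.1, 0.1], 3, 2) returns two 3x3 matrices whose
        # diagonals are drawn from U(-0.1, 0.1) and whose other entries are 0.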
        truncate = kwargs.get("bptt_truncate", -1)
        nin = dim.get('in_dim')
        nout = dim.get('out_dim')
        nhid = dim.get('hid_dim')
        self.nin = nin
        self.nout = nout
        self.nhid = nhid
# print("hidden dim", nhid) | |
# I can cast weight matrices differently. Instead of creating separate weight matrices for each connection, I create them | |
# based on their size. This cleans up the code and potentially makes things more efficient. I will say that it makes | |
# the recurrent step function harder to read. | |
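        # Index convention for the stacked matrices (matching the gate order
        # in recurrent_step below): 0 = input gate, 1 = forget gate,
        # 2 = cell input, 3 = output gate. Wc holds only the three diagonal
        # peephole matrices: 0 = input, 1 = forget, 2 = output.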
        self.Wi = theano.shared(uni(-np.sqrt(1.0/(nin*nhid)), np.sqrt(1.0/(nin*nhid)), (4, nin, nhid)).astype(theano.config.floatX), name='Wi')
        self.Wh = theano.shared(uni(-np.sqrt(1.0/(nhid**2)), np.sqrt(1.0/(nhid**2)), (4, nhid, nhid)).astype(theano.config.floatX), name='Wh')
        self.Wc = theano.shared(diag_constructor([-np.sqrt(1.0/(nhid**2)), np.sqrt(1.0/(nhid**2))], nhid, 3), name='Wc')
        # Cast the biases to floatX as well, so the whole graph has one dtype.
        self.b = theano.shared(np.zeros((4, nhid)).astype(theano.config.floatX), name='b')
        self.Wy = theano.shared(uni(-np.sqrt(1.0/(nhid*nout)), np.sqrt(1.0/(nhid*nout)), (nhid, nout)).astype(theano.config.floatX), name='Wy')
        self.by = theano.shared(np.zeros(nout).astype(theano.config.floatX), name='by')
        self.params = [self.Wi, self.Wh, self.Wc, self.b, self.Wy, self.by]
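        # Resulting shapes: Wi (4, nin, nhid), Wh (4, nhid, nhid),
        # Wc (3, nhid, nhid) diagonal, b (4, nhid), Wy (nhid, nout), by (nout,).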
        def recurrent_step(x_t, b_tm1, s_tm1):
            """
            Define one step of the recurrence.
            args:
                - x_t: the input at the current time step
                - b_tm1: the previous output b_t (b_{t minus 1})
                - s_tm1: the previous s_t (s_{t minus 1}); this is the state of the cell
            """
            # Input gate (with a peephole from the previous cell state)
            b_L = T.nnet.sigmoid(T.dot(x_t, self.Wi[0]) + T.dot(b_tm1, self.Wh[0]) + T.dot(s_tm1, self.Wc[0]) + self.b[0])
            # Forget gate
            b_Phi = T.nnet.sigmoid(T.dot(x_t, self.Wi[1]) + T.dot(b_tm1, self.Wh[1]) + T.dot(s_tm1, self.Wc[1]) + self.b[1])
            # Cell input and state update
            a_Cell = T.dot(x_t, self.Wi[2]) + T.dot(b_tm1, self.Wh[2]) + self.b[2]
            s_t = b_Phi*s_tm1 + b_L*T.tanh(a_Cell)
            # Output gate (its peephole sees the *new* cell state s_t)
            b_Om = T.nnet.sigmoid(T.dot(x_t, self.Wi[3]) + T.dot(b_tm1, self.Wh[3]) + T.dot(s_t, self.Wc[2]) + self.b[3])
            # Final output (what gets sent to the next step in the recurrence)
            b_Cell = b_Om*T.tanh(s_t)
            # Sequence output
            o_t = T.nnet.softmax(T.dot(b_Cell, self.Wy) + self.by)
            return b_Cell, s_t, o_t
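        # Per-step shapes: b_Cell and s_t are (batch, nhid); o_t is
        # (batch, nout). scan stacks these along a new leading time axis.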
        out, _ = theano.scan(recurrent_step,
                             truncate_gradient=truncate,
                             sequences=X,
                             outputs_info=[
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 None,  # o_t is not fed back into the recurrence
                             ],
                             n_steps=X.shape[0])
        self.b_out = out[0]
        self.pred = out[2]
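
# A minimal usage sketch (my addition, not part of the original gist). The
# dimensions and the random input below are illustrative; this only checks
# that the graph compiles and that the softmax output has the expected shape.
if __name__ == "__main__":
    X = T.tensor3('X')  # (sequence length, batch size, input dimension)
    layer = LSTMLayer(X, {'in_dim': 8, 'hid_dim': 16, 'out_dim': 8})
    predict = theano.function([X], layer.pred)
    x = np.random.uniform(size=(5, 2, 8)).astype(theano.config.floatX)
    print(predict(x).shape)  # expected: (5, 2, 8)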