@dean-shaff
Created January 10, 2017 18:01
import time
import pdb
import numpy as np
import theano
import theano.tensor as T
import h5py
class LSTMLayer(object):
    def __init__(self, X, dim, **kwargs):
        """
        Set up the weight matrices for a long short term memory (LSTM) unit.
        I use the notation from Graves.
        args:
            - X: the symbolic input sequence, with shape (time steps, batch size, in_dim).
            - dim: A dictionary containing the dimensions of the units inside the LSTM
              ('in_dim', 'hid_dim' and 'out_dim').
        kwargs:
            - bptt_truncate: number of steps after which to truncate the gradient during
              backpropagation through time (default -1, meaning no truncation).
        """
        uni = np.random.uniform

        def diag_constructor(limit, size, n):
            """
            Build n square matrices with uniformly distributed values on the diagonal.
            args:
                - limit: A list whose two elements correspond to the limits for the numpy uniform function.
                - size: (Int) one dimension of the square matrix.
                - n: The number of these matrices to create.
            """
            diag_ind = np.diag_indices(size)
            mat = np.zeros((n, size, size))
            for i in range(n):
                diag_val = uni(limit[0], limit[1], size)
                mat[i][diag_ind] = diag_val
            return mat.astype(theano.config.floatX)
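        # diag_constructor builds the cell-to-gate "peephole" weights used below.
        # In Graves' formulation these matrices are diagonal, so each gate unit only
        # receives input from its own cell unit; hence only the diagonals are filled.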
        truncate = kwargs.get("bptt_truncate", -1)
        nin = dim.get('in_dim')
        nout = dim.get('out_dim')
        nhid = dim.get('hid_dim')
        self.nin = nin
        self.nout = nout
        self.nhid = nhid
        # I cast the weight matrices differently here. Instead of creating separate weight
        # matrices for each connection, I create them based on their size. This cleans up
        # the code and potentially makes things more efficient, although it does make the
        # recurrent step function harder to read.
        self.Wi = theano.shared(uni(-np.sqrt(1.0 / (nin * nhid)), np.sqrt(1.0 / (nin * nhid)), (4, nin, nhid)).astype(theano.config.floatX), name='Wi')
        self.Wh = theano.shared(uni(-np.sqrt(1.0 / (nhid ** 2)), np.sqrt(1.0 / (nhid ** 2)), (4, nhid, nhid)).astype(theano.config.floatX), name='Wh')
        self.Wc = theano.shared(diag_constructor([-np.sqrt(1.0 / (nhid ** 2)), np.sqrt(1.0 / (nhid ** 2))], nhid, 3), name='Wc')
        self.b = theano.shared(np.zeros((4, nhid)).astype(theano.config.floatX), name='b')
        self.Wy = theano.shared(uni(-np.sqrt(1.0 / (nhid * nout)), np.sqrt(1.0 / (nhid * nout)), (nhid, nout)).astype(theano.config.floatX), name='Wy')
        self.by = theano.shared(np.zeros(nout).astype(theano.config.floatX), name='by')
        self.params = [self.Wi, self.Wh, self.Wc, self.b, self.Wy, self.by]
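        # Index convention for the stacked weights (inferred from recurrent_step below):
        #   slice 0 -> input gate, slice 1 -> forget gate, slice 2 -> cell input,
        #   slice 3 -> output gate. Wc holds only the three peephole matrices
        #   (input, forget, output), so the output gate uses Wc[2].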
        def recurrent_step(x_t, b_tm1, s_tm1):
            """
            Define the recurrent step.
            args:
                - x_t: the current element of the input sequence
                - b_tm1: the previous b_t (b_{t minus 1})
                - s_tm1: the previous s_t (s_{t minus 1}); this is the state of the cell
            """
            # Input gate
            b_L = T.nnet.sigmoid(T.dot(x_t, self.Wi[0]) + T.dot(b_tm1, self.Wh[0]) + T.dot(s_tm1, self.Wc[0]) + self.b[0])
            # Forget gate
            b_Phi = T.nnet.sigmoid(T.dot(x_t, self.Wi[1]) + T.dot(b_tm1, self.Wh[1]) + T.dot(s_tm1, self.Wc[1]) + self.b[1])
            # Cell input and state update
            a_Cell = T.dot(x_t, self.Wi[2]) + T.dot(b_tm1, self.Wh[2]) + self.b[2]
            s_t = b_Phi * s_tm1 + b_L * T.tanh(a_Cell)
            # Output gate
            b_Om = T.nnet.sigmoid(T.dot(x_t, self.Wi[3]) + T.dot(b_tm1, self.Wh[3]) + T.dot(s_t, self.Wc[2]) + self.b[3])
            # Final output (what gets sent to the next step in the recurrence)
            b_Cell = b_Om * T.tanh(s_t)
            # Sequence output
            o_t = T.nnet.softmax(T.dot(b_Cell, self.Wy) + self.by)
            return b_Cell, s_t, o_t
        out, _ = theano.scan(recurrent_step,
                             truncate_gradient=truncate,
                             sequences=X,
                             outputs_info=[
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 {'initial': None}
                             ],
                             n_steps=X.shape[0])
        self.b_out = out[0]
        self.pred = out[2]
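

# A minimal usage sketch: the layer dimensions, variable names and toy data below
# are illustrative assumptions, not part of the layer definition above.
if __name__ == "__main__":
    X = T.tensor3('X')  # (time steps, batch size, in_dim)
    dim = {'in_dim': 8, 'hid_dim': 16, 'out_dim': 4}
    lstm = LSTMLayer(X, dim, bptt_truncate=-1)
    # Compile a function mapping an input sequence to the per-step softmax predictions.
    predict = theano.function([X], lstm.pred)
    x_val = np.random.randn(5, 2, 8).astype(theano.config.floatX)
    print(predict(x_val).shape)  # -> (5, 2, 4)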