@ShigekiKarita
Created September 4, 2016 11:26
import numpy
import six
import chainer
from chainer.functions.activation import lstm
from chainer.functions.array import concat
from chainer.functions.array import split_axis
from chainer import initializers
from chainer import link
from chainer import variable
from chainer.links.connection.convolution_2d import Convolution2D, _pair


def calc_pad(ksize):
    def f(k):
        assert k % 2 == 1
        return int((k - 1) / 2)
    return tuple(map(f, _pair(ksize)))
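
# Note: calc_pad gives "same" padding. For an odd kernel size k it returns
# (k - 1) / 2 per axis, e.g. calc_pad(3) == (1, 1) and calc_pad((3, 5)) == (1, 2),
# so the LSTM state keeps the spatial size of its input.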


class Convolution2DLSTMBase(link.Chain):

    def __init__(self, in_size, out_size, ksize=3,
                 lateral_init=None, upward_init=None,
                 bias_init=0, forget_bias_init=0, **kwargs):
        pad = calc_pad(ksize)
        super(Convolution2DLSTMBase, self).__init__(
            upward=Convolution2D(in_size, 4 * out_size, ksize=ksize, pad=pad,
                                 initialW=0, **kwargs),
            lateral=Convolution2D(out_size, 4 * out_size, ksize=ksize, pad=pad,
                                  initialW=0, nobias=True, **kwargs),
        )
        self.state_size = out_size

        # Initialize the lateral and upward weights one gate-sized block
        # at a time.
        for i in six.moves.range(0, 4 * out_size, out_size):
            initializers.init_weight(
                self.lateral.W.data[i:i + out_size, :], lateral_init)
            initializers.init_weight(
                self.upward.W.data[i:i + out_size, :], upward_init)

        # Split the upward bias into the a, i, f, o gate views so that the
        # forget gate can get its own initial value.
        a, i, f, o = lstm._extract_gates(
            self.upward.b.data.reshape(1, 4 * out_size, 1))
        initializers.init_weight(a, bias_init)
        initializers.init_weight(i, bias_init)
        initializers.init_weight(f, forget_bias_init)
        initializers.init_weight(o, bias_init)

    def out_shape(self, x):
        # Shape of c and h: same batch and spatial size as x, out_size channels.
        return (x.shape[0], self.state_size) + x.shape[2:]


class StatelessConvolution2DLSTM(Convolution2DLSTMBase):

    """Stateless convolution2D LSTM layer.

    This is a convolution2D LSTM layer as a chain. Unlike the
    :func:`~chainer.functions.lstm` function, this chain holds upward and
    lateral connections as child links. This link does not keep cell and
    hidden states.

    Args:
        in_size (int): Number of channels of the input feature maps.
        out_size (int): Number of channels of the output feature maps.

    Attributes:
        upward (chainer.links.Convolution2D): Convolution2D layer of upward
            connections.
        lateral (chainer.links.Convolution2D): Convolution2D layer of lateral
            connections.

    """

    def __call__(self, c, h, x):
        """Returns the new cell state and updated output of the LSTM.

        Args:
            c (~chainer.Variable): Cell states of LSTM units.
            h (~chainer.Variable): Output at the previous time step.
            x (~chainer.Variable): A new batch from the input sequence.

        Returns:
            tuple of ~chainer.Variable: Returns ``(c_new, h_new)``, where
            ``c_new`` represents the new cell state, and ``h_new`` is the
            updated output of the LSTM units.

        """
        lstm_in = self.upward(x)
        if h is not None:
            lstm_in += self.lateral(h)
        if c is None:
            xp = self.xp
            c = variable.Variable(
                xp.zeros(self.out_shape(x), dtype=x.dtype),
                volatile='auto')
        return lstm.lstm(c, lstm_in)


class Convolution2DLSTM(Convolution2DLSTMBase):

    """Convolution2D LSTM layer.

    This is a convolution2D LSTM layer as a chain. Unlike the
    :func:`~chainer.functions.lstm` function, which is defined as a stateless
    activation function, this chain holds upward and lateral connections as
    child links.

    It also maintains *states*, including the cell state and the output
    at the previous time step. Therefore, it can be used as a *stateful LSTM*.

    This link supports variable length inputs. The mini-batch size of the
    current input must be equal to or smaller than that of the previous one.
    The mini-batch size of ``c`` and ``h`` is determined as that of the first
    input ``x``.
    When the mini-batch size of the ``i``-th input is smaller than that of
    the previous input, this link only updates ``c[0:len(x)]`` and
    ``h[0:len(x)]`` and does not change the rest of ``c`` and ``h``.
    For example, if the first input has mini-batch size 4 and the second has
    size 3, only the states of the first three examples are updated at the
    second step.
    So, please sort input sequences in descending order of their lengths
    before applying the function.

    Args:
        in_size (int): Number of channels of the input feature maps.
        out_size (int): Number of channels of the output feature maps.
        lateral_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used for initialization of the lateral connections.
            May be ``None`` to use the default initialization.
        upward_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used for initialization of the upward connections.
            May be ``None`` to use the default initialization.
        bias_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used for initialization of the biases of the cell input,
            input gate and output gate of the upward connection.
            May be a scalar, in which case the bias is initialized by this
            value.
            May be ``None`` to use the default initialization.
        forget_bias_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used for initialization of the biases of the forget gate of
            the upward connection.
            May be a scalar, in which case the bias is initialized by this
            value.
            May be ``None`` to use the default initialization.

    Attributes:
        upward (~chainer.links.Convolution2D): Convolution2D layer of upward
            connections.
        lateral (~chainer.links.Convolution2D): Convolution2D layer of lateral
            connections.
        c (~chainer.Variable): Cell states of LSTM units.
        h (~chainer.Variable): Output at the previous time step.

    """

    def __init__(self, in_size, out_size, **kwargs):
        super(Convolution2DLSTM, self).__init__(in_size, out_size, **kwargs)
        self.reset_state()

    def to_cpu(self):
        super(Convolution2DLSTM, self).to_cpu()
        if self.c is not None:
            self.c.to_cpu()
        if self.h is not None:
            self.h.to_cpu()

    def to_gpu(self, device=None):
        super(Convolution2DLSTM, self).to_gpu(device)
        if self.c is not None:
            self.c.to_gpu(device)
        if self.h is not None:
            self.h.to_gpu(device)

    def set_state(self, c, h):
        """Sets the internal state.

        It sets the :attr:`c` and :attr:`h` attributes.

        Args:
            c (~chainer.Variable): New cell states of LSTM units.
            h (~chainer.Variable): New output at the previous time step.

        """
        assert isinstance(c, chainer.Variable)
        assert isinstance(h, chainer.Variable)
        c_ = c
        h_ = h
        if self.xp == numpy:
            c_.to_cpu()
            h_.to_cpu()
        else:
            c_.to_gpu()
            h_.to_gpu()
        self.c = c_
        self.h = h_

    def reset_state(self):
        """Resets the internal state.

        It sets ``None`` to the :attr:`c` and :attr:`h` attributes.

        """
        self.c = self.h = None

    def __call__(self, x):
        """Updates the internal state and returns the LSTM outputs.

        Args:
            x (~chainer.Variable): A new batch from the input sequence.

        Returns:
            ~chainer.Variable: Outputs of updated LSTM units.

        """
        batch = x.shape[0]
        lstm_in = self.upward(x)
        h_rest = None
        if self.h is not None:
            h_size = self.h.shape[0]
            if batch == 0:
                h_rest = self.h
            elif h_size < batch:
                msg = ('The batch size of x must be equal to or less than the '
                       'size of the previous state h.')
                raise TypeError(msg)
            elif h_size > batch:
                # Only the first ``batch`` rows of the previous state take
                # part in this step; the rest is carried over unchanged.
                h_update, h_rest = split_axis.split_axis(
                    self.h, [batch], axis=0)
                lstm_in += self.lateral(h_update)
            else:
                lstm_in += self.lateral(self.h)
        if self.c is None:
            xp = self.xp
            self.c = variable.Variable(
                xp.zeros(self.out_shape(x), dtype=x.dtype),
                volatile='auto')
        self.c, y = lstm.lstm(self.c, lstm_in)
        if h_rest is None:
            self.h = y
        elif len(y.data) == 0:
            self.h = h_rest
        else:
            self.h = concat.concat([y, h_rest], axis=0)
        return y
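
The module above ends here; the unit tests that follow live in a second file of the gist and import it as ``conv_lstm``. Before the tests, here is a minimal usage sketch of both links. The channel counts, spatial sizes, and variable names are made up for illustration and are not part of the gist:

import numpy
import chainer
from conv_lstm import Convolution2DLSTM, StatelessConvolution2DLSTM

# Stateful variant: the link keeps c and h between calls.
rnn = Convolution2DLSTM(in_size=5, out_size=8, ksize=3)
xs = [numpy.random.uniform(-1, 1, (2, 5, 16, 16)).astype(numpy.float32)
      for _ in range(4)]              # a sequence of four 2-sample frames
rnn.reset_state()                     # clear c and h before a new sequence
for x in xs:
    y = rnn(chainer.Variable(x))      # y.shape == (2, 8, 16, 16)

# Stateless variant: c and h are threaded through explicitly.
cell = StatelessConvolution2DLSTM(in_size=5, out_size=8, ksize=3)
c = h = None
for x in xs:
    c, h = cell(c, h, chainer.Variable(x))
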
import unittest
import numpy
import chainer
from chainer import cuda
from chainer import functions
from chainer import testing
from chainer.testing import attr
from conv_lstm import Convolution2DLSTM, StatelessConvolution2DLSTM
WH = (6, 7)
SHAPE = (3, 5) + WH
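
# WH is the spatial (height, width) size of the test feature maps; SHAPE is a
# full (batch, channels, height, width) input shape for the 5-channel links
# built in the state tests below.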


@testing.parameterize(
    {'in_size': 10, 'out_size': 10},
    {'in_size': 10, 'out_size': 40},
)
class TestLSTM(unittest.TestCase):

    def setUp(self):
        self.link = Convolution2DLSTM(self.in_size, self.out_size)
        upward = self.link.upward.W.data
        upward[...] = numpy.random.uniform(-1, 1, upward.shape)
        lateral = self.link.lateral.W.data
        lateral[...] = numpy.random.uniform(-1, 1, lateral.shape)
        self.link.zerograds()

        self.upward = upward.copy()  # fixed on CPU
        self.lateral = lateral.copy()  # fixed on CPU

        x1_shape = (4, self.in_size) + WH
        self.x1 = numpy.random.uniform(-1, 1, x1_shape).astype(numpy.float32)
        x2_shape = (3, self.in_size) + WH
        self.x2 = numpy.random.uniform(-1, 1, x2_shape).astype(numpy.float32)
        x3_shape = (0, self.in_size) + WH
        self.x3 = numpy.random.uniform(-1, 1, x3_shape).astype(numpy.float32)

    def check_forward(self, x1_data, x2_data, x3_data):
        xp = self.link.xp
        x1 = chainer.Variable(x1_data)
        h1 = self.link(x1)
        c0 = chainer.Variable(xp.zeros(self.link.out_shape(x1),
                                       dtype=self.x1.dtype))
        c1_expect, h1_expect = functions.lstm(c0, self.link.upward(x1))
        testing.assert_allclose(h1.data, h1_expect.data)
        testing.assert_allclose(self.link.h.data, h1_expect.data)
        testing.assert_allclose(self.link.c.data, c1_expect.data)

        batch = len(x2_data)
        x2 = chainer.Variable(x2_data)
        h1_in, h1_rest = functions.split_axis(
            self.link.h.data, [batch], axis=0)
        y2 = self.link(x2)
        c2_expect, y2_expect = \
            functions.lstm(c1_expect,
                           self.link.upward(x2) + self.link.lateral(h1_in))
        testing.assert_allclose(y2.data, y2_expect.data)
        testing.assert_allclose(self.link.h.data[:batch], y2_expect.data)
        testing.assert_allclose(self.link.h.data[batch:], h1_rest.data)

        # FIXME: convolution2d does not support 0 mini-batch
        # x3 = chainer.Variable(x3_data)
        # h2_rest = self.link.h
        # y3 = self.link(x3)
        # c3_expect, y3_expect = \
        #     functions.lstm(c2_expect, self.link.upward(x3))
        # testing.assert_allclose(y3.data, y3_expect.data)
        # testing.assert_allclose(self.link.h.data, h2_rest.data)

    def test_forward_cpu(self):
        self.check_forward(self.x1, self.x2, self.x3)

    @attr.gpu
    def test_forward_gpu(self):
        self.link.to_gpu()
        self.check_forward(cuda.to_gpu(self.x1), cuda.to_gpu(self.x2),
                           cuda.to_gpu(self.x3))


class TestLSTMState(unittest.TestCase):

    def setUp(self):
        self.link = Convolution2DLSTM(5, 7)
        self.x = chainer.Variable(
            numpy.random.uniform(-1, 1, SHAPE).astype(numpy.float32))
        self.c = chainer.Variable(
            numpy.random.uniform(-1, 1, SHAPE).astype(numpy.float32))
        self.h = chainer.Variable(
            numpy.random.uniform(-1, 1, SHAPE).astype(numpy.float32))

    def check_state(self):
        self.assertIsNone(self.link.c)
        self.assertIsNone(self.link.h)
        self.link(self.x)
        self.assertIsNotNone(self.link.c)
        self.assertIsNotNone(self.link.h)

    def test_state_cpu(self):
        self.check_state()

    @attr.gpu
    def test_state_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.check_state()

    def check_set_state(self, c, h):
        self.link.set_state(c, h)
        self.assertIsInstance(self.link.c.data, self.link.xp.ndarray)
        testing.assert_allclose(c.data, self.link.c.data)
        self.assertIsInstance(self.link.h.data, self.link.xp.ndarray)
        testing.assert_allclose(h.data, self.link.h.data)

    def test_set_state_cpu(self):
        self.check_set_state(self.c, self.h)

    @attr.gpu
    def test_set_state_gpu(self):
        self.link.to_gpu()
        self.check_set_state(self.c, self.h)

    def check_reset_state(self):
        self.link(self.x)
        self.link.reset_state()
        self.assertIsNone(self.link.c)
        self.assertIsNone(self.link.h)

    def test_reset_state_cpu(self):
        self.check_reset_state()

    @attr.gpu
    def test_reset_state_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.check_reset_state()


class TestLSTMToCPUToGPU(unittest.TestCase):

    def setUp(self):
        self.link = Convolution2DLSTM(5, 7)
        self.x = chainer.Variable(
            numpy.random.uniform(-1, 1, SHAPE).astype(numpy.float32))

    def check_to_cpu(self, s):
        self.link.to_cpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_cpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)

    def test_to_cpu_cpu(self):
        self.link(self.x)
        self.check_to_cpu(self.link.c)
        self.check_to_cpu(self.link.h)

    @attr.gpu
    def test_to_cpu_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.link(self.x)
        self.check_to_cpu(self.link.c)
        self.check_to_cpu(self.link.h)

    def check_to_cpu_to_gpu(self, s):
        self.link.to_gpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_gpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_cpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_gpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)

    @attr.gpu
    def test_to_cpu_to_gpu_cpu(self):
        self.link(self.x)
        self.check_to_cpu_to_gpu(self.link.c)
        self.check_to_cpu_to_gpu(self.link.h)

    @attr.gpu
    def test_to_cpu_to_gpu_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.link(self.x)
        self.check_to_cpu_to_gpu(self.link.c)
        self.check_to_cpu_to_gpu(self.link.h)


class TestLSTMInvalidSize(unittest.TestCase):

    in_size = 10
    out_size = 20

    def setUp(self):
        self.link = Convolution2DLSTM(self.in_size, self.out_size)
        upward = self.link.upward.W.data
        upward[...] = numpy.random.uniform(-1, 1, upward.shape)
        lateral = self.link.lateral.W.data
        lateral[...] = numpy.random.uniform(-1, 1, lateral.shape)

        x1_shape = (4, self.in_size) + WH
        self.x1 = numpy.random.uniform(-1, 1, x1_shape).astype(numpy.float32)
        x2_shape = (5, self.in_size) + WH
        self.x2 = numpy.random.uniform(-1, 1, x2_shape).astype(numpy.float32)

    def check_forward_invalid_size(self, x1_data, x2_data):
        x1 = chainer.Variable(x1_data)
        x2 = chainer.Variable(x2_data)
        self.link(x1)
        with self.assertRaises(TypeError):
            self.link(x2)

    def test_forward_invalid_size_cpu(self):
        self.check_forward_invalid_size(self.x1, self.x2)

    @attr.gpu
    def test_forward_invalid_size_gpu(self):
        self.link.to_gpu()
        self.check_forward_invalid_size(cuda.to_gpu(self.x1),
                                        cuda.to_gpu(self.x2))


@testing.parameterize(
    {'in_size': 10, 'out_size': 10},
    {'in_size': 10, 'out_size': 40},
)
class TestStatelessLSTM(unittest.TestCase):

    def setUp(self):
        self.link = StatelessConvolution2DLSTM(self.in_size, self.out_size)
        upward = self.link.upward.W.data
        upward[...] = numpy.random.uniform(-1, 1, upward.shape)
        lateral = self.link.lateral.W.data
        lateral[...] = numpy.random.uniform(-1, 1, lateral.shape)
        self.link.zerograds()

        self.upward = upward.copy()  # fixed on CPU
        self.lateral = lateral.copy()  # fixed on CPU

        self.in_shape = (4, self.in_size, 32, 32)
        self.out_shape = (4, self.out_size, 32, 32)
        self.x = numpy.random.uniform(
            -1, 1, self.in_shape).astype(numpy.float32)

    def check_forward(self, x_data):
        xp = self.link.xp
        x = chainer.Variable(x_data)
        c1, h1 = self.link(None, None, x)
        c0 = chainer.Variable(xp.zeros(self.out_shape,
                                       dtype=self.x.dtype))
        c1_expect, h1_expect = functions.lstm(c0, self.link.upward(x))
        testing.assert_allclose(h1.data, h1_expect.data)
        testing.assert_allclose(c1.data, c1_expect.data)

        c2, h2 = self.link(c1, h1, x)
        c2_expect, h2_expect = \
            functions.lstm(c1_expect,
                           self.link.upward(x) + self.link.lateral(h1))
        testing.assert_allclose(h2.data, h2_expect.data)
        testing.assert_allclose(c2.data, c2_expect.data)

    def test_forward_cpu(self):
        self.check_forward(self.x)

    @attr.gpu
    def test_forward_gpu(self):
        self.link.to_gpu()
        self.check_forward(cuda.to_gpu(self.x))


testing.run_module(__name__, __file__)
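
The trailing ``testing.run_module`` call is Chainer's helper for running a test module when it is executed as a script, and because everything above is plain ``unittest.TestCase`` classes any unittest-compatible runner also works. For example, assuming the tests are saved as ``test_conv_lstm.py`` next to ``conv_lstm.py`` (the module name comes from the import above; the test file name is an assumption):

# $ python test_conv_lstm.py             # testing.run_module should pick the tests up
# $ python -m pytest test_conv_lstm.py   # or any unittest-compatible runner
# The @attr.gpu cases additionally need a CUDA-capable Chainer installation.
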
@NielEXXCRO

Can you increase the testing parameter?
