conv_lstm.py
import numpy
import six

import chainer
from chainer.functions.activation import lstm
from chainer.functions.array import concat
from chainer.functions.array import split_axis
from chainer import initializers
from chainer import link
from chainer import variable
from chainer.links.connection.convolution_2d import Convolution2D, _pair
def calc_pad(ksize):
    """Returns the padding that keeps the spatial shape unchanged
    ("same" padding) for an odd kernel size."""
    def f(k):
        assert k % 2 == 1, 'kernel size must be odd'
        return (k - 1) // 2
    return tuple(map(f, _pair(ksize)))
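
# For example, the default ksize=3 gives pad == (1, 1), so a stride-1
# convolution preserves the spatial size of the feature maps:
#
#   assert calc_pad(3) == (1, 1)
#   assert calc_pad((3, 5)) == (1, 2)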


class Convolution2DLSTMBase(link.Chain):

    def __init__(self, in_size, out_size, ksize=3,
                 lateral_init=None, upward_init=None,
                 bias_init=0, forget_bias_init=0, **kwargs):
        pad = calc_pad(ksize)
        super(Convolution2DLSTMBase, self).__init__(
            upward=Convolution2D(in_size, 4 * out_size, ksize=ksize, pad=pad,
                                 initialW=0, **kwargs),
            lateral=Convolution2D(out_size, 4 * out_size, ksize=ksize,
                                  pad=pad, initialW=0, nobias=True, **kwargs),
        )
        self.state_size = out_size

        # Initialize each of the four gate blocks (a, i, f, o) separately.
        for i in six.moves.range(0, 4 * out_size, out_size):
            initializers.init_weight(
                self.lateral.W.data[i:i + out_size, :], lateral_init)
            initializers.init_weight(
                self.upward.W.data[i:i + out_size, :], upward_init)

        # The forget gate bias gets its own initializer; a positive value is
        # a common choice to encourage remembering early in training.
        a, i, f, o = lstm._extract_gates(
            self.upward.b.data.reshape(1, 4 * out_size, 1))
        initializers.init_weight(a, bias_init)
        initializers.init_weight(i, bias_init)
        initializers.init_weight(f, forget_bias_init)
        initializers.init_weight(o, bias_init)

    def out_shape(self, x):
        # The output keeps the batch size and spatial size of ``x``; only
        # the channel dimension changes to ``state_size``. Tuple
        # concatenation keeps this compatible with Python 2 (six is used).
        return (x.shape[0], self.state_size) + x.shape[2:]


class StatelessConvolution2DLSTM(Convolution2DLSTMBase):

    """Stateless convolutional LSTM layer.

    This is a Convolution2D LSTM layer as a chain. Unlike the
    :func:`~chainer.functions.lstm` function, this chain holds upward and
    lateral connections as child links. This link does not keep cell and
    hidden states.

    Args:
        in_size (int): Number of channels of input arrays.
        out_size (int): Number of channels of output arrays.

    Attributes:
        upward (chainer.links.Convolution2D): Convolution2D layer of upward
            connections.
        lateral (chainer.links.Convolution2D): Convolution2D layer of
            lateral connections.

    """
    def __call__(self, c, h, x):
        """Returns new cell state and updated output of LSTM.

        Args:
            c (~chainer.Variable): Cell states of LSTM units.
            h (~chainer.Variable): Output at the previous time step.
            x (~chainer.Variable): A new batch from the input sequence.

        Returns:
            tuple of ~chainer.Variable: Returns ``(c_new, h_new)``, where
            ``c_new`` represents the new cell state and ``h_new`` is the
            updated output of the LSTM units.

        """
        lstm_in = self.upward(x)
        if h is not None:
            lstm_in += self.lateral(h)
        if c is None:
            # Start from an all-zero cell state when none is given.
            xp = self.xp
            c = variable.Variable(
                xp.zeros(self.out_shape(x), dtype=x.dtype),
                volatile='auto')
        return lstm.lstm(c, lstm_in)
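
# A minimal usage sketch for the stateless link (not part of the original
# gist): the caller threads ``(c, h)`` through explicitly. All sizes below
# are illustrative assumptions.
#
#   l = StatelessConvolution2DLSTM(in_size=3, out_size=8)
#   x = numpy.zeros((2, 3, 16, 16), dtype=numpy.float32)
#   c = h = None
#   for _ in range(4):
#       c, h = l(c, h, chainer.Variable(x))  # c, h: (2, 8, 16, 16)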


class Convolution2DLSTM(Convolution2DLSTMBase):

    """Convolution2D LSTM layer.

    This is a Convolution2D LSTM layer as a chain. Unlike the
    :func:`~chainer.functions.lstm` function, which is defined as a stateless
    activation function, this chain holds upward and lateral connections as
    child links.

    It also maintains *states*, including the cell state and the output at
    the previous time step, so it can be used as a *stateful LSTM*.

    This link supports variable-length inputs. The mini-batch size of the
    current input must be equal to or smaller than that of the previous one.
    The mini-batch sizes of ``c`` and ``h`` are determined by that of the
    first input ``x``. When the mini-batch size of the ``i``-th input is
    smaller than that of the previous input, this link updates only
    ``c[0:len(x)]`` and ``h[0:len(x)]`` and leaves the rest of ``c`` and
    ``h`` unchanged. Therefore, sort the input sequences in descending order
    of length before applying this link; for example, two sequences of
    lengths 3 and 2 should be fed as mini-batches of sizes 2, 2 and 1.
    Args:
        in_size (int): Number of channels of input arrays.
        out_size (int): Number of channels of output arrays.
        lateral_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used to initialize the lateral connections.
            It may be ``None`` to use the default initialization.
        upward_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used to initialize the upward connections.
            It may be ``None`` to use the default initialization.
        bias_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used to initialize the biases of the cell input, the input
            gate and the output gate of the upward connection.
            It may be a scalar; in that case, the bias is initialized to
            this value. It may be ``None`` to use the default initialization.
        forget_bias_init: A callable that takes ``numpy.ndarray`` or
            ``cupy.ndarray`` and edits its value.
            It is used to initialize the bias of the forget gate of the
            upward connection.
            It may be a scalar; in that case, the bias is initialized to
            this value. It may be ``None`` to use the default initialization.

    Attributes:
        upward (~chainer.links.Convolution2D): Convolution2D layer of upward
            connections.
        lateral (~chainer.links.Convolution2D): Convolution2D layer of
            lateral connections.
        c (~chainer.Variable): Cell states of LSTM units.
        h (~chainer.Variable): Output at the previous time step.

    """
    def __init__(self, in_size, out_size, **kwargs):
        super(Convolution2DLSTM, self).__init__(in_size, out_size, **kwargs)
        self.reset_state()

    def to_cpu(self):
        super(Convolution2DLSTM, self).to_cpu()
        if self.c is not None:
            self.c.to_cpu()
        if self.h is not None:
            self.h.to_cpu()

    def to_gpu(self, device=None):
        super(Convolution2DLSTM, self).to_gpu(device)
        if self.c is not None:
            self.c.to_gpu(device)
        if self.h is not None:
            self.h.to_gpu(device)
    def set_state(self, c, h):
        """Sets the internal state.

        It sets the :attr:`c` and :attr:`h` attributes.

        Args:
            c (~chainer.Variable): New cell states of the LSTM units.
            h (~chainer.Variable): New output at the previous time step.

        """
        assert isinstance(c, chainer.Variable)
        assert isinstance(h, chainer.Variable)
        c_ = c
        h_ = h
        # Move the given states to the device this link currently lives on.
        if self.xp == numpy:
            c_.to_cpu()
            h_.to_cpu()
        else:
            c_.to_gpu()
            h_.to_gpu()
        self.c = c_
        self.h = h_
    def reset_state(self):
        """Resets the internal state.

        It sets ``None`` to the :attr:`c` and :attr:`h` attributes.

        """
        self.c = self.h = None
    def __call__(self, x):
        """Updates the internal state and returns the LSTM outputs.

        Args:
            x (~chainer.Variable): A new batch from the input sequence.

        Returns:
            ~chainer.Variable: Outputs of updated LSTM units.

        """
        batch = x.shape[0]
        lstm_in = self.upward(x)
        h_rest = None
        if self.h is not None:
            h_size = self.h.shape[0]
            if batch == 0:
                h_rest = self.h
            elif h_size < batch:
                msg = ('The batch size of x must be equal to or less than '
                       'the size of the previous state h.')
                raise TypeError(msg)
            elif h_size > batch:
                # Some sequences already ended: update only the first
                # ``batch`` rows of the state and keep the rest as-is.
                h_update, h_rest = split_axis.split_axis(
                    self.h, [batch], axis=0)
                lstm_in += self.lateral(h_update)
            else:
                lstm_in += self.lateral(self.h)
        if self.c is None:
            # First call of a sequence: start from an all-zero cell state.
            xp = self.xp
            self.c = variable.Variable(
                xp.zeros(self.out_shape(x), dtype=x.dtype),
                volatile='auto')
        self.c, y = lstm.lstm(self.c, lstm_in)
        if h_rest is None:
            self.h = y
        elif len(y.data) == 0:
            self.h = h_rest
        else:
            # Reattach the rows that belong to already-finished sequences.
            self.h = concat.concat([y, h_rest], axis=0)
        return y
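

# A minimal end-to-end sketch (not part of the original gist): run the
# stateful link over a short sequence of feature maps. All sizes below are
# illustrative assumptions, not values from the gist.
if __name__ == '__main__':
    rnn = Convolution2DLSTM(in_size=3, out_size=8)
    xs = [numpy.random.uniform(-1, 1, (2, 3, 16, 16)).astype(numpy.float32)
          for _ in range(4)]
    for x in xs:
        y = rnn(chainer.Variable(x))  # y.data.shape == (2, 8, 16, 16)
    print(y.data.shape)
    rnn.reset_state()  # clear c and h before starting the next sequence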
test_conv_lstm.py
import unittest

import numpy

import chainer
from chainer import cuda
from chainer import functions
from chainer import testing
from chainer.testing import attr

from conv_lstm import Convolution2DLSTM, StatelessConvolution2DLSTM

WH = (6, 7)  # spatial size (height, width) shared by the tests
SHAPE = (3, 5) + WH  # (batch, in_size) + (H, W)


@testing.parameterize(
    {'in_size': 10, 'out_size': 10},
    {'in_size': 10, 'out_size': 40},
)
class TestLSTM(unittest.TestCase):

    def setUp(self):
        self.link = Convolution2DLSTM(self.in_size, self.out_size)
        upward = self.link.upward.W.data
        upward[...] = numpy.random.uniform(-1, 1, upward.shape)
        lateral = self.link.lateral.W.data
        lateral[...] = numpy.random.uniform(-1, 1, lateral.shape)
        self.link.zerograds()
        self.upward = upward.copy()  # fixed on CPU
        self.lateral = lateral.copy()  # fixed on CPU
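        # Batch sizes 4 -> 3 -> 0 below mimic variable-length sequences
        # sorted in descending order of length, as the link requires.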
        x1_shape = (4, self.in_size) + WH
        self.x1 = numpy.random.uniform(-1, 1, x1_shape).astype(numpy.float32)
        x2_shape = (3, self.in_size) + WH
        self.x2 = numpy.random.uniform(-1, 1, x2_shape).astype(numpy.float32)
        x3_shape = (0, self.in_size) + WH
        self.x3 = numpy.random.uniform(-1, 1, x3_shape).astype(numpy.float32)
    def check_forward(self, x1_data, x2_data, x3_data):
        xp = self.link.xp
        x1 = chainer.Variable(x1_data)
        h1 = self.link(x1)
        c0 = chainer.Variable(xp.zeros(self.link.out_shape(x1),
                                       dtype=self.x1.dtype))
        c1_expect, h1_expect = functions.lstm(c0, self.link.upward(x1))
        testing.assert_allclose(h1.data, h1_expect.data)
        testing.assert_allclose(self.link.h.data, h1_expect.data)
        testing.assert_allclose(self.link.c.data, c1_expect.data)

        batch = len(x2_data)
        x2 = chainer.Variable(x2_data)
        h1_in, h1_rest = functions.split_axis(
            self.link.h.data, [batch], axis=0)
        y2 = self.link(x2)
        c2_expect, y2_expect = \
            functions.lstm(c1_expect,
                           self.link.upward(x2) + self.link.lateral(h1_in))
        testing.assert_allclose(y2.data, y2_expect.data)
        testing.assert_allclose(self.link.h.data[:batch], y2_expect.data)
        testing.assert_allclose(self.link.h.data[batch:], h1_rest.data)

        # FIXME: convolution2d does not support 0 mini-batch
        # x3 = chainer.Variable(x3_data)
        # h2_rest = self.link.h
        # y3 = self.link(x3)
        # c3_expect, y3_expect = \
        #     functions.lstm(c2_expect, self.link.upward(x3))
        # testing.assert_allclose(y3.data, y3_expect.data)
        # testing.assert_allclose(self.link.h.data, h2_rest.data)
    def test_forward_cpu(self):
        self.check_forward(self.x1, self.x2, self.x3)

    @attr.gpu
    def test_forward_gpu(self):
        self.link.to_gpu()
        self.check_forward(cuda.to_gpu(self.x1), cuda.to_gpu(self.x2),
                           cuda.to_gpu(self.x3))


class TestLSTMState(unittest.TestCase):

    def setUp(self):
        self.link = Convolution2DLSTM(5, 7)
        self.x = chainer.Variable(
            numpy.random.uniform(-1, 1, SHAPE).astype(numpy.float32))
        # The states must have out_size (= 7) channels, not in_size channels.
        state_shape = (3, 7) + WH
        self.c = chainer.Variable(
            numpy.random.uniform(-1, 1, state_shape).astype(numpy.float32))
        self.h = chainer.Variable(
            numpy.random.uniform(-1, 1, state_shape).astype(numpy.float32))
    def check_state(self):
        self.assertIsNone(self.link.c)
        self.assertIsNone(self.link.h)
        self.link(self.x)
        self.assertIsNotNone(self.link.c)
        self.assertIsNotNone(self.link.h)

    def test_state_cpu(self):
        self.check_state()

    @attr.gpu
    def test_state_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.check_state()

    def check_set_state(self, c, h):
        self.link.set_state(c, h)
        self.assertIsInstance(self.link.c.data, self.link.xp.ndarray)
        testing.assert_allclose(c.data, self.link.c.data)
        self.assertIsInstance(self.link.h.data, self.link.xp.ndarray)
        testing.assert_allclose(h.data, self.link.h.data)

    def test_set_state_cpu(self):
        self.check_set_state(self.c, self.h)

    @attr.gpu
    def test_set_state_gpu(self):
        self.link.to_gpu()
        self.check_set_state(self.c, self.h)

    def check_reset_state(self):
        self.link(self.x)
        self.link.reset_state()
        self.assertIsNone(self.link.c)
        self.assertIsNone(self.link.h)

    def test_reset_state_cpu(self):
        self.check_reset_state()

    @attr.gpu
    def test_reset_state_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.check_reset_state()


class TestLSTMToCPUToGPU(unittest.TestCase):

    def setUp(self):
        self.link = Convolution2DLSTM(5, 7)
        self.x = chainer.Variable(
            numpy.random.uniform(-1, 1, SHAPE).astype(numpy.float32))

    def check_to_cpu(self, s):
        # Transfer twice to check that the state move is idempotent.
        self.link.to_cpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_cpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)

    def test_to_cpu_cpu(self):
        self.link(self.x)
        self.check_to_cpu(self.link.c)
        self.check_to_cpu(self.link.h)

    @attr.gpu
    def test_to_cpu_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.link(self.x)
        self.check_to_cpu(self.link.c)
        self.check_to_cpu(self.link.h)

    def check_to_cpu_to_gpu(self, s):
        self.link.to_gpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_gpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_cpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)
        self.link.to_gpu()
        self.assertIsInstance(s.data, self.link.xp.ndarray)

    @attr.gpu
    def test_to_cpu_to_gpu_cpu(self):
        self.link(self.x)
        self.check_to_cpu_to_gpu(self.link.c)
        self.check_to_cpu_to_gpu(self.link.h)

    @attr.gpu
    def test_to_cpu_to_gpu_gpu(self):
        self.link.to_gpu()
        self.x.to_gpu()
        self.link(self.x)
        self.check_to_cpu_to_gpu(self.link.c)
        self.check_to_cpu_to_gpu(self.link.h)


class TestLSTMInvalidSize(unittest.TestCase):

    in_size = 10
    out_size = 20

    def setUp(self):
        self.link = Convolution2DLSTM(self.in_size, self.out_size)
        upward = self.link.upward.W.data
        upward[...] = numpy.random.uniform(-1, 1, upward.shape)
        lateral = self.link.lateral.W.data
        lateral[...] = numpy.random.uniform(-1, 1, lateral.shape)
        x1_shape = (4, self.in_size) + WH
        self.x1 = numpy.random.uniform(-1, 1, x1_shape).astype(numpy.float32)
        # x2 has a larger mini-batch (5 > 4), which the link must reject.
        x2_shape = (5, self.in_size) + WH
        self.x2 = numpy.random.uniform(-1, 1, x2_shape).astype(numpy.float32)

    def check_forward_invalid_size(self, x1_data, x2_data):
        x1 = chainer.Variable(x1_data)
        x2 = chainer.Variable(x2_data)
        self.link(x1)
        with self.assertRaises(TypeError):
            self.link(x2)

    def test_forward_invalid_size_cpu(self):
        self.check_forward_invalid_size(self.x1, self.x2)

    @attr.gpu
    def test_forward_invalid_size_gpu(self):
        self.link.to_gpu()
        self.check_forward_invalid_size(cuda.to_gpu(self.x1),
                                        cuda.to_gpu(self.x2))


@testing.parameterize(
    {'in_size': 10, 'out_size': 10},
    {'in_size': 10, 'out_size': 40},
)
class TestStatelessLSTM(unittest.TestCase):

    def setUp(self):
        self.link = StatelessConvolution2DLSTM(self.in_size, self.out_size)
        upward = self.link.upward.W.data
        upward[...] = numpy.random.uniform(-1, 1, upward.shape)
        lateral = self.link.lateral.W.data
        lateral[...] = numpy.random.uniform(-1, 1, lateral.shape)
        self.link.zerograds()
        self.upward = upward.copy()  # fixed on CPU
        self.lateral = lateral.copy()  # fixed on CPU
        self.in_shape = (4, self.in_size, 32, 32)
        self.out_shape = (4, self.out_size, 32, 32)
        self.x = numpy.random.uniform(-1, 1,
                                      self.in_shape).astype(numpy.float32)
    def check_forward(self, x_data):
        xp = self.link.xp
        x = chainer.Variable(x_data)
        c1, h1 = self.link(None, None, x)
        c0 = chainer.Variable(xp.zeros(self.out_shape,
                                       dtype=self.x.dtype))
        c1_expect, h1_expect = functions.lstm(c0, self.link.upward(x))
        testing.assert_allclose(h1.data, h1_expect.data)
        testing.assert_allclose(c1.data, c1_expect.data)

        c2, h2 = self.link(c1, h1, x)
        c2_expect, h2_expect = \
            functions.lstm(c1_expect,
                           self.link.upward(x) + self.link.lateral(h1))
        testing.assert_allclose(h2.data, h2_expect.data)
        testing.assert_allclose(c2.data, c2_expect.data)

    def test_forward_cpu(self):
        self.check_forward(self.x)

    @attr.gpu
    def test_forward_gpu(self):
        self.link.to_gpu()
        self.check_forward(cuda.to_gpu(self.x))


testing.run_module(__name__, __file__)
Can you increase the testing parameters (e.g. add more in_size/out_size combinations to @testing.parameterize)?