Created
September 4, 2016 11:27
-
-
Save ShigekiKarita/f9113b376356d0f4d099dfdecd23ef18 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from chainer.functions.activation import sigmoid | |
from chainer.functions.activation import tanh | |
from chainer.functions.array import reshape | |
from chainer.functions.array import split_axis | |
from chainer import link | |
from chainer.links.connection.convolution_2d import Convolution2D | |
from chainer import variable | |
from chainer.links.connection.convolution_2d import Convolution2D, _pair | |
def calc_pad(ksize):
    """Return the per-axis padding ``(k - 1) // 2`` for an odd kernel size.

    With a stride of 1 this amount of zero padding keeps the spatial size of
    a convolution output equal to that of its input.

    Args:
        ksize: Kernel size, passed through ``_pair`` before use
            (presumably an int or a pair of ints -- confirm against
            chainer's ``_pair`` helper).

    Returns:
        tuple: One padding value per entry produced by ``_pair(ksize)``.

    Raises:
        ValueError: If any kernel dimension is not odd. An explicit raise is
            used instead of ``assert`` so the check survives ``python -O``.
    """
    def half_width(k):
        if k % 2 != 1:
            raise ValueError(
                'kernel size must be odd to compute symmetric padding, '
                'got {}'.format(k))
        # Integer division avoids the float round-trip of int((k - 1) / 2).
        return (k - 1) // 2

    return tuple(half_width(k) for k in _pair(ksize))
class StatefulPeepholeConvolution2DLSTM(link.Chain):

    """Convolution2D LSTM layer with peephole connections.

    This is a Convolution2D LSTM layer with peephole connections as a chain.
    Unlike the :class:`~chainer.links.LSTM` link, this chain holds ``peep_i``,
    ``peep_f`` and ``peep_o`` as child links besides ``upward`` and
    ``lateral``. Every child link is a 2D convolution.

    Given an input feature map :math:`x`, the link returns the next hidden
    feature map :math:`h'` defined as

    .. math::

        a &=& \\tanh(upward x + lateral h), \\\\
        i &=& \\sigma(upward x + lateral h + peep_i c), \\\\
        f &=& \\sigma(upward x + lateral h + peep_f c), \\\\
        c' &=& a \\odot i + f \\odot c, \\\\
        o &=& \\sigma(upward x + lateral h + peep_o c'), \\\\
        h' &=& o \\tanh(c'),

    where :math:`\\sigma` is the sigmoid function, :math:`\\odot` is the
    element-wise product, :math:`c` is the current cell state, :math:`c'`
    is the next cell state and :math:`h` is the current hidden state.

    Args:
        in_size (int): Number of channels of the input :math:`x`.
        out_size (int): Number of channels of the hidden state :math:`h`
            and of the cell state :math:`c`.
        ksize: Kernel size shared by all five convolutions. It is handed to
            ``calc_pad``, which only accepts odd sizes and returns a padding
            of ``(k - 1) / 2`` per axis.
        **kwargs: Extra keyword arguments forwarded unchanged to every
            ``Convolution2D`` child link.

    Attributes:
        upward (~chainer.links.Convolution2D): Convolution2D layer of upward
            connections.
        lateral (~chainer.links.Convolution2D): Convolution2D layer of lateral
            connections.
        peep_i (~chainer.links.Convolution2D): Convolution2D layer of peephole
            connections to the input gate.
        peep_f (~chainer.links.Convolution2D): Convolution2D layer of peephole
            connections to the forget gate.
        peep_o (~chainer.links.Convolution2D): Convolution2D layer of peephole
            connections to the output gate.
        state_size (int): The ``out_size`` given at construction.
        c (~chainer.Variable): Cell states of LSTM units, or ``None`` after a
            reset.
        h (~chainer.Variable): Output at the current time step, or ``None``
            after a reset.

    """

    def __init__(self, in_size, out_size, ksize=3, **kwargs):
        pad = calc_pad(ksize)
        super(StatefulPeepholeConvolution2DLSTM, self).__init__(
            # ``upward`` and ``lateral`` emit all four gates at once, hence
            # the 4 * out_size output channels.
            upward=Convolution2D(in_size, 4 * out_size,
                                 ksize=ksize, pad=pad, **kwargs),
            lateral=Convolution2D(out_size, 4 * out_size,
                                  ksize=ksize, pad=pad, nobias=True, **kwargs),
            peep_i=Convolution2D(out_size, out_size,
                                 ksize=ksize, pad=pad, nobias=True, **kwargs),
            peep_f=Convolution2D(out_size, out_size,
                                 ksize=ksize, pad=pad, nobias=True, **kwargs),
            peep_o=Convolution2D(out_size, out_size,
                                 ksize=ksize, pad=pad, nobias=True, **kwargs),
        )
        self.state_size = out_size
        self.reset_state()

    def out_shape(self, x):
        """Returns the state shape matching the input batch ``x``.

        The result keeps the first axis and all axes from the third onwards
        of ``x.shape`` and replaces the second axis with ``state_size``.
        """
        # Tuple concatenation instead of star-unpacking keeps this importable
        # on interpreters older than Python 3.5.
        return (x.shape[0], self.state_size) + tuple(x.shape[2:])

    def to_cpu(self):
        """Moves the parameters and any existing states to the CPU."""
        super(StatefulPeepholeConvolution2DLSTM, self).to_cpu()
        if self.c is not None:
            self.c.to_cpu()
        if self.h is not None:
            self.h.to_cpu()

    def to_gpu(self, device=None):
        """Moves the parameters and any existing states to ``device``."""
        super(StatefulPeepholeConvolution2DLSTM, self).to_gpu(device)
        if self.c is not None:
            self.c.to_gpu(device)
        if self.h is not None:
            self.h.to_gpu(device)

    def reset_state(self):
        """Resets the internal states.

        It sets ``None`` to the :attr:`c` and :attr:`h` attributes.

        """
        self.c = self.h = None

    def split_gate(self, y):
        """Splits the stacked pre-activations ``y`` into four gate inputs.

        The second axis of ``y`` (of size ``4 * n``) is reshaped to
        ``(n, 4)`` and then split along the new axis of size 4, so
        consecutive groups of four channels are distributed over the four
        returned gates.

        Args:
            y: Stacked gate pre-activations whose second axis has a size
                divisible by 4.

        Returns:
            list: Four variables, each with the shape of ``y`` except that
            the second axis is a quarter as large. ``__call__`` unpacks
            them as ``a, i, f, o``.
        """
        batch = y.shape[0]
        channels = y.shape[1] // 4
        spatial = tuple(y.shape[2:])
        stacked = reshape.reshape(y, (batch, channels, 4) + spatial)
        # Derive the per-gate shape from ``y`` itself rather than from
        # ``self.c`` so the method also works while the state is unset.
        gate_shape = (batch, channels) + spatial
        return [reshape.reshape(g, gate_shape)
                for g in split_axis.split_axis(stacked, 4, 2)]

    def __call__(self, x):
        """Updates the internal state and returns the LSTM outputs.

        Args:
            x (~chainer.Variable): A new batch from the input sequence.

        Returns:
            ~chainer.Variable: Outputs of updated LSTM units.

        """
        lstm_in = self.upward(x)
        if self.h is not None:
            lstm_in += self.lateral(self.h)
        if self.c is None:
            # First step after a reset: start from an all-zero cell state.
            xp = self.xp
            self.c = variable.Variable(
                xp.zeros(self.out_shape(x), dtype=x.dtype),
                volatile='auto')

        a, i, f, o = self.split_gate(lstm_in)
        # Input and forget gates peek at the *current* cell state ...
        peep_in_i = self.peep_i(self.c)
        peep_in_f = self.peep_f(self.c)
        a = tanh.tanh(a)
        i = sigmoid.sigmoid(i + peep_in_i)
        f = sigmoid.sigmoid(f + peep_in_f)
        self.c = a * i + f * self.c
        # ... whereas the output gate peeks at the *updated* cell state.
        peep_in_o = self.peep_o(self.c)
        o = sigmoid.sigmoid(o + peep_in_o)
        self.h = o * tanh.tanh(self.c)
        return self.h
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
import numpy | |
import chainer | |
from chainer import cuda | |
from chainer import gradient_check | |
from chainer import testing | |
from chainer.testing import attr | |
from conv_peephole import StatefulPeepholeConvolution2DLSTM | |
def _sigmoid(x):
    """Reference logistic sigmoid, evaluated with the array module of ``x``."""
    array_module = cuda.get_array_module(x)
    denominator = 1 + array_module.exp(-x)
    return 1 / denominator
def _peephole(func, c, h, x):
    """Reference single-step peephole update computed on raw arrays.

    Uses the child links of ``func`` for the convolutions and plain array
    arithmetic for everything else.

    Args:
        func: Link providing ``upward``, ``lateral``, ``peep_i``, ``peep_f``,
            ``peep_o`` and ``split_gate``.
        c: Current cell state array.
        h: Current hidden state array.
        x: Input array.

    Returns:
        tuple: ``(c_next, y)`` -- the next cell state and the step output.
    """
    xp = cuda.get_array_module(x)

    # Stacked gate pre-activations: upward(x) + lateral(h), added in place.
    pre_act = func.upward(x).data
    pre_act += func.lateral(h).data
    gates = func.split_gate(pre_act)
    cell_in, in_gate, forget_gate, out_gate = [g.data for g in gates]

    # Input/forget gates peek at the current cell state.
    in_peep = func.peep_i(c).data
    forget_peep = func.peep_f(c).data
    cell_in = xp.tanh(cell_in)
    in_gate = _sigmoid(in_gate + in_peep)
    forget_gate = _sigmoid(forget_gate + forget_peep)
    c_next = cell_in * in_gate + forget_gate * c

    # The output gate peeks at the freshly updated cell state.
    out_peep = func.peep_o(c_next).data
    out_gate = _sigmoid(out_gate + out_peep)
    return c_next, out_gate * xp.tanh(c_next)
# Trailing two axes appended to every array shape built in TestPeephole.setUp.
WH = (6, 7)
# NOTE(review): SHAPE is not referenced anywhere else in the visible file --
# possibly dead; confirm before removing.
SHAPE = (3, 5) + WH
@testing.parameterize(
    {'in_size': 10, 'out_size': 10},
    {'in_size': 10, 'out_size': 20},
)
class TestPeephole(unittest.TestCase):
    """Forward/backward checks of the link against the ``_peephole`` reference."""

    def setUp(self):
        self.link = StatefulPeepholeConvolution2DLSTM(self.in_size, self.out_size)

        # Randomise every convolution kernel in place; the order matches the
        # child-link order so the random stream is consumed identically.
        for child_name in ('upward', 'lateral', 'peep_i', 'peep_f', 'peep_o'):
            weight = getattr(self.link, child_name).W.data
            weight[...] = numpy.random.uniform(-1, 1, weight.shape)

        state_shape = (1, self.out_size) + WH
        input_shape = (4, self.in_size) + WH
        grad_shape = (4, self.out_size) + WH

        # Initial states are all zeros with a batch axis of size 1.
        self.c = numpy.zeros(state_shape).astype(numpy.float32)
        self.h = numpy.zeros(state_shape).astype(numpy.float32)
        self.x = numpy.random.uniform(-1, 1, input_shape).astype(numpy.float32)
        self.gy = numpy.random.uniform(-1, 1, grad_shape).astype(numpy.float32)

    def _forward(self, link, x):
        return link(x)

    def check_forward(self, c_data, h_data, x_data):
        x = chainer.Variable(x_data)
        c_prev, h_prev = c_data, h_data
        # Two consecutive steps: the second one starts from the states the
        # reference produced for the first.
        for _ in range(2):
            h_actual = self.link(x)
            c_expect, h_expect = _peephole(self.link, c_prev, h_prev, x_data)
            testing.assert_allclose(h_actual.data, h_expect)
            testing.assert_allclose(self.link.c.data, c_expect)
            testing.assert_allclose(self.link.h.data, h_expect)
            c_prev, h_prev = c_expect, h_expect

    def test_forward_cpu(self):
        self.check_forward(self.c, self.h, self.x)

    @attr.gpu
    def test_forward_gpu(self):
        self.link.to_gpu()
        device_arrays = [cuda.to_gpu(a) for a in (self.c, self.h, self.x)]
        self.check_forward(*device_arrays)

    def check_backward(self, c_data, h_data, x_data, y_grad):
        x = chainer.Variable(x_data)
        y = self._forward(self.link, x)
        y.grad = y_grad
        y.backward()

        def f():
            # Only the step output takes part in the numerical gradient.
            _, expected = _peephole(self.link, c_data, h_data, x_data)
            return (expected,)

        numerical = gradient_check.numerical_grad(f, (x.data,), (y.grad,))
        gx, = numerical
        testing.assert_allclose(gx, x.grad, atol=1e-3)

    def test_backward_cpu(self):
        self.check_backward(self.c, self.h, self.x, self.gy)

    @attr.gpu
    def test_backward_gpu(self):
        self.link.to_gpu()
        device_arrays = [cuda.to_gpu(a)
                         for a in (self.c, self.h, self.x, self.gy)]
        self.check_backward(*device_arrays)
class TestPeepholeState(unittest.TestCase):
    """Checks that ``reset_state`` clears both recurrent states."""

    def setUp(self):
        self.link = StatefulPeepholeConvolution2DLSTM(10, 8)

    def check_reset_state(self):
        link = self.link
        link.reset_state()
        self.assertIsNone(link.c)
        self.assertIsNone(link.h)

    def test_reset_state_cpu(self):
        self.check_reset_state()

    @attr.gpu
    def test_reset_state_gpu(self):
        self.link.to_gpu()
        self.check_reset_state()
class TestPeepholeToCPUToGPU(unittest.TestCase):
    """Checks that ``to_cpu``/``to_gpu`` move the states along with the link."""

    def setUp(self):
        in_size, out_size = 10, 8
        self.link = StatefulPeepholeConvolution2DLSTM(in_size, out_size)
        state_shape = (1, out_size)
        self.c = chainer.Variable(
            numpy.random.uniform(-1, 1, state_shape).astype(numpy.float32))
        self.h = chainer.Variable(
            numpy.random.uniform(-1, 1, state_shape).astype(numpy.float32))

    def _install_states(self, c, h):
        """Attach the given variables as the link's current states."""
        self.link.c = c
        self.link.h = h

    def _assert_resides_on(self, expected_xp):
        """Assert the link and both of its states use ``expected_xp``."""
        self.assertIs(self.link.xp, expected_xp)
        self.assertIsInstance(self.link.c.data, self.link.xp.ndarray)
        self.assertIsInstance(self.link.h.data, self.link.xp.ndarray)

    def check_to_cpu(self, c, h):
        self._install_states(c, h)
        # Calling to_cpu twice checks that the second call changes nothing.
        for _ in range(2):
            self.link.to_cpu()
            self._assert_resides_on(numpy)

    def test_to_cpu_cpu(self):
        self.check_to_cpu(self.c, self.h)

    @attr.gpu
    def test_to_cpu_gpu(self):
        self.c.to_gpu()
        self.h.to_gpu()
        self.check_to_cpu(self.c, self.h)

    def check_to_cpu_to_gpu(self, c, h):
        self._install_states(c, h)

        # GPU twice, then a round trip through the CPU and back.
        self.link.to_gpu()
        self._assert_resides_on(cuda.cupy)

        self.link.to_gpu()
        self._assert_resides_on(cuda.cupy)

        self.link.to_cpu()
        self._assert_resides_on(numpy)

        self.link.to_gpu()
        self._assert_resides_on(cuda.cupy)

    @attr.gpu
    def test_to_cpu_to_gpu_cpu(self):
        self.check_to_cpu_to_gpu(self.c, self.h)

    @attr.gpu
    def test_to_cpu_to_gpu_gpu(self):
        self.c.to_gpu()
        self.h.to_gpu()
        self.check_to_cpu_to_gpu(self.c, self.h)
# Hands this module's name and path to chainer's test runner helper.
# NOTE(review): presumably this only runs the tests when the file is executed
# as a script -- confirm against chainer.testing.run_module.
testing.run_module(__name__, __file__)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment