Skip to content

Instantly share code, notes, and snippets.

@ShigekiKarita
Created September 4, 2016 11:27
Show Gist options
  • Save ShigekiKarita/f9113b376356d0f4d099dfdecd23ef18 to your computer and use it in GitHub Desktop.
Save ShigekiKarita/f9113b376356d0f4d099dfdecd23ef18 to your computer and use it in GitHub Desktop.
from chainer.functions.activation import sigmoid
from chainer.functions.activation import tanh
from chainer.functions.array import reshape
from chainer.functions.array import split_axis
from chainer import link
from chainer.links.connection.convolution_2d import Convolution2D
from chainer import variable
from chainer.links.connection.convolution_2d import Convolution2D, _pair
def calc_pad(ksize):
    """Return the padding ``(k - 1) / 2`` for every entry of an odd kernel size.

    ``ksize`` is first passed through ``_pair`` (presumably expanding a
    scalar into one entry per spatial axis -- confirm against chainer).
    Each resulting entry must be odd; an even entry trips the assertion.

    Args:
        ksize: Kernel size, as accepted by ``_pair``.

    Returns:
        tuple: One integer padding per entry produced by ``_pair(ksize)``.
    """
    pads = []
    for k in _pair(ksize):
        # Only odd kernels have a symmetric "half" padding.
        assert k % 2 == 1
        pads.append(int((k - 1) / 2))
    return tuple(pads)
class StatefulPeepholeConvolution2DLSTM(link.Chain):

    """Stateful convolutional LSTM with peephole connections.

    Every linear map of a peephole LSTM is realised by a
    :class:`~chainer.links.Convolution2D`.  Besides ``upward`` and
    ``lateral`` the chain owns ``peep_i``, ``peep_f`` and ``peep_o`` as
    child links, which feed the cell state into the gates.

    Given an input :math:`x`, one step returns the next hidden state
    :math:`h'` defined as

    .. math::

       a &=& \\tanh(upward x + lateral h), \\\\
       i &=& \\sigma(upward x + lateral h + peep_i c), \\\\
       f &=& \\sigma(upward x + lateral h + peep_f c), \\\\
       c' &=& a \\odot i + f \\odot c, \\\\
       o &=& \\sigma(upward x + lateral h + peep_o c'), \\\\
       h' &=& o \\tanh(c'),

    where :math:`\\sigma` is the sigmoid function, :math:`\\odot` is the
    element-wise product, :math:`c` is the current cell state, :math:`c'`
    is the next cell state and :math:`h` is the current hidden state.
    ``upward`` and ``lateral`` each emit ``4 * out_size`` channels; the
    four terms :math:`a, i, f, o` each receive their own quarter of those
    channels (see :meth:`split_gate`).

    Args:
        in_size (int): Number of input channels given to ``upward``.
        out_size (int): Number of channels of the states ``c`` and ``h``.
        ksize: Kernel size shared by all convolutions.  Padding is derived
            from it with ``calc_pad``, which requires odd sizes.
        **kwargs: Forwarded unchanged to every ``Convolution2D``.

    Attributes:
        upward (~chainer.links.Convolution2D): Convolution applied to the
            input.
        lateral (~chainer.links.Convolution2D): Bias-free convolution
            applied to the previous output.
        peep_i (~chainer.links.Convolution2D): Bias-free peephole
            convolution for the input gate.
        peep_f (~chainer.links.Convolution2D): Bias-free peephole
            convolution for the forget gate.
        peep_o (~chainer.links.Convolution2D): Bias-free peephole
            convolution for the output gate.
        c (~chainer.Variable): Cell state, ``None`` until the first call.
        h (~chainer.Variable): Latest output, ``None`` until the first call.

    """

    def __init__(self, in_size, out_size, ksize=3, **kwargs):
        pad = calc_pad(ksize)
        gate_channels = 4 * out_size

        def make_peephole():
            # All three peephole convolutions share one configuration.
            return Convolution2D(out_size, out_size, ksize=ksize, pad=pad,
                                 nobias=True, **kwargs)

        super(StatefulPeepholeConvolution2DLSTM, self).__init__(
            upward=Convolution2D(in_size, gate_channels, ksize=ksize,
                                 pad=pad, **kwargs),
            lateral=Convolution2D(out_size, gate_channels, ksize=ksize,
                                  pad=pad, nobias=True, **kwargs),
            peep_i=make_peephole(),
            peep_f=make_peephole(),
            peep_o=make_peephole(),
        )
        self.state_size = out_size
        self.reset_state()

    def out_shape(self, x):
        """Return the state shape for input ``x``.

        The batch axis and all axes from the third on are taken from
        ``x``; the channel axis becomes ``state_size``.
        """
        return (x.shape[0], self.state_size) + tuple(x.shape[2:])

    def to_cpu(self):
        """Move the child links and any existing state to the CPU."""
        super(StatefulPeepholeConvolution2DLSTM, self).to_cpu()
        for state in (self.c, self.h):
            if state is not None:
                state.to_cpu()

    def to_gpu(self, device=None):
        """Move the child links and any existing state to ``device``."""
        super(StatefulPeepholeConvolution2DLSTM, self).to_gpu(device)
        for state in (self.c, self.h):
            if state is not None:
                state.to_gpu(device)

    def reset_state(self):
        """Resets the internal states.

        Both :attr:`c` and :attr:`h` become ``None``.

        """
        self.c = None
        self.h = None

    def split_gate(self, y):
        """Split the stacked pre-activations ``y`` into four gate inputs.

        The channel axis of ``y`` is viewed as ``(channels // 4, 4)`` and
        split along the axis of length four; every piece is reshaped to
        the shape of the current cell state, so :attr:`c` must be set.

        Returns:
            list: Four variables, unpacked by callers as ``a, i, f, o``.
        """
        spatial = tuple(y.shape[2:])
        stacked_shape = (y.shape[0], y.shape[1] // 4, 4) + spatial
        stacked = reshape.reshape(y, stacked_shape)
        state_shape = self.c.shape
        pieces = split_axis.split_axis(stacked, 4, 2)
        return [reshape.reshape(piece, state_shape) for piece in pieces]

    def __call__(self, x):
        """Updates the internal state and returns the LSTM outputs.

        Args:
            x (~chainer.Variable): A new batch from the input sequence.

        Returns:
            ~chainer.Variable: Outputs of updated LSTM units.

        """
        gates_in = self.upward(x)
        if self.h is not None:
            gates_in += self.lateral(self.h)

        if self.c is None:
            # First step after a reset: start from an all-zero cell state.
            initial_cell = self.xp.zeros(self.out_shape(x), dtype=x.dtype)
            self.c = variable.Variable(initial_cell, volatile='auto')

        a, i, f, o = self.split_gate(gates_in)

        # Input and forget gates peek at the *previous* cell state.
        peep_i_out = self.peep_i(self.c)
        peep_f_out = self.peep_f(self.c)
        a = tanh.tanh(a)
        i = sigmoid.sigmoid(i + peep_i_out)
        f = sigmoid.sigmoid(f + peep_f_out)
        self.c = a * i + f * self.c

        # The output gate peeks at the freshly *updated* cell state.
        peep_o_out = self.peep_o(self.c)
        o = sigmoid.sigmoid(o + peep_o_out)
        self.h = o * tanh.tanh(self.c)
        return self.h
import unittest
import numpy
import chainer
from chainer import cuda
from chainer import gradient_check
from chainer import testing
from chainer.testing import attr
from conv_peephole import StatefulPeepholeConvolution2DLSTM
def _sigmoid(x):
    """Return the logistic sigmoid of array ``x`` using its own array module."""
    array_module = cuda.get_array_module(x)
    denominator = 1 + array_module.exp(-x)
    return 1 / denominator
def _peephole(func, c, h, x):
    """Reference single step of the peephole ConvLSTM on raw arrays.

    The convolutions of ``func`` are reused, but the gate arithmetic is
    carried out directly on arrays instead of through chainer functions.

    Args:
        func: The link under test; its ``upward``, ``lateral``, ``peep_*``
            children and ``split_gate`` method are used.
        c: Current cell state array.
        h: Current hidden state array.
        x: Input array.

    Returns:
        tuple: ``(c_next, y)`` -- the next cell state and the output.
    """
    xp = cuda.get_array_module(x)

    pre_activation = func.upward(x).data
    pre_activation += func.lateral(h).data
    a, i, f, o = [gate.data for gate in func.split_gate(pre_activation)]

    # Input and forget gates use the incoming cell state.
    peep_i_out = func.peep_i(c).data
    peep_f_out = func.peep_f(c).data
    a = xp.tanh(a)
    i = _sigmoid(i + peep_i_out)
    f = _sigmoid(f + peep_f_out)
    c_next = a * i + f * c

    # The output gate uses the updated cell state.
    peep_o_out = func.peep_o(c_next).data
    o = _sigmoid(o + peep_o_out)
    y = o * xp.tanh(c_next)
    return c_next, y
# Two trailing dimensions appended to every state/input shape built in
# TestPeephole.setUp.
WH = (6, 7)
# NOTE(review): SHAPE is not referenced by any test visible in this file --
# possibly a leftover; confirm before removing.
SHAPE = (3, 5) + WH
@testing.parameterize(
    {'in_size': 10, 'out_size': 10},
    {'in_size': 10, 'out_size': 20},
)
class TestPeephole(unittest.TestCase):

    """Forward/backward checks of the link against ``_peephole``."""

    def setUp(self):
        self.link = StatefulPeepholeConvolution2DLSTM(
            self.in_size, self.out_size)

        # Overwrite every convolution kernel in place with uniform noise.
        for child in ('upward', 'lateral', 'peep_i', 'peep_f', 'peep_o'):
            weight = getattr(self.link, child).W.data
            weight[...] = numpy.random.uniform(-1, 1, weight.shape)

        state_shape = (1, self.out_size) + WH
        input_shape = (4, self.in_size) + WH
        grad_shape = (4, self.out_size) + WH

        self.c = numpy.zeros(state_shape).astype(numpy.float32)
        self.h = numpy.zeros(state_shape).astype(numpy.float32)
        self.x = numpy.random.uniform(
            -1, 1, input_shape).astype(numpy.float32)
        self.gy = numpy.random.uniform(
            -1, 1, grad_shape).astype(numpy.float32)

    def _forward(self, link, x):
        return link(x)

    def check_forward(self, c_data, h_data, x_data):
        """Run two consecutive steps and compare each with the reference."""
        x = chainer.Variable(x_data)
        c_expect, h_expect = c_data, h_data
        for _ in range(2):
            h_actual = self.link(x)
            c_expect, h_expect = _peephole(
                self.link, c_expect, h_expect, x_data)
            testing.assert_allclose(h_actual.data, h_expect)
            testing.assert_allclose(self.link.c.data, c_expect)
            testing.assert_allclose(self.link.h.data, h_expect)

    def test_forward_cpu(self):
        self.check_forward(self.c, self.h, self.x)

    @attr.gpu
    def test_forward_gpu(self):
        self.link.to_gpu()
        self.check_forward(cuda.to_gpu(self.c),
                           cuda.to_gpu(self.h),
                           cuda.to_gpu(self.x))

    def check_backward(self, c_data, h_data, x_data, y_grad):
        """Compare the analytic input gradient with a numerical one."""
        x = chainer.Variable(x_data)
        y = self._forward(self.link, x)
        y.grad = y_grad
        y.backward()

        def reference_output():
            _, expected = _peephole(self.link, c_data, h_data, x_data)
            return expected,

        numeric_gx, = gradient_check.numerical_grad(
            reference_output, (x.data,), (y.grad,))
        testing.assert_allclose(numeric_gx, x.grad, atol=1e-3)

    def test_backward_cpu(self):
        self.check_backward(self.c, self.h, self.x, self.gy)

    @attr.gpu
    def test_backward_gpu(self):
        self.link.to_gpu()
        self.check_backward(cuda.to_gpu(self.c),
                            cuda.to_gpu(self.h),
                            cuda.to_gpu(self.x),
                            cuda.to_gpu(self.gy))
class TestPeepholeState(unittest.TestCase):

    """``reset_state`` must leave both ``c`` and ``h`` as ``None``."""

    def setUp(self):
        in_size, out_size = 10, 8
        self.link = StatefulPeepholeConvolution2DLSTM(in_size, out_size)

    def check_reset_state(self):
        link = self.link
        link.reset_state()
        self.assertIsNone(link.c)
        self.assertIsNone(link.h)

    def test_reset_state_cpu(self):
        self.check_reset_state()

    @attr.gpu
    def test_reset_state_gpu(self):
        self.link.to_gpu()
        self.check_reset_state()
class TestPeepholeToCPUToGPU(unittest.TestCase):

    """The states ``c`` and ``h`` must follow the link across devices."""

    def setUp(self):
        in_size, out_size = 10, 8
        self.link = StatefulPeepholeConvolution2DLSTM(in_size, out_size)
        self.c = chainer.Variable(
            numpy.random.uniform(-1, 1, (1, out_size)).astype(numpy.float32))
        self.h = chainer.Variable(
            numpy.random.uniform(-1, 1, (1, out_size)).astype(numpy.float32))

    def _install_states(self, c, h):
        # Inject externally created states into the link under test.
        self.link.c = c
        self.link.h = h

    def _assert_states_on(self, xp):
        # The link and both of its states must live in array module ``xp``.
        self.assertIs(self.link.xp, xp)
        self.assertIsInstance(self.link.c.data, self.link.xp.ndarray)
        self.assertIsInstance(self.link.h.data, self.link.xp.ndarray)

    def _to_cpu_and_check(self):
        self.link.to_cpu()
        self._assert_states_on(numpy)

    def _to_gpu_and_check(self):
        self.link.to_gpu()
        self._assert_states_on(cuda.cupy)

    def check_to_cpu(self, c, h):
        self._install_states(c, h)
        # Moving twice checks that a repeated transfer is harmless.
        self._to_cpu_and_check()
        self._to_cpu_and_check()

    def test_to_cpu_cpu(self):
        self.check_to_cpu(self.c, self.h)

    @attr.gpu
    def test_to_cpu_gpu(self):
        self.c.to_gpu()
        self.h.to_gpu()
        self.check_to_cpu(self.c, self.h)

    def check_to_cpu_to_gpu(self, c, h):
        self._install_states(c, h)
        # GPU twice, then a round trip through the CPU and back.
        self._to_gpu_and_check()
        self._to_gpu_and_check()
        self._to_cpu_and_check()
        self._to_gpu_and_check()

    @attr.gpu
    def test_to_cpu_to_gpu_cpu(self):
        self.check_to_cpu_to_gpu(self.c, self.h)

    @attr.gpu
    def test_to_cpu_to_gpu_gpu(self):
        self.c.to_gpu()
        self.h.to_gpu()
        self.check_to_cpu_to_gpu(self.c, self.h)
# Hand this module to chainer's test-runner helper (presumably it only runs
# the tests when the file is executed as a script -- confirm against
# chainer.testing.run_module).
testing.run_module(__name__, __file__)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment