Skip to content

Instantly share code, notes, and snippets.

@Yevgnen
Last active September 4, 2021 16:17
Show Gist options
  • Save Yevgnen/043db98deb42d7b33db69150e9d2a7d8 to your computer and use it in GitHub Desktop.
Save Yevgnen/043db98deb42d7b33db69150e9d2a7d8 to your computer and use it in GitHub Desktop.
PyTorch seq2seq.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.autograd import Variable
np.random.seed(0)
torch.manual_seed(0)
_RECURRENT_FN_MAPPING = {
'rnn': torch.nn.RNN,
'gru': torch.nn.GRU,
'lstm': torch.nn.LSTM,
}
def get_recurrent_cell(n_inputs,
num_units,
num_layers,
type_,
dropout=0.0,
bidirectional=False):
cls = _RECURRENT_FN_MAPPING.get(type_)
return cls(
n_inputs,
num_units,
num_layers,
dropout=dropout,
bidirectional=bidirectional)
class Recurrent(nn.Module):
def __init__(self,
num_units,
num_layers=1,
unit_type='gru',
bidirectional=False,
dropout=0.0,
embedding=None,
attn_type='general'):
super(Recurrent, self).__init__()
num_inputs = embedding.weight.size(1)
self._num_inputs = num_inputs
self._num_units = num_units
self._num_layers = num_layers
self._unit_type = unit_type
self._bidirectional = bidirectional
self._dropout = dropout
self._embedding = embedding
self._attn_type = attn_type
self._cell_fn = get_recurrent_cell(num_inputs, num_units, num_layers,
unit_type, dropout, bidirectional)
def init_hidden(self, batch_size):
direction = 1 if not self._bidirectional else 2
h = Variable(
torch.zeros(direction * self._num_layers, batch_size,
self._num_units))
if self._unit_type == 'lstm':
return (h, h.clone())
else:
return h
def forward(self, x, h, len_x):
# Sort by sequence lengths
sorted_indices = np.argsort(-len_x).tolist()
unsorted_indices = np.argsort(sorted_indices).tolist()
x = x[:, sorted_indices]
h = h[:, sorted_indices, :]
len_x = len_x[sorted_indices].tolist()
embedded = self._embedding(x)
packed = torch.nn.utils.rnn.pack_padded_sequence(embedded, len_x)
if self._unit_type == 'lstm':
o, (h, c) = self._cell_fn(packed, h)
o, _ = torch.nn.utils.rnn.pad_packed_sequence(o)
return (o[:, unsorted_indices, :], (h[:, unsorted_indices, :],
c[:, unsorted_indices, :]))
else:
o, hh = self._cell_fn(packed, h)
o, _ = torch.nn.utils.rnn.pad_packed_sequence(o)
return (o[:, unsorted_indices, :], hh[:, unsorted_indices, :])
class Encoder(Recurrent):
pass
class Decoder(Recurrent):
pass
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, num_outputs):
super(Seq2Seq, self).__init__()
self._encoder = encoder
self._decoder = decoder
self._out = nn.Linear(decoder._num_units, num_outputs)
def forward(self, x, y, h, len_x, len_y):
# Encode
_, h = self._encoder(x, h, len_x)
# Decode
o, h = self._decoder(y, h, len_y)
# Project
o = self._out(o)
return F.log_softmax(o)
def load_data(size,
min_len=5,
max_len=15,
min_word=3,
max_word=100,
epoch=10,
batch_size=64,
pad=0,
bos=1,
eos=2):
src = [
np.random.randint(min_word, max_word - 1,
np.random.randint(min_len, max_len)).tolist()
for _ in range(size)
]
tgt_in = [[bos] + [xi + 1 for xi in x] for x in src]
tgt_out = [[xi + 1 for xi in x] + [eos] for x in src]
def _pad(batch):
max_len = max(len(x) for x in batch)
return np.asarray(
[
np.pad(
x, (0, max_len - len(x)),
mode='constant',
constant_values=pad) for x in batch
],
dtype=np.int64)
def _len(batch):
return np.asarray([len(x) for x in batch], dtype=np.int64)
for e in range(epoch):
batch_start = 0
while batch_start < size:
batch_end = batch_start + batch_size
s, ti, to = (src[batch_start:batch_end],
tgt_in[batch_start:batch_end],
tgt_out[batch_start:batch_end])
lens, lent = _len(s), _len(ti)
s, ti, to = _pad(s).T, _pad(ti).T, _pad(to).T
yield (Variable(torch.LongTensor(s)),
Variable(torch.LongTensor(ti)),
Variable(torch.LongTensor(to)), lens, lent)
batch_start += batch_size
def print_sample(x, y, yy):
x = x.data.numpy().T
y = y.data.numpy().T
yy = yy.data.numpy().T
for u, v, w in zip(x, y, yy):
print('--------')
print('S: ', u)
print('T: ', v)
print('P: ', w)
n_data = 50
min_len = 5
max_len = 10
vocab_size = 101
n_samples = 5
epoch = 100000
batch_size = 32
lr = 1e-2
clip = 3
emb_size = 50
hidden_size = 50
num_layers = 1
max_length = 15
src_embed = torch.nn.Embedding(vocab_size, emb_size)
tgt_embed = torch.nn.Embedding(vocab_size, emb_size)
eps = 1e-3
src_embed.weight.data.uniform_(-eps, eps)
tgt_embed.weight.data.uniform_(-eps, eps)
enc = Encoder(hidden_size, num_layers, embedding=src_embed)
dec = Decoder(hidden_size, num_layers, embedding=tgt_embed)
net = Seq2Seq(enc, dec, vocab_size)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
criterion = torch.nn.NLLLoss()
loader = load_data(
n_data,
min_len=min_len,
max_len=max_len,
max_word=vocab_size,
epoch=epoch,
batch_size=batch_size)
for i, (x, yin, yout, lenx, leny) in enumerate(loader):
net.train()
optimizer.zero_grad()
logits = net(x, yin, enc.init_hidden(x.size()[1]), lenx, leny)
loss = criterion(logits.view(-1, vocab_size), yout.contiguous().view(-1))
loss.backward()
torch.nn.utils.clip_grad_norm(net.parameters(), clip)
optimizer.step()
if i % 10 == 0:
print('step: {}, loss: {:.6f}'.format(i, loss.data[0]))
if i % 200 == 0 and i > 0:
net.eval()
x, yin, yout, lenx, leny = (x[:, :n_samples], yin[:, :n_samples],
yout[:, :n_samples], lenx[:n_samples],
leny[:n_samples])
outputs = net(x, yin, enc.init_hidden(x.size()[1]), lenx, leny)
_, preds = torch.max(outputs, 2)
print_sample(x, yout, preds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment