Using PyTorch and attention layers to forecast a time series.
import torch
from torch import nn
import numpy as np


class GaussianNoise:
    """Additive Gaussian noise used as light input regularisation."""
    def __init__(self, stddev: float):
        self.stddev = stddev

    def add_noise(self, din):
        noise = (torch.randn(din.size()) * self.stddev).float()
        return din + noise
class Encoder(nn.Module):
    def __init__(self, input_size, token_width):
        super().__init__()
        # Register Q/K/V as nn.Parameter so they show up in model.parameters()
        # and are actually updated by the optimizer.
        self.Q = nn.Parameter(torch.rand((input_size, token_width)))
        self.K = nn.Parameter(torch.rand((input_size, token_width)))
        self.V = nn.Parameter(torch.rand((input_size, token_width)))

    def forward(self, input: torch.Tensor):
        query_t = torch.mm(input, self.Q)
        key_t = torch.mm(input, self.K)
        value_t = torch.mm(input, self.V)
        # Element-wise attention scores, scaled by a fixed constant of 64.
        score_t = (query_t * key_t) / 64
        attention_t = torch.softmax(score_t, dim=1)  # shape (batch, token_width)
        return value_t * attention_t
class Decoder(nn.Module):
    def __init__(self, token_width, output_size):
        super().__init__()
        self.Q = nn.Parameter(torch.rand((token_width, output_size)))
        self.K = nn.Parameter(torch.rand((token_width, output_size)))
        self.V = nn.Parameter(torch.rand((token_width, output_size)))

    def forward(self, input: torch.Tensor):
        query_t = torch.mm(input, self.Q)
        key_t = torch.mm(input, self.K)
        value_t = torch.mm(input, self.V)
        score_t = (query_t * key_t) / 64
        attention_t = torch.softmax(score_t, dim=1)  # shape (batch, output_size)
        return value_t * attention_t
class Model(nn.Module):
    def __init__(self, max_sequence, hidden_width, token_width):
        super().__init__()
        self.noise = GaussianNoise(stddev=0.05)
        self.lin_1 = nn.Linear(max_sequence, hidden_width)
        self.encoder1 = Encoder(hidden_width, token_width)
        self.encoder2 = Encoder(token_width, token_width)
        self.decoder = Decoder(token_width, hidden_width)
        self.lin_final = nn.Linear(hidden_width, max_sequence)

    def forward(self, input: torch.Tensor):
        input = self.noise.add_noise(input)
        out1 = self.lin_1(input)
        out2 = self.encoder1(out1)
        out3 = self.encoder2(out2)
        out4 = self.decoder(out3)
        final = self.lin_final(out4).squeeze()
        return final
# Challenging 1-dimensional time-series function sampler.
def equation(start, stop):
    data = np.arange(start=start, stop=stop, dtype=np.float64)
    eq1 = np.sin(0.4 * data) + 1
    eq2 = np.cos(0.02 * data) - 2
    eq3 = 1.25 * np.cos(np.power(data, -2))
    eq4 = 1.25 * np.sin(np.power(data, -5))
    return eq1 + eq2 - eq3 + eq4
criterion = nn.MSELoss()
seq_length = 100000
slice_size = 100

# Build (input, target) window pairs: each window of slice_size points is
# trained to predict the window that immediately follows it.
input = []
expected_output = []
for i in range(1, seq_length, slice_size):
    input.append(equation(i, i + slice_size))
    expected_output.append(equation(i + slice_size, i + slice_size * 2))
input = torch.tensor(np.asarray(input)).float()                       # shape (1000, 100)
expected_output = torch.tensor(np.asarray(expected_output)).float()   # shape (1000, 100)

model = Model(slice_size, 30, 15)
optimizer = torch.optim.Adam(model.parameters())
# Trains indefinitely, printing the loss each step; stop it manually once the
# loss plateaus.
while True:
    optimizer.zero_grad()
    permutation = torch.randperm(seq_length // slice_size)
    input_i = input[permutation]
    expected_output_i = expected_output[permutation]
    output_i = model(input_i)
    loss = criterion(output_i, expected_output_i)
    print('training loss: {}'.format(loss.item()))
    loss.backward()
    optimizer.step()
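
# ---------------------------------------------------------------------------
# Optional forecasting sketch (not part of the original gist). It assumes the
# infinite training loop above has been stopped, e.g. replaced with a bounded
# `for` loop or interrupted in an interactive session, and that the last
# slice_size points of the series are the most recent observations; here
# `equation(...)` simply stands in for that observed window.
with torch.no_grad():
    last_window = torch.tensor(equation(seq_length, seq_length + slice_size)).float()
    last_window = last_window.unsqueeze(0)   # shape (1, slice_size)
    forecast = model(last_window)            # predicted next slice_size points
    print('forecast for the next window:', forecast)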