Using PyTorch and attention layers to forecast a timeseries
import torch
from torch import nn
import numpy as np


class GaussianNoise:
    """Adds zero-mean Gaussian noise to a tensor; used to regularize the model inputs."""

    def __init__(self, stddev: float):
        self.stddev = stddev

    def add_noise(self, din):
        # Sample noise with the same shape as the input and add it element-wise.
        rng = (torch.randn(din.size()) * self.stddev).float()
        return din + rng
class Encoder(nn.Module):
    def __init__(self, input_size, token_width):
        super().__init__()
        # Learnable query / key / value projections. Wrapping them in
        # nn.Parameter registers them with the module so model.parameters()
        # hands them to the optimizer.
        self.Q = nn.Parameter(torch.rand((input_size, token_width)))
        self.K = nn.Parameter(torch.rand((input_size, token_width)))
        self.V = nn.Parameter(torch.rand((input_size, token_width)))

    def forward(self, input: torch.Tensor):
        query_t = torch.mm(input, self.Q)
        key_t = torch.mm(input, self.K)
        value_t = torch.mm(input, self.V)
        # Element-wise query/key scores, scaled down by a constant factor.
        score_t = (query_t * key_t) / 64
        score_t = score_t.squeeze()
        attention_t = torch.softmax(score_t, dim=1)
        # Weight the values by the attention distribution.
        result = value_t * attention_t
        _ = attention_t.detach().numpy()  # handy hook for inspecting the attention weights
        return result
class Decoder(nn.Module):
    def __init__(self, token_width, output_size):
        super().__init__()
        # Same attention-style projections as the Encoder, mapping the token
        # width back out to the hidden width; registered as nn.Parameter so they train.
        self.Q = nn.Parameter(torch.rand((token_width, output_size)))
        self.K = nn.Parameter(torch.rand((token_width, output_size)))
        self.V = nn.Parameter(torch.rand((token_width, output_size)))

    def forward(self, input: torch.Tensor):
        query_t = torch.mm(input, self.Q)
        key_t = torch.mm(input, self.K)
        value_t = torch.mm(input, self.V)
        score_t = (query_t * key_t) / 64
        score_t = score_t.squeeze()
        attention_t = torch.softmax(score_t, dim=1)
        result = value_t * attention_t
        _ = attention_t.detach().numpy()  # handy hook for inspecting the attention weights
        return result
class Model(nn.Module):
    def __init__(self, max_sequence, hidden_width, token_width):
        super().__init__()
        self.noise = GaussianNoise(stddev=0.05)
        self.lin_1 = nn.Linear(max_sequence, hidden_width)
        self.encoder1 = Encoder(hidden_width, token_width)
        self.encoder2 = Encoder(token_width, token_width)
        self.decoder = Decoder(token_width, hidden_width)
        self.lin_final = nn.Linear(hidden_width, max_sequence)

    def forward(self, input: torch.Tensor):
        # Inject Gaussian noise so the network cannot simply memorize the training slices.
        input = self.noise.add_noise(input)
        out1 = self.lin_1(input)
        out2 = self.encoder1(out1)
        out3 = self.encoder2(out2)
        out4 = self.decoder(out3)
        final = self.lin_final(out4).squeeze()
        return final
# challenging 1-dimensional timeseries function sampler
def equation(start, stop):
    data = np.arange(start=start, stop=stop, dtype=float)
    eq1 = np.sin(0.4 * data) + 1
    eq2 = np.cos(0.02 * data) - 2
    eq3 = 1.25 * np.cos(np.power(data, -2.0))
    eq4 = 1.25 * np.sin(np.power(data, -5.0))
    result = eq1 + eq2 - eq3 + eq4
    return result
criterion = nn.MSELoss()
seq_length = 100000
slice_size = 100

# Build (input, target) pairs: each slice of the series is paired with the
# slice that immediately follows it.
input = []
expected_output = []
for i in range(1, seq_length, slice_size):
    in_i = equation(i, i + slice_size)
    out_i = equation(i + slice_size, i + slice_size * 2)
    input.append(in_i)
    expected_output.append(out_i)

input = torch.tensor(np.asarray(input)).float()
expected_output = torch.tensor(np.asarray(expected_output)).float()

model = Model(slice_size, 30, 15)
parameters = model.parameters()
optimizer = torch.optim.Adam(parameters)

while True:
    optimizer.zero_grad()
    # Shuffle the slices each pass so the batch is not seen in a fixed order.
    permutation = torch.randperm(int(seq_length / slice_size))
    input_i = input[permutation]
    expected_output_i = expected_output[permutation]
    output_i = model(input_i)
    loss = criterion(output_i, expected_output_i)
    loss_cpu = loss.item()
    print('training loss: {}'.format(loss_cpu))
    loss.backward()
    optimizer.step()
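
A minimal forecasting sketch, assuming training is stopped at some point (for example, by interrupting the while True loop once the loss plateaus, or swapping it for a fixed number of epochs). The held-out start indices below are assumptions chosen for illustration; they are not part of the original gist.

# Hypothetical forecasting check (not part of the original gist): run a few
# held-out slices through the trained model and compare against the true
# continuation of the series.
starts = [100001, 100501, 101001]  # assumed evaluation offsets past the training range
windows = np.asarray([equation(s, s + slice_size) for s in starts])
targets = np.asarray([equation(s + slice_size, s + slice_size * 2) for s in starts])
windows_t = torch.tensor(windows).float()
targets_t = torch.tensor(targets).float()
with torch.no_grad():
    # Note: GaussianNoise is a plain helper rather than an nn.Module, so a
    # small amount of noise is still injected at inference time.
    predictions = model(windows_t)
print('held-out MSE: {}'.format(criterion(predictions, targets_t).item()))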