Sine wave LSTM
# Forked from https://github.com/jaungiers/LSTM-Neural-Network-for-Time-Series-Prediction
# Minor fixes, a simpler model and a simpler normalisation scheme
import os
import warnings
import numpy as np
from numpy import newaxis
from keras.layers.core import Dense
from keras.layers.recurrent import LSTM
from keras.models import Sequential

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Hide messy TensorFlow warnings
warnings.filterwarnings("ignore")  # Hide messy NumPy warnings
def load_data(filename, seq_len, normalise):
    # Read one value per line; blank lines become 0.
    with open(filename, 'rb') as f:
        data = f.read().decode().split('\n')
    data = [float(d) if len(d) > 0 else 0 for d in data]

    # Normalise by the series maximum; base is returned so predictions can be scaled back.
    base = 1
    if normalise:
        base = max(data)
        data = [d / base for d in data]

    # Slide a window of seq_len inputs plus 1 target over the series.
    sequence_length = seq_len + 1
    result = []
    for index in range(len(data) - sequence_length):
        result.append(data[index: index + sequence_length])
    result = np.array(result, dtype=np.float64)

    # 90/10 train/test split; only the training windows are shuffled.
    row = round(0.9 * result.shape[0])
    train = result[:int(row), :]
    np.random.shuffle(train)
    x_train = train[:, :-1]
    y_train = train[:, -1]
    x_test = result[int(row):, :-1]
    y_test = result[int(row):, -1]

    # Reshape to (samples, timesteps, features) for the LSTM.
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
    x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))

    return [base, x_train, y_train, x_test, y_test]
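# load_data expects 'sinwave.csv' to hold one float per line. A minimal sketch
# for generating such a file; the point count and period below are assumptions,
# not values from the original gist.
def generate_sine_csv(filename='sinwave.csv', n_points=5000, period=100):
    t = np.arange(n_points)
    np.savetxt(filename, np.sin(2 * np.pi * t / period), fmt='%.6f')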
def build_model(input_shape):
    # A single LSTM layer (seq_len units) feeding a 1-unit linear output.
    model = Sequential()
    model.add(LSTM(input_shape[0], input_shape=input_shape))
    model.add(Dense(1))
    model.compile(loss="mse", optimizer="rmsprop")
    return model
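# Shape sketch (assumed usage, for illustration only): with seq_len = 50 the
# network maps windows of shape (batch, 50, 1) to one scalar per window.
#   m = build_model((50, 1))
#   m.predict(np.zeros((1, 50, 1))).shape   # -> (1, 1)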
def predict_point_by_point(model, data):
    # Predict each timestep given the last window of true data,
    # in effect only predicting 1 step ahead each time.
    predicted = model.predict(data)
    predicted = np.reshape(predicted, (predicted.size,))
    return predicted
def predict_sequence_full(model, data):
    # Shift the window by 1 new prediction each time,
    # re-running the model on the shifted window.
    curr_frame = data[0]
    predicted = []
    for i in range(len(data)):
        predicted.append(model.predict(curr_frame[newaxis, :, :])[0, 0])
        curr_frame = curr_frame[1:]
        curr_frame = np.insert(curr_frame, len(curr_frame), predicted[-1], axis=0)
    return predicted
def predict_sequences_multiple(model, data, prediction_len):
    # Predict a sequence of prediction_len steps, then shift the prediction
    # run forward by prediction_len steps.
    prediction_seqs = []
    for i in range(int(len(data) / prediction_len)):
        curr_frame = data[i * prediction_len]
        predicted = []
        for j in range(prediction_len):
            predicted.append(model.predict(curr_frame[newaxis, :, :])[0, 0])
            curr_frame = curr_frame[1:]
            curr_frame = np.insert(curr_frame, len(curr_frame), predicted[-1], axis=0)
        prediction_seqs.append(predicted)
    return prediction_seqs
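# Output-shape example (assumed, for illustration): with test data of length N
# and prediction_len = 7, this returns N // 7 lists of 7 predictions each,
# one list per non-overlapping starting window.
#   seqs = predict_sequences_multiple(model, X_test, 7)
#   np.array(seqs).shape   # -> (len(X_test) // 7, 7)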
import matplotlib.pyplot as plt
def plot_results(predicted_data, true_data):
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data', color="green")
    ax.plot(predicted_data, label='Prediction', color="blue")
    plt.legend()
    plt.show()
def plot_results_multiple(predicted_data, true_data, prediction_len):
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data', color="green")
    # Pad each prediction sequence so it starts at its correct offset on the graph.
    for i, data in enumerate(predicted_data):
        padding = [None for p in range(i * prediction_len)]
        ax.plot(padding + data.tolist(), label='Prediction', color="blue")
    #plt.legend()
    plt.show()
print('loading... ')
epochs = 500
seq_len = 50
base, X_train, y_train, X_test, y_test = load_data('sinwave.csv', seq_len, True)

model = build_model((seq_len, 1))

print('training...')
history = model.fit(
    X_train,
    y_train,
    batch_size=64,
    nb_epoch=epochs,
    validation_split=0.05,
    verbose=False)

# Plot the training loss curve.
plt.plot(history.history['loss'])
plt.show()
print("last error", history.history['loss'][-1])

# Multi-sequence prediction: predict predict_len steps ahead from each starting window.
predict_len = 7
predictions = predict_sequences_multiple(model, X_test, predict_len)
predictions = np.array(predictions)
plot_results_multiple(base * predictions, base * y_test, predict_len)

# Full-sequence prediction: feed predictions back in from the first window onward.
predicted = predict_sequence_full(model, X_test)
plot_results(base * np.array(predicted), base * y_test)

# Point-by-point prediction: one step ahead from each true window.
predicted = predict_point_by_point(model, X_test)
plot_results(base * np.array(predicted), base * y_test)