-
-
Save fanwei918/4266991e57d875e64abf5689a1f1849c to your computer and use it in GitHub Desktop.
Example of using Keras to implement a 1D convolutional neural network (CNN) for timeseries prediction.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Example of using Keras to implement a 1D convolutional neural network (CNN) for timeseries prediction. | |
""" | |
from __future__ import print_function, division | |
import numpy as np | |
from keras.layers import Convolution1D, Dense, MaxPooling1D, Flatten | |
from keras.models import Sequential | |
__date__ = '2016-07-22' | |
def make_timeseries_regressor(window_size, filter_length, nb_input_series=1, nb_outputs=1, nb_filter=4): | |
""":Return: a Keras Model for predicting the next value in a timeseries given a fixed-size lookback window of previous values. | |
The model can handle multiple input timeseries (`nb_input_series`) and multiple prediction targets (`nb_outputs`). | |
:param int window_size: The number of previous timeseries values to use as input features. Also called lag or lookback. | |
:param int nb_input_series: The number of input timeseries; 1 for a single timeseries. | |
The `X` input to ``fit()`` should be an array of shape ``(n_instances, window_size, nb_input_series)``; each instance is | |
a 2D array of shape ``(window_size, nb_input_series)``. For example, for `window_size` = 3 and `nb_input_series` = 1 (a | |
single timeseries), one instance could be ``[[0], [1], [2]]``. See ``make_timeseries_instances()``. | |
:param int nb_outputs: The output dimension, often equal to the number of inputs. | |
For each input instance (array with shape ``(window_size, nb_input_series)``), the output is a vector of size `nb_outputs`, | |
usually the value(s) predicted to come after the last value in that input instance, i.e., the next value | |
in the sequence. The `y` input to ``fit()`` should be an array of shape ``(n_instances, nb_outputs)``. | |
:param int filter_length: the size (along the `window_size` dimension) of the sliding window that gets convolved with | |
each position along each instance. The difference between 1D and 2D convolution is that a 1D filter's "height" is fixed | |
to the number of input timeseries (its "width" being `filter_length`), and it can only slide along the window | |
dimension. This is useful as generally the input timeseries have no spatial/ordinal relationship, so it's not | |
meaningful to look for patterns that are invariant with respect to subsets of the timeseries. | |
:param int nb_filter: The number of different filters to learn (roughly, input patterns to recognize). | |
""" | |
model = Sequential(( | |
# The first conv layer learns `nb_filter` filters (aka kernels), each of size ``(filter_length, nb_input_series)``. | |
# Its output will have shape (None, window_size - filter_length + 1, nb_filter), i.e., for each position in | |
# the input timeseries, the activation of each filter at that position. | |
Convolution1D(nb_filter=nb_filter, filter_length=filter_length, activation='relu', input_shape=(window_size, nb_input_series)), | |
MaxPooling1D(), # Downsample the output of convolution by 2X. | |
Convolution1D(nb_filter=nb_filter, filter_length=filter_length, activation='relu'), | |
MaxPooling1D(), | |
Flatten(), | |
Dense(nb_outputs, activation='linear'), # For binary classification, change the activation to 'sigmoid' | |
)) | |
model.compile(loss='mse', optimizer='adam', metrics=['mae']) | |
# To perform (binary) classification instead: | |
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['binary_accuracy']) | |
return model | |
def make_timeseries_instances(timeseries, window_size): | |
"""Make input features and prediction targets from a `timeseries` for use in machine learning. | |
:return: A tuple of `(X, y, q)`. `X` are the inputs to a predictor, a 3D ndarray with shape | |
``(timeseries.shape[0] - window_size, window_size, timeseries.shape[1] or 1)``. For each row of `X`, the | |
corresponding row of `y` is the next value in the timeseries. The `q` or query is the last instance, what you would use | |
to predict a hypothetical next (unprovided) value in the `timeseries`. | |
:param ndarray timeseries: Either a simple vector, or a matrix of shape ``(timestep, series_num)``, i.e., time is axis 0 (the | |
row) and the series is axis 1 (the column). | |
:param int window_size: The number of samples to use as input prediction features (also called the lag or lookback). | |
""" | |
timeseries = np.asarray(timeseries) | |
assert 0 < window_size < timeseries.shape[0] | |
X = np.atleast_3d(np.array([timeseries[start:start + window_size] for start in range(0, timeseries.shape[0] - window_size)])) | |
y = timeseries[window_size:] | |
q = np.atleast_3d([timeseries[-window_size:]]) | |
return X, y, q | |
def evaluate_timeseries(timeseries, window_size): | |
"""Create a 1D CNN regressor to predict the next value in a `timeseries` using the preceding `window_size` elements | |
as input features and evaluate its performance. | |
:param ndarray timeseries: Timeseries data with time increasing down the rows (the leading dimension/axis). | |
:param int window_size: The number of previous timeseries values to use to predict the next. | |
""" | |
filter_length = 5 | |
nb_filter = 4 | |
timeseries = np.atleast_2d(timeseries) | |
if timeseries.shape[0] == 1: | |
timeseries = timeseries.T # Convert 1D vectors to 2D column vectors | |
nb_samples, nb_series = timeseries.shape | |
print('\n\nTimeseries ({} samples by {} series):\n'.format(nb_samples, nb_series), timeseries) | |
model = make_timeseries_regressor(window_size=window_size, filter_length=filter_length, nb_input_series=nb_series, nb_outputs=nb_series, nb_filter=nb_filter) | |
print('\n\nModel with input size {}, output size {}, {} conv filters of length {}'.format(model.input_shape, model.output_shape, nb_filter, filter_length)) | |
model.summary() | |
X, y, q = make_timeseries_instances(timeseries, window_size) | |
print('\n\nInput features:', X, '\n\nOutput labels:', y, '\n\nQuery vector:', q, sep='\n') | |
test_size = int(0.01 * nb_samples) # In real life you'd want to use 0.2 - 0.5 | |
X_train, X_test, y_train, y_test = X[:-test_size], X[-test_size:], y[:-test_size], y[-test_size:] | |
model.fit(X_train, y_train, nb_epoch=25, batch_size=2, validation_data=(X_test, y_test)) | |
pred = model.predict(X_test) | |
print('\n\nactual', 'predicted', sep='\t') | |
for actual, predicted in zip(y_test, pred.squeeze()): | |
print(actual.squeeze(), predicted, sep='\t') | |
print('next', model.predict(q).squeeze(), sep='\t') | |
def main(): | |
"""Prepare input data, build model, evaluate.""" | |
np.set_printoptions(threshold=25) | |
ts_length = 1000 | |
window_size = 50 | |
print('\nSimple single timeseries vector prediction') | |
timeseries = np.arange(ts_length) # The timeseries f(t) = t | |
evaluate_timeseries(timeseries, window_size) | |
print('\nMultiple-input, multiple-output prediction') | |
timeseries = np.array([np.arange(ts_length), -np.arange(ts_length)]).T # The timeseries f(t) = [t, -t] | |
evaluate_timeseries(timeseries, window_size) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment