Skip to content

Instantly share code, notes, and snippets.

@QuantumNinja92
Forked from jkleint/timeseries_cnn.py
Created November 1, 2018 07:33
Show Gist options
  • Select an option

  • Save QuantumNinja92/011d4bb1a09a7881c7aa0b8955078ddc to your computer and use it in GitHub Desktop.

Select an option

Save QuantumNinja92/011d4bb1a09a7881c7aa0b8955078ddc to your computer and use it in GitHub Desktop.
Example of using Keras to implement a 1D convolutional neural network (CNN) for timeseries prediction.
#!/usr/bin/env python
"""
Example of using Keras to implement a 1D convolutional neural network (CNN) for timeseries prediction.
"""
from __future__ import print_function, division
import numpy as np
from keras.layers import Conv1D, Dense, MaxPooling1D, Flatten
from keras.models import Sequential
__date__ = '2016-07-22'
def make_timeseries_regressor(window_size, filter_length, nb_input_series=1, nb_outputs=1, nb_filter=4):
""":Return: a Keras Model for predicting the next value in a timeseries given a fixed-size lookback window of previous values.
The model can handle multiple input timeseries (`nb_input_series`) and multiple prediction targets (`nb_outputs`).
:param int window_size: The number of previous timeseries values to use as input features. Also called lag or lookback.
:param int nb_input_series: The number of input timeseries; 1 for a single timeseries.
The `X` input to ``fit()`` should be an array of shape ``(n_instances, window_size, nb_input_series)``; each instance is
a 2D array of shape ``(window_size, nb_input_series)``. For example, for `window_size` = 3 and `nb_input_series` = 1 (a
single timeseries), one instance could be ``[[0], [1], [2]]``. See ``make_timeseries_instances()``.
:param int nb_outputs: The output dimension, often equal to the number of inputs.
For each input instance (array with shape ``(window_size, nb_input_series)``), the output is a vector of size `nb_outputs`,
usually the value(s) predicted to come after the last value in that input instance, i.e., the next value
in the sequence. The `y` input to ``fit()`` should be an array of shape ``(n_instances, nb_outputs)``.
:param int filter_length: the size (along the `window_size` dimension) of the sliding window that gets convolved with
each position along each instance. The difference between 1D and 2D convolution is that a 1D filter's "height" is fixed
to the number of input timeseries (its "width" being `filter_length`), and it can only slide along the window
dimension. This is useful as generally the input timeseries have no spatial/ordinal relationship, so it's not
meaningful to look for patterns that are invariant with respect to subsets of the timeseries.
:param int nb_filter: The number of different filters to learn (roughly, input patterns to recognize).
"""
model = Sequential((
# The first conv layer learns `nb_filter` filters (aka kernels), each of size ``(filter_length, nb_input_series)``.
# Its output will have shape (None, window_size - filter_length + 1, nb_filter), i.e., for each position in
# the input timeseries, the activation of each filter at that position.
Conv1D(filters=nb_filter, kernel_size=filter_length, activation='relu', input_shape=(window_size, nb_input_series)),
MaxPooling1D(), # Downsample the output of convolution by 2X.
Conv1D(filters=nb_filter, kernel_size=filter_length, activation='relu'),
MaxPooling1D(),
Flatten(),
Dense(nb_outputs, activation='linear'), # For binary classification, change the activation to 'sigmoid'
))
model.compile(loss='mse', optimizer='adam', metrics=['mae'])
# To perform (binary) classification instead:
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['binary_accuracy'])
return model
def make_timeseries_instances(timeseries, window_size):
"""Make input features and prediction targets from a `timeseries` for use in machine learning.
:return: A tuple of `(X, y, q)`. `X` are the inputs to a predictor, a 3D ndarray with shape
``(timeseries.shape[0] - window_size, window_size, timeseries.shape[1] or 1)``. For each row of `X`, the
corresponding row of `y` is the next value in the timeseries. The `q` or query is the last instance, what you would use
to predict a hypothetical next (unprovided) value in the `timeseries`.
:param ndarray timeseries: Either a simple vector, or a matrix of shape ``(timestep, series_num)``, i.e., time is axis 0 (the
row) and the series is axis 1 (the column).
:param int window_size: The number of samples to use as input prediction features (also called the lag or lookback).
"""
timeseries = np.asarray(timeseries)
assert 0 < window_size < timeseries.shape[0]
X = np.atleast_3d(np.array([timeseries[start:start + window_size] for start in range(0, timeseries.shape[0] - window_size)]))
y = timeseries[window_size:]
q = np.atleast_3d([timeseries[-window_size:]])
return X, y, q
def evaluate_timeseries(timeseries, window_size):
"""Create a 1D CNN regressor to predict the next value in a `timeseries` using the preceding `window_size` elements
as input features and evaluate its performance.
:param ndarray timeseries: Timeseries data with time increasing down the rows (the leading dimension/axis).
:param int window_size: The number of previous timeseries values to use to predict the next.
"""
filter_length = 5
nb_filter = 4
timeseries = np.atleast_2d(timeseries)
if timeseries.shape[0] == 1:
timeseries = timeseries.T # Convert 1D vectors to 2D column vectors
nb_samples, nb_series = timeseries.shape
print('\n\nTimeseries ({} samples by {} series):\n'.format(nb_samples, nb_series), timeseries)
model = make_timeseries_regressor(window_size=window_size, filter_length=filter_length, nb_input_series=nb_series, nb_outputs=nb_series, nb_filter=nb_filter)
print('\n\nModel with input size {}, output size {}, {} conv filters of length {}'.format(model.input_shape, model.output_shape, nb_filter, filter_length))
model.summary()
X, y, q = make_timeseries_instances(timeseries, window_size)
print('\n\nInput features:', X, '\n\nOutput labels:', y, '\n\nQuery vector:', q, sep='\n')
test_size = int(0.01 * nb_samples) # In real life you'd want to use 0.2 - 0.5
X_train, X_test, y_train, y_test = X[:-test_size], X[-test_size:], y[:-test_size], y[-test_size:]
model.fit(X_train, y_train, epochs=25, batch_size=2, validation_data=(X_test, y_test))
pred = model.predict(X_test)
print('\n\nactual', 'predicted', sep='\t')
for actual, predicted in zip(y_test, pred.squeeze()):
print(actual.squeeze(), predicted, sep='\t')
print('next', model.predict(q).squeeze(), sep='\t')
def main():
"""Prepare input data, build model, evaluate."""
np.set_printoptions(threshold=25)
ts_length = 1000
window_size = 50
print('\nSimple single timeseries vector prediction')
timeseries = np.arange(ts_length) # The timeseries f(t) = t
evaluate_timeseries(timeseries, window_size)
print('\nMultiple-input, multiple-output prediction')
timeseries = np.array([np.arange(ts_length), -np.arange(ts_length)]).T # The timeseries f(t) = [t, -t]
evaluate_timeseries(timeseries, window_size)
if __name__ == '__main__':
main()
@QuantumNinja92
Copy link
Author

Thanks to jkleint for the amazing code, just edited to Keras 2.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment