Delayed sin echo prediction with TensorFlow and Keras (detailed discussion at https://stackoverflow.com/questions/46937898/delayed-echo-of-sin-cannot-reproduce-tensorflow-result-in-keras)
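Usage (the filename is illustrative, not part of the gist): save the script as, e.g., delayed_echo.py and run it with either backend, e.g. "python delayed_echo.py --lib keras" or "python delayed_echo.py --lib tensorflow". It targets Python 3 with the TensorFlow 1.x-era API (tf.placeholder, tf.contrib) and standalone Keras, plus NumPy and matplotlib.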
import argparse
import sys
import gc
from typing import Optional, Tuple

import numpy as np
from keras.models import Sequential
from keras.layers import Dense, TimeDistributed
from keras.layers import LSTM
import keras.optimizers
from keras.backend.tensorflow_backend import set_session
import keras.backend as K
import tensorflow as tf
from tensorflow.contrib import rnn
import matplotlib.pyplot as plt

def generate_sample(f: Optional[float] = 1.0,
                    t0: Optional[float] = None, batch_size: int = 1,
                    samples: int = 200,
                    delay: int = 20) \
        -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Generates data samples.

    :param f: The frequency to use for all time series, or None to randomize.
    :param t0: The time offset to use for all time series, or None to randomize.
    :param batch_size: The number of time series to generate.
    :param samples: The number of samples to generate per time series.
    :param delay: The number of samples by which the echo lags behind the input.
    :return: Tuple (T, Y, FT, FY) containing the input times and values (T, Y)
             and the delayed-echo times and values (FT, FY). In all outputs,
             each row represents one time series of the batch.
    """
    Fs = 100
    T = np.empty((batch_size, samples))
    Y = np.empty((batch_size, samples))
    FT = np.empty((batch_size, samples))
    FY = np.empty((batch_size, samples))
    _t0 = t0
    for i in range(batch_size):
        t = np.arange(0, samples + delay) / Fs
        if _t0 is None:
            t0 = np.random.rand() * 2 * np.pi
        else:
            t0 = _t0 + i / float(batch_size)
        freq = f
        if freq is None:
            freq = np.random.rand() * 3.5 + 0.5
        y = np.sin(2 * np.pi * freq * (t + t0))
        T[i, :] = t[delay:]
        Y[i, :] = y[delay:]
        FT[i, :] = t[:samples]
        FY[i, :] = y[:samples]
    return T, Y, FT, FY
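
# Illustrative shape check (a sketch, not part of the training flow): with
# batch_size=2, samples=100, delay=20, generate_sample returns four arrays of
# shape (2, 100), and the target FY echoes the input Y with a 20-sample lag,
# i.e. FY[i, delay:] == Y[i, :samples - delay] for each row i.
#
#   _T, _Y, _FT, _FY = generate_sample(f=1.0, t0=0.0, batch_size=2,
#                                      samples=100, delay=20)
#   assert _Y.shape == _FY.shape == (2, 100)
#   assert np.allclose(_FY[:, 20:], _Y[:, :80])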

learning_rate = 0.001
training_iters = 650000
batch_size = 50
display_step = 100
log_step = 10

# Network Parameters
n_input = 1    # input is sin(x), a scalar
n_steps = 100  # timesteps
n_hidden = 32  # hidden layer num of features
SEED = 34

model_choices_ = ['keras', 'tensorflow']

def custom_loss(y_true, y_pred):
    y_true = K.squeeze(y_true, axis=2)
    y_pred = K.squeeze(y_pred, axis=2)
    sq_diff = K.square(y_true - y_pred)
    return K.mean(K.sum(sq_diff, axis=1), axis=0)
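
# Note: this Keras loss mirrors the TensorFlow loss built in do_model() below:
# squared differences are summed over the time axis and then averaged over the
# batch, so both branches optimize the same objective.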

def do_model(lib):
    assert lib in model_choices_
    if lib == 'keras':
        model = Sequential()
        model.add(LSTM(n_hidden,
                       input_shape=(n_steps, n_input),
                       return_sequences=True))
        model.add(TimeDistributed(Dense(n_input, activation='linear')))
        model.compile(loss=custom_loss,
                      optimizer=keras.optimizers.Adam(lr=learning_rate),
                      metrics=[])
        return model
    elif lib == 'tensorflow':
        x = tf.placeholder(tf.float32, [None, n_steps, n_input])
        y = tf.placeholder(tf.float32, [None, n_steps])
        weights = {
            'out': tf.Variable(tf.random_normal([n_hidden, n_steps], seed=SEED))
        }
        biases = {
            'out': tf.Variable(tf.random_normal([n_steps], seed=SEED))
        }
        lstm = rnn.LSTMCell(n_hidden, forget_bias=1.0)
        outputs, states = tf.nn.dynamic_rnn(lstm, inputs=x,
                                            dtype=tf.float32,
                                            time_major=False)
        h = tf.transpose(outputs, [1, 0, 2])
        pred = tf.nn.bias_add(tf.matmul(h[-1], weights['out']), biases['out'])
        individual_losses = tf.reduce_sum(tf.squared_difference(pred, y),
                                          reduction_indices=1)
        loss = tf.reduce_mean(individual_losses)
        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate) \
            .minimize(loss)
        return {'optimizer': optimizer,
                'pred': pred,
                'loss': loss,
                'x': x,
                'y': y}
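
# Note on the two architectures: the Keras model applies TimeDistributed(Dense)
# to every one of the n_steps LSTM outputs, so each timestep gets its own
# prediction, whereas the TensorFlow graph feeds only the final hidden output
# h[-1] (shape [batch, n_hidden]) through a single [n_hidden, n_steps]
# projection to emit all n_steps predictions at once. The two branches are
# therefore not architecturally identical.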

def do_one_iter(session, model, lib, batch_x, batch_y):
    assert lib in model_choices_
    if lib == 'keras':
        batch_x = batch_x.reshape((batch_size, n_steps, n_input))
        batch_y = batch_y.reshape((batch_size, n_steps, n_input))
        return model.train_on_batch(batch_x, batch_y)
    elif lib == 'tensorflow':
        batch_x = batch_x.reshape((batch_size, n_steps, n_input))
        batch_y = batch_y.reshape((batch_size, n_steps))
        x = model['x']
        y = model['y']
        optimizer = model['optimizer']
        loss = model['loss']
        session.run(optimizer, feed_dict={x: batch_x, y: batch_y})
        return session.run(loss, feed_dict={x: batch_x, y: batch_y})
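
# Note on target shapes: the Keras path reshapes batch_y to
# (batch_size, n_steps, n_input) to match the TimeDistributed output, while the
# TensorFlow path keeps batch_y as (batch_size, n_steps) to match the y
# placeholder.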

def do_prediction(session, model, lib, test_input):
    assert lib in model_choices_
    if lib == 'keras':
        return model.predict_on_batch(test_input)
    elif lib == 'tensorflow':
        x = model['x']
        pred = model['pred']
        return session.run(pred, feed_dict={x: test_input})
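
# Note on output shapes: for a test_input of shape (1, n_steps, n_input), the
# Keras model returns (1, n_steps, n_input) and the TensorFlow graph returns
# (1, n_steps); both are flattened with .ravel() in main() before plotting.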

def main(lib):
    np.random.seed(SEED)
    tf.reset_default_graph()
    graph_level_seed = SEED
    tf.set_random_seed(graph_level_seed)

    model = do_model(lib)
    config = tf.ConfigProto()
    config.gpu_options.per_process_gpu_memory_fraction = 0.1
    init = tf.global_variables_initializer()
    with tf.Session(config=config) as sess:
        sess.run(init)
        step = 1
        loss_values = [np.inf]
        loss_steps = [0.]
        target_loss = 0.15
        # Keep training until we reach max iterations or the loss drops below target_loss
        while step * batch_size < training_iters and loss_values[-1] > target_loss:
            _, batch_x, __, batch_y = generate_sample(f=None, t0=None,
                                                      batch_size=batch_size,
                                                      samples=n_steps)
            loss_value = do_one_iter(sess, model, lib, batch_x, batch_y)
            if step % log_step == 0:
                loss_values.append(loss_value)
                loss_steps.append(step)
            if step % display_step == 0:
                print("Iter " + str(step) +
                      ", Minibatch Loss= {:.6f}".format(loss_value))
            step += 1
        print("Optimization Finished!")

        plt.plot(loss_steps[1:], loss_values[1:])
        plt.yscale('log')
        plt.show()

        # Test the prediction
        n_tests = 3
        for i in range(1, n_tests + 1):
            plt.subplot(n_tests, 1, i)
            t, y, delayed_t, delayed_y = \
                generate_sample(f=i, t0=None, samples=n_steps)
            test_input = y.reshape((1, n_steps, n_input))
            prediction = do_prediction(sess, model, lib, test_input)

            t = t.ravel()
            y = y.ravel()
            delayed_t = delayed_t.ravel()
            delayed_y = delayed_y.ravel()
            prediction = prediction.ravel()

            plt.plot(t, y, color='black')
            plt.plot(delayed_t, delayed_y, color='green', linestyle=':', linewidth=3)
            plt.plot(delayed_t, prediction, color='red')
            plt.ylim([-1., 1.])
            plt.xlabel('time [t]')
            plt.ylabel('signal')
        plt.show()
    gc.collect()

if __name__ == '__main__':
    if sys.version_info[0] < 3:
        raise ValueError('tested on python3 only')
    else:
        parser = argparse.ArgumentParser()
        parser.add_argument('-l', '--lib',
                            choices=model_choices_,
                            required=True)
        main(**vars(parser.parse_args()))