Training with variable-length sequences in TensorFlow
from typing import Generator, List, Tuple

import numpy as np
import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras.layers import Dense, LSTM, TimeDistributed
from tensorflow.keras.models import Model
def check_tf_v2_3_gpus() -> None:
    gpus = tf.config.experimental.list_physical_devices("GPU")
    if gpus:
        try:
            # Memory growth must be set before any GPU has been initialized
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
            logical_gpus = tf.config.experimental.list_logical_devices("GPU")
            print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
        except RuntimeError as e:
            raise e
def make_variable_length_data() -> Tuple[
    Tuple[List[np.ndarray], List[np.ndarray]],
    Tuple[List[np.ndarray], List[np.ndarray]],
]:
    # Each X sequence and its y targets have the same number of timesteps,
    # but the two sequences differ in length (50 vs. 25 timesteps)
    x1 = np.random.normal(0, 1, 100).reshape((-1, 2))  # shape (50, 2)
    x2 = np.random.normal(0, 1, 50).reshape((-1, 2))   # shape (25, 2)
    y1 = np.ones((len(x1), 1))
    y2 = np.ones((len(x2), 1))
    X_train = [x1, x2]
    y_train = [y1, y2]
    X_test = [x1, x2, x1, x2]
    y_test = [y1, y2, y1, y2]
    return (X_train, y_train), (X_test, y_test)
def data_generator(
    X: List[np.ndarray], y: List[np.ndarray]
) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
    # Yield one (sequence, targets) pair at a time; pairs may differ in length
    for x_i, y_i in zip(X, y):
        yield x_i, y_i
def calc_steps_per_epoch_if_drop_remainder(size: int, batch_size: int) -> int:
    # drop_remainder=True discards the final incomplete batch, so round down.
    # Rounding up (ceil) would make fit() request batches that never arrive.
    return size // batch_size
def get_model() -> Model:
    # None in the time dimension allows variable-length sequences
    i = Input(shape=(None, 2))
    x = LSTM(8, return_sequences=True)(i)
    o = TimeDistributed(Dense(1, activation="sigmoid"))(x)
    model = Model(i, o)
    model.compile(loss="binary_crossentropy", optimizer="adam")
    return model
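

# Alternative (not part of the original snippet): the script below trains with
# BATCH_SIZE = 1 because sequences of different lengths cannot be stacked into
# one tensor. If larger batches are wanted, a common option is
# Dataset.padded_batch plus a Masking layer, so the LSTM skips the padding.
# A minimal sketch under that assumption, with a pad value of 0.0 (caveat: a
# real timestep that is exactly zero in every feature would be masked too).
def make_padded_dataset(
    X: List[np.ndarray], y: List[np.ndarray], batch_size: int
) -> tf.data.Dataset:
    # Pad each batch up to the longest sequence within that batch
    return tf.data.Dataset.from_generator(
        lambda: data_generator(X, y),
        output_types=(tf.float64, tf.float64),
        output_shapes=((None, 2), (None, 1)),
    ).padded_batch(batch_size, padded_shapes=((None, 2), (None, 1)))


def get_masked_model() -> Model:
    i = Input(shape=(None, 2))
    # Masking emits a mask that downstream layers use to ignore padded steps
    x = tf.keras.layers.Masking(mask_value=0.0)(i)
    x = LSTM(8, return_sequences=True)(x)
    o = TimeDistributed(Dense(1, activation="sigmoid"))(x)
    model = Model(i, o)
    model.compile(loss="binary_crossentropy", optimizer="adam")
    return model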
def main() -> None:
    EPOCHS = 100
    # Batch size stays at 1: sequences of different lengths cannot be
    # stacked into one tensor without padding
    BATCH_SIZE = 1

    (X_train, y_train), (X_test, y_test) = make_variable_length_data()

    datasets = []
    steps = []
    for (X, y) in ((X_train, y_train), (X_test, y_test)):
        tf_dataset = tf.data.Dataset.from_generator(
            # Bind X and y as default arguments so each dataset captures its
            # own split (a bare closure over the loop variables would make
            # both datasets read the same data)
            lambda X=X, y=y: data_generator(X, y),
            output_types=(tf.float64, tf.float64),
            output_shapes=(
                (None, 2),
                (None, 1),
            ),
        ).batch(BATCH_SIZE, drop_remainder=True)
        datasets.append(tf_dataset)
        steps.append(
            calc_steps_per_epoch_if_drop_remainder(len(X), batch_size=BATCH_SIZE)
        )

    train_set, test_set = datasets
    train_steps, test_steps = steps

    model = get_model()
    model.fit(
        train_set.repeat(EPOCHS),
        steps_per_epoch=train_steps,
        validation_data=test_set,
        validation_steps=test_steps,
        epochs=EPOCHS,
        verbose=1,
    )
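
    # Sanity check (not in the original gist): because the time axis is None,
    # the trained model accepts sequences of any length at inference time
    short_seq = np.random.normal(0, 1, (1, 5, 2))   # 1 sequence, 5 timesteps
    long_seq = np.random.normal(0, 1, (1, 80, 2))   # 1 sequence, 80 timesteps
    print(model.predict(short_seq).shape)  # -> (1, 5, 1)
    print(model.predict(long_seq).shape)   # -> (1, 80, 1)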
if __name__ == "__main__":
    check_tf_v2_3_gpus()
    main()