Created
January 17, 2019 07:04
-
-
Save Dref360/c2898d3e09d5286a6970203acf8a2b5f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import gc | |
| import os | |
| import shutil | |
| import tracemalloc | |
| from pathlib import Path | |
| import numpy as np | |
| from keras import backend as K | |
| from keras import callbacks | |
| from keras.layers import BatchNormalization | |
| from keras.layers import Input, Dense, Dropout, Layer | |
| from keras.models import Model | |
| from keras.utils import np_utils | |
| from keras.utils.generic_utils import to_list | |
| from keras.utils.generic_utils import unpack_singleton | |
| from keras.utils.test_utils import get_test_data | |
# Model dimensions: width of the input vector, units in the hidden Dense
# layer, and number of softmax outputs.
input_dim, num_hidden, num_classes = 2, 4, 2

# Batch size shared by the fit calls and the batch generator.
batch_size = 5

# Default sample counts requested from get_test_data.
train_samples = test_samples = 20
def data_generator(x, y, batch_size):
    """Yield ``(x_batch, y_batch)`` slices of ``x`` and ``y`` forever.

    Both inputs are first passed through ``to_list`` so that a single array
    and a list of arrays are handled by the same slicing code; each batch is
    passed through ``unpack_singleton`` before being yielded (presumably this
    unwraps one-element lists -- see the Keras helper for exact semantics).

    Batches are consecutive, non-overlapping windows of ``batch_size`` rows.
    The window index wraps around after ``len(x[0]) // batch_size`` batches,
    so trailing samples that do not fill a whole batch are never yielded.

    # Arguments
        x: array or list of arrays holding the inputs.
        y: array or list of arrays holding the targets.
        batch_size: number of rows per yielded batch.

    # Yields
        Tuples ``(x_batch, y_batch)``; the generator never terminates.
    """
    x = to_list(x)
    y = to_list(y)
    # Clamp to 1: with fewer than `batch_size` samples the integer division
    # gives 0 and the wrap-around modulo below would raise ZeroDivisionError
    # right after the first batch. With the clamp the single (short) batch is
    # simply repeated.
    max_batch_index = max(1, len(x[0]) // batch_size)
    i = 0
    while True:
        start = i * batch_size
        stop = start + batch_size
        x_batch = unpack_singleton([array[start:stop] for array in x])
        y_batch = unpack_singleton([array[start:stop] for array in y])
        yield x_batch, y_batch
        i = (i + 1) % max_batch_index
# Same as get_test_data, but with defaults taken from this module's constants.
def get_data_callbacks(num_train=train_samples,
                       num_test=test_samples,
                       input_shape=(input_dim,),
                       classification=True,
                       num_classes=num_classes):
    """Forward every argument, unchanged, to ``get_test_data``.

    The only purpose of this wrapper is to swap in different default values;
    whatever ``get_test_data`` returns is returned as-is.
    """
    settings = dict(
        num_train=num_train,
        num_test=num_test,
        input_shape=input_shape,
        classification=classification,
        num_classes=num_classes,
    )
    return get_test_data(**settings)
def investigate_TensorBoard(tmpdir, update_freq):
    """Train a tiny model four times with a TensorBoard callback attached.

    Builds a small Dense/Dropout/BatchNormalization classifier, then runs, in
    order: ``fit`` without validation data, ``fit`` with validation data,
    ``fit_generator`` without validation data, and ``fit_generator`` with
    validation data. Each run gets a freshly built TensorBoard callback that
    logs to ``tmpdir / 'logs'``. The log directory is removed before
    returning, including when one of the runs raises.

    # Arguments
        tmpdir: path-like object supporting the ``/`` operator (e.g. a
            ``pathlib.Path``); the log directory is created beneath it.
        update_freq: forwarded unchanged to ``callbacks.TensorBoard``.

    # Raises
        AssertionError: if no log directory exists after the four runs.
    """
    # Re-seed NumPy's global RNG with a freshly drawn seed. The upper bound is
    # written as an int because randint's bounds are integers, not floats.
    np.random.seed(np.random.randint(1, 10 ** 7))
    filepath = str(tmpdir / 'logs')

    (X_train, y_train), (X_test, y_test) = get_data_callbacks()
    y_test = np_utils.to_categorical(y_test)
    y_train = np_utils.to_categorical(y_train)

    class DummyStatefulMetric(Layer):
        """Metric flagged as stateful whose value is a constant int32 variable."""

        def __init__(self, name='dummy_stateful_metric', **kwargs):
            super(DummyStatefulMetric, self).__init__(name=name, **kwargs)
            self.stateful = True
            self.state = K.variable(value=0, dtype='int32')

        def reset_states(self):
            # Nothing to reset: the state variable is never updated.
            pass

        def __call__(self, y_true, y_pred):
            return self.state

    inp = Input((input_dim,))
    hidden = Dense(num_hidden, activation='relu')(inp)
    hidden = Dropout(0.1)(hidden)
    hidden = BatchNormalization()(hidden)
    output = Dense(num_classes, activation='softmax')(hidden)
    model = Model(inputs=inp, outputs=output)
    model.compile(loss='categorical_crossentropy',
                  optimizer='sgd',
                  metrics=['accuracy', DummyStatefulMetric()])

    # we must generate new callbacks for each test, as they aren't stateless
    def callbacks_factory(histogram_freq, embeddings_freq=1):
        return [callbacks.TensorBoard(log_dir=filepath,
                                      histogram_freq=histogram_freq,
                                      write_images=True, write_grads=True,
                                      embeddings_freq=embeddings_freq,
                                      embeddings_layer_names=['dense_1'],
                                      embeddings_data=X_test,
                                      # Use the module constant so the callback
                                      # always agrees with the fit calls below.
                                      batch_size=batch_size,
                                      update_freq=update_freq)]

    try:
        # fit without validation data
        model.fit(X_train, y_train, batch_size=batch_size,
                  callbacks=callbacks_factory(histogram_freq=0, embeddings_freq=0),
                  epochs=3)

        # fit with validation data and accuracy
        model.fit(X_train, y_train, batch_size=batch_size,
                  validation_data=(X_test, y_test),
                  callbacks=callbacks_factory(histogram_freq=0), epochs=2)

        # fit generator without validation data
        train_generator = data_generator(X_train, y_train, batch_size)
        model.fit_generator(train_generator, len(X_train), epochs=2,
                            callbacks=callbacks_factory(histogram_freq=0,
                                                        embeddings_freq=0))

        # fit generator with validation data and accuracy
        train_generator = data_generator(X_train, y_train, batch_size)
        model.fit_generator(train_generator, len(X_train), epochs=2,
                            validation_data=(X_test, y_test),
                            callbacks=callbacks_factory(histogram_freq=1))
        train_generator.close()

        assert os.path.isdir(filepath)
    finally:
        # Remove the logs even when a run fails, so a later call never starts
        # from a stale log directory. The existence check keeps a failure that
        # happened before any log was written from being masked.
        if os.path.isdir(filepath):
            shutil.rmtree(filepath)
class Testing:
    """Collect tracemalloc snapshots and report the change between the last two."""

    def __init__(self):
        # Every snapshot taken so far, oldest first.
        self.snapshots = []

    def collect_stats(self, trace_filters=None):
        """Take a snapshot and print the top differences against the previous one.

        The new snapshot is always appended to ``self.snapshots``. From the
        second call onwards, the two most recent snapshots are filtered with
        the same filters, compared grouped by file name, and the first ten
        entries of the comparison are printed together with their traceback.

        # Arguments
            trace_filters: list of ``tracemalloc.Filter`` objects applied to
                both snapshots. When ``None`` (the default) the module-level
                ``filters`` list is used, looked up at call time.

        # Returns
            The full list of differences from ``Snapshot.compare_to``, or an
            empty list when fewer than two snapshots are available.
        """
        self.snapshots.append(tracemalloc.take_snapshot())
        if len(self.snapshots) < 2:
            return []

        if trace_filters is None:
            # Resolved only once a comparison is actually needed, so the first
            # call works even before the module-level list has been defined.
            trace_filters = filters

        # Filter BOTH snapshots: comparing a filtered snapshot against an
        # unfiltered one reports everything the filter removed as "freed".
        current = self.snapshots[-1].filter_traces(trace_filters)
        previous = self.snapshots[-2].filter_traces(trace_filters)
        stats = current.compare_to(previous, 'filename')

        for stat in stats[:10]:
            print("{} new KiB {} total KiB {} new {} total memory blocks: ".format(stat.size_diff / 1024,
                                                                                   stat.size / 1024,
                                                                                   stat.count_diff, stat.count))
            for line in stat.traceback.format():
                print(line)
        return stats
# Scratch directory under which investigate_TensorBoard writes its logs.
tmpdir = Path('temp')
tmpdir.mkdir(exist_ok=True)

# Keep 10 frames
tracemalloc.start(10)

# We are looking for everything at first
filters = []
t = Testing()
for _ in range(10):
    # Clear the Keras session before and after each run, and force a garbage
    # collection, before the snapshot for this round is taken.
    K.clear_session()
    investigate_TensorBoard(tmpdir, 'batch')
    gc.collect()
    K.clear_session()
    t.collect_stats()

# Filter for tensorflow
filters = [tracemalloc.Filter(inclusive=True, filename_pattern="*tensorflow*")]
snapshot = t.snapshots[-1]
old_snapshot = t.snapshots[-2]
stats = snapshot.filter_traces(filters).compare_to(old_snapshot.filter_traces(filters), 'traceback')

# Ten largest positive growths between the last two rounds.
top_k = sorted((s for s in stats if s.size_diff > 0),
               key=lambda s: s.size_diff, reverse=True)[:10]
for k in top_k:
    # size_diff is reported in bytes; convert so the printed unit is correct.
    print('Leaked', k.size_diff / 1024, 'KiB')
    for f in k.traceback:
        print(f)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment