# GitHub Gist by @Dref360 (Dref360/c2898d3e09d5286a6970203acf8a2b5f),
# created January 17, 2019 07:04.
# The gist page's navigation text ("Skip to content", "Show Gist options",
# "Save ... to your computer and use it in GitHub Desktop") was captured
# together with the code and is not part of the script.
import gc
import os
import shutil
import tracemalloc
from pathlib import Path
import numpy as np
from keras import backend as K
from keras import callbacks
from keras.layers import BatchNormalization
from keras.layers import Input, Dense, Dropout, Layer
from keras.models import Model
from keras.utils import np_utils
from keras.utils.generic_utils import to_list
from keras.utils.generic_utils import unpack_singleton
from keras.utils.test_utils import get_test_data
# Dimensions of the toy classification problem and of the small network
# that is trained on it.
input_dim = 2
num_classes = 2
num_hidden = 4

# Sample counts requested from get_test_data, and the mini-batch size
# passed to the fit calls.
train_samples = test_samples = 20
batch_size = 5
def data_generator(x, y, batch_size):
    """Yield ``(x_batch, y_batch)`` pairs forever, cycling over the data.

    Args:
        x: Input data. It is passed through ``to_list`` first, so presumably
            either a single array or a list of arrays is accepted.
        y: Target data, handled the same way as ``x``.
        batch_size: Number of samples sliced out for each batch.

    Yields:
        Tuples ``(x_batch, y_batch)``; each side is the list of per-array
        slices passed through ``unpack_singleton``.

    Note:
        The number of distinct batches is ``len(x[0]) // batch_size``, so
        trailing samples that do not fill a whole batch are never yielded.
        When there are fewer samples than ``batch_size`` the single partial
        batch is yielded repeatedly.
    """
    x = to_list(x)
    y = to_list(y)
    # Clamp to one batch: with fewer samples than batch_size the floor
    # division is 0 and the modulo below would raise ZeroDivisionError.
    max_batch_index = max(1, len(x[0]) // batch_size)
    i = 0
    while True:
        start = i * batch_size
        stop = start + batch_size
        x_batch = unpack_singleton([array[start:stop] for array in x])
        y_batch = unpack_singleton([array[start:stop] for array in y])
        yield x_batch, y_batch
        # Wrap around so the generator never runs out of batches.
        i = (i + 1) % max_batch_index
# Wrapper around get_test_data whose default arguments come from the
# module-level constants of this script.
def get_data_callbacks(num_train=train_samples,
                       num_test=test_samples,
                       input_shape=(input_dim,),
                       classification=True,
                       num_classes=num_classes):
    """Call ``get_test_data`` with this script's defaults.

    Every argument is forwarded unchanged, by keyword, and whatever
    ``get_test_data`` returns is returned as-is.
    """
    forwarded = {
        'num_train': num_train,
        'num_test': num_test,
        'input_shape': input_shape,
        'classification': classification,
        'num_classes': num_classes,
    }
    return get_test_data(**forwarded)
def investigate_TensorBoard(tmpdir, update_freq):
    """Run four short trainings with a TensorBoard callback, then clean up.

    A small dense classifier is built and trained, in order, with ``fit``
    (no validation data), ``fit`` (with validation data), ``fit_generator``
    (no validation data) and ``fit_generator`` (with validation data and
    ``histogram_freq=1``). Every run gets a freshly created TensorBoard
    callback that logs to ``tmpdir / 'logs'``.

    Args:
        tmpdir: Directory object supporting the ``/`` operator (the script
            passes a ``pathlib.Path``); logs are written to its ``logs``
            child.
        update_freq: Forwarded as ``update_freq`` to
            ``callbacks.TensorBoard``.

    Raises:
        AssertionError: If the log directory does not exist after the runs.
    """
    np.random.seed(np.random.randint(1, 1e7))
    filepath = str(tmpdir / 'logs')

    (X_train, y_train), (X_test, y_test) = get_data_callbacks()
    y_test = np_utils.to_categorical(y_test)
    y_train = np_utils.to_categorical(y_train)

    class DummyStatefulMetric(Layer):
        """Metric layer flagged as stateful that always reports ``self.state``."""

        def __init__(self, name='dummy_stateful_metric', **kwargs):
            super(DummyStatefulMetric, self).__init__(name=name, **kwargs)
            self.stateful = True
            self.state = K.variable(value=0, dtype='int32')

        def reset_states(self):
            # Nothing to reset: the state variable is never updated.
            pass

        def __call__(self, y_true, y_pred):
            return self.state

    inp = Input((input_dim,))
    hidden = Dense(num_hidden, activation='relu')(inp)
    hidden = Dropout(0.1)(hidden)
    hidden = BatchNormalization()(hidden)
    output = Dense(num_classes, activation='softmax')(hidden)
    model = Model(inputs=inp, outputs=output)
    model.compile(loss='categorical_crossentropy',
                  optimizer='sgd',
                  metrics=['accuracy', DummyStatefulMetric()])

    # we must generate new callbacks for each test, as they aren't stateless
    def callbacks_factory(histogram_freq, embeddings_freq=1):
        # NOTE(review): 'dense_1' is presumably the auto-generated name of
        # the first Dense layer in a freshly cleared session - confirm.
        return [callbacks.TensorBoard(log_dir=filepath,
                                      histogram_freq=histogram_freq,
                                      write_images=True, write_grads=True,
                                      embeddings_freq=embeddings_freq,
                                      embeddings_layer_names=['dense_1'],
                                      embeddings_data=X_test,
                                      batch_size=5,
                                      update_freq=update_freq)]

    try:
        # fit without validation data
        model.fit(X_train, y_train, batch_size=batch_size,
                  callbacks=callbacks_factory(histogram_freq=0,
                                              embeddings_freq=0),
                  epochs=3)

        # fit with validation data and accuracy
        model.fit(X_train, y_train, batch_size=batch_size,
                  validation_data=(X_test, y_test),
                  callbacks=callbacks_factory(histogram_freq=0), epochs=2)

        # fit generator without validation data
        train_generator = data_generator(X_train, y_train, batch_size)
        model.fit_generator(train_generator, len(X_train), epochs=2,
                            callbacks=callbacks_factory(histogram_freq=0,
                                                        embeddings_freq=0))

        # fit generator with validation data and accuracy
        train_generator = data_generator(X_train, y_train, batch_size)
        model.fit_generator(train_generator, len(X_train), epochs=2,
                            validation_data=(X_test, y_test),
                            callbacks=callbacks_factory(histogram_freq=1))
        train_generator.close()

        assert os.path.isdir(filepath)
    finally:
        # Always remove the logs, even when a run fails: a stale directory
        # would make the isdir assertion of a later call pass vacuously.
        # Best-effort so a cleanup problem cannot mask a training error.
        shutil.rmtree(filepath, ignore_errors=True)
class Testing:
    """Collect tracemalloc snapshots and print the diff of the last two."""

    def __init__(self):
        # Every snapshot taken so far, oldest first.
        self.snapshots = []

    def collect_stats(self, trace_filters=None):
        """Take a snapshot and print how it differs from the previous one.

        The first call only records a snapshot. Later calls compare the two
        most recent snapshots grouped by filename and print the first ten
        entries of the comparison, each followed by its traceback lines.

        Args:
            trace_filters: Optional list of ``tracemalloc.Filter`` objects
                applied to BOTH snapshots before comparing. Defaults to the
                module-level ``filters`` list.
        """
        self.snapshots.append(tracemalloc.take_snapshot())
        if len(self.snapshots) < 2:
            return

        if trace_filters is None:
            # Resolved lazily so the first call works before the
            # module-level ``filters`` has been assigned.
            trace_filters = filters

        # Filter both sides: diffing a filtered snapshot against an
        # unfiltered one reports every filtered-out trace as freed memory.
        newest = self.snapshots[-1].filter_traces(trace_filters)
        previous = self.snapshots[-2].filter_traces(trace_filters)
        stats = newest.compare_to(previous, 'filename')

        for stat in stats[:10]:
            print("{} new KiB {} total KiB {} new {} total memory blocks: ".format(
                stat.size_diff / 1024,
                stat.size / 1024,
                stat.count_diff,
                stat.count))
            for line in stat.traceback.format():
                print(line)
# Working directory for the TensorBoard logs written by each iteration.
tmpdir = Path('temp')
# exist_ok replaces the separate exists()/mkdir() pair and its race window.
tmpdir.mkdir(exist_ok=True)

# Keep 10 frames
tracemalloc.start(10)

# We are looking for everything at first
filters = []

t = Testing()
for _ in range(10):
    K.clear_session()
    investigate_TensorBoard(tmpdir, 'batch')
    gc.collect()
    K.clear_session()
    t.collect_stats()

# Filter for tensorflow
filters = [tracemalloc.Filter(inclusive=True, filename_pattern="*tensorflow*")]
snapshot = t.snapshots[-1]
old_snapshot = t.snapshots[-2]
stats = snapshot.filter_traces(filters).compare_to(old_snapshot.filter_traces(filters), 'traceback')

# Ten largest positive growths between the last two iterations.
top_k = sorted((i for i in stats if i.size_diff > 0),
               key=lambda j: j.size_diff, reverse=True)[:10]
for k in top_k:
    # tracemalloc reports size_diff in bytes; convert before labelling.
    print('Leaked', k.size_diff / 1024, 'KiB')
    for f in k.traceback:
        print(f)
# (GitHub page footer, not part of the script) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment