import tensorflow as tf
from tensorflow.contrib import keras
from tensorflow.examples.tutorials.mnist import input_data
from dask.multiprocessing import get as mp_get
from distributed import Client
from functools import partial
from sklearn.model_selection import ParameterGrid

def get_mnist_data():
    # input image dimensions
    img_rows, img_cols = 28, 28
    num_classes = 10

    # the data, shuffled and split between train and test sets
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255

    # convert class vectors to binary class matrices
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)
    # x_train = x_train[:500]; y_train = y_train[:500]
    return x_train, y_train, x_test, y_test

def conv_model(input_shape, num_classes, nb_filter=32, drop_frac=0.0,
               kernel_size=3, activation='relu', pool=True):
    # Small two-layer convolutional classifier, compiled and ready to fit
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(nb_filter, kernel_size=kernel_size, activation=activation,
                                  input_shape=input_shape))
    model.add(keras.layers.Conv2D(nb_filter, kernel_size=kernel_size, activation=activation))
    if pool:
        model.add(keras.layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(keras.layers.Dropout(drop_frac))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(128, activation='relu'))
    model.add(keras.layers.Dropout(drop_frac))
    model.add(keras.layers.Dense(num_classes, activation='softmax'))
    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=keras.optimizers.Adadelta(),
                  metrics=['accuracy'])
    return model

def tf_model(lr=0.5):
    # Plain TensorFlow softmax regression on flattened MNIST images
    g = tf.Graph()
    with g.as_default():
        print(g)
        x = tf.placeholder(tf.float32, [None, 784])
        W = tf.Variable(tf.zeros([784, 10]))
        b = tf.Variable(tf.zeros([10]))
        y = tf.matmul(x, W) + b
        y_ = tf.placeholder(tf.float32, [None, 10])
        cross_entropy = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y))
        train_step = tf.train.GradientDescentOptimizer(lr).minimize(cross_entropy)
    return {'x': x, 'y': y, 'y_': y_, 'train_step': train_step, 'graph': g}

def fit_tf(model):
    x, y, y_, train_step = (model['x'], model['y'], model['y_'],
                            model['train_step'])
    mnist = input_data.read_data_sets('/tmp/tensorflow/mnist/input_data', one_hot=True)
    # Make the model's graph the default so global_variables_initializer()
    # and the metric ops below are created in the same graph the session runs
    with model['graph'].as_default(), tf.Session(graph=model['graph']) as sess:
        tf.global_variables_initializer().run(session=sess)
        for _ in range(1000):
            batch_xs, batch_ys = mnist.train.next_batch(100)
            sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})
        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        score = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y))
        return sess.run((score, accuracy), feed_dict={x: mnist.test.images,
                                                      y_: mnist.test.labels})

def train_sync(param_grid):
    # Baseline: train and evaluate every model serially in this process
    x_train, y_train, x_test, y_test = get_mnist_data()
    models = [conv_model(input_shape=x_train.shape[1:],
                         num_classes=y_train.shape[1], **params)
              for params in param_grid]
    histories = [m.fit(x_train, y_train, batch_size=128, epochs=10, verbose=0)
                 for m in models]
    scores = [m.evaluate(x_test, y_test, verbose=0) for m in models]
    return scores

def naive_distributed(param_grid):
    x_train, y_train, x_test, y_test = get_mnist_data()
    model_futures = [client.submit(conv_model, input_shape=x_train.shape[1:],
                                   num_classes=y_train.shape[1], **params)
                     for params in param_grid]
    model_fits = [client.submit(keras.models.Sequential.fit, m, x_train,
                                y_train, batch_size=128, epochs=10, verbose=0)
                  for m in model_futures]
    score_futures = [client.submit(keras.models.Sequential.evaluate, m, x_test, y_test,
                                   verbose=0) for m in model_fits]
    scores = client.gather(score_futures)
    return scores

def naive_distributed_tf(param_grid):
    model_futures = [client.submit(tf_model, **params) for params in param_grid]
    score_futures = [client.submit(fit_tf, m) for m in model_futures]
    scores = client.gather(score_futures)
    return scores

def single_function_distributed(param_grid):
    def train_and_score_model(input_shape, num_classes, x_train, y_train,
                              x_test, y_test, **params):
        g = tf.Graph()
        gpu_opts = tf.ConfigProto(gpu_options=tf.GPUOptions(
            per_process_gpu_memory_fraction=0.2))
        with tf.Session(graph=g, config=gpu_opts) as sess:
            keras.backend.set_session(sess)
            model = conv_model(input_shape, num_classes, **params)
            model.fit(x_train, y_train, batch_size=128, epochs=10, verbose=0)
            return model.evaluate(x_test, y_test, verbose=0)

    x_train, y_train, x_test, y_test = get_mnist_data()
    score_futures = [client.submit(train_and_score_model, x_train.shape[1:],
                                   y_train.shape[1], x_train, y_train, x_test,
                                   y_test, **params) for params in param_grid]
    # return score_futures
    scores = client.gather(score_futures)
    return scores
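
# Illustrative variant, not in the original gist: the same single-function
# idea, but the MNIST arrays are scattered to the workers once with
# Client.scatter (broadcast=True replicates them to every worker) instead of
# being serialized into each submit() call. The name scatter_distributed is
# made up here; futures passed to submit() resolve to the underlying arrays
# on the worker.
def scatter_distributed(param_grid):
    def train_and_score_model(input_shape, num_classes, x_train, y_train,
                              x_test, y_test, **params):
        model = conv_model(input_shape, num_classes, **params)
        model.fit(x_train, y_train, batch_size=128, epochs=10, verbose=0)
        return model.evaluate(x_test, y_test, verbose=0)

    x_train, y_train, x_test, y_test = get_mnist_data()
    x_train_f, y_train_f, x_test_f, y_test_f = client.scatter(
        [x_train, y_train, x_test, y_test], broadcast=True)
    score_futures = [client.submit(train_and_score_model, x_train.shape[1:],
                                   y_train.shape[1], x_train_f, y_train_f,
                                   x_test_f, y_test_f, **params)
                     for params in param_grid]
    return client.gather(score_futures)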

# Works as long as the worker uses threads and no nanny
def train_in_subprocess(param_grid):
    # Not needed for this example but was for my real problem
    def ignore_result(func):
        def wrapper(*args, **kwargs):
            func(*args, **kwargs)
            return None
        return wrapper

    def run_in_process(func, *args, **kwargs):
        # Run func(*args, **kwargs) in a fresh process by handing a one-task
        # dask graph to the multiprocessing scheduler; this keeps TensorFlow's
        # global state out of the long-lived worker process.
        from dask.multiprocessing import get as mp_get
        dsk = {'x': (partial(func, **kwargs),) + args}
        return mp_get(dsk, 'x')

    def train_and_score_model(input_shape, num_classes, x_train, y_train,
                              x_test, y_test, **params):
        # gpu_opts = tf.ConfigProto(gpu_options=tf.GPUOptions(
        #     per_process_gpu_memory_fraction=0.2))
        # K.set_session(tf.Session(config=gpu_opts))
        model = conv_model(input_shape, num_classes, **params)
        model.fit(x_train, y_train, batch_size=128, epochs=10, verbose=0)
        return model.evaluate(x_test, y_test, verbose=0)

    x_train, y_train, x_test, y_test = get_mnist_data()
    score_futures = [client.submit(run_in_process, train_and_score_model, x_train.shape[1:],
                                   y_train.shape[1], x_train, y_train, x_test,
                                   y_test, **params) for params in param_grid]
    scores = client.gather(score_futures)
    return scores

if __name__ == '__main__':
    SCHEDULER_IP = '0.0.0.0:8786'
    # SCHEDULER_IP = None
    client = Client(SCHEDULER_IP)

    param_grid = ParameterGrid({'drop_frac': [0.0, 0.25],
                                'nb_filter': [32, 64],
                                'kernel_size': [2, 3, 5]})
    # scores = train_sync(param_grid)
    # scores = naive_distributed(param_grid)
    scores = single_function_distributed(param_grid)
    # scores = train_in_subprocess(param_grid)

    # param_grid = ParameterGrid({'lr': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]})
    # scores = naive_distributed_tf(param_grid)

    for (score, accuracy), params in zip(scores, param_grid):
        print(params, f'Accuracy: {accuracy}')