Skip to content

Instantly share code, notes, and snippets.

@bzamecnik
Last active August 9, 2017 13:03
Show Gist options
  • Save bzamecnik/92607207af912ae53dd2aa557631b977 to your computer and use it in GitHub Desktop.
Save bzamecnik/92607207af912ae53dd2aa557631b977 to your computer and use it in GitHub Desktop.
Data-parallel model in Keras - a sketch (@rossumai)
import keras.backend as K
from keras.layers import Lambda
from keras.layers.merge import concatenate
from keras.models import Model
import keras.optimizers
from keras.optimizers import clip_norm, Optimizer
import tensorflow as tf
# this should be fairly ready
class DataParallelOptimizer(Optimizer):
    """
    Wrapper class for data-parallel optimization. Multiple model replicas
    (towers) with shared weights operate on different batch slices and compute
    gradients in parallel on multiple GPUs. Gradients are then averaged on the
    parameter server (CPU or one of the GPUs) and the weights are updated.

    It accepts a list of losses (living on separate devices) instead of a
    single loss, computes gradients (collocated with the losses) for each
    loss, averages them on the PS device and provides weight update
    operations by delegating to the wrapped optimizer.

    Usage:
        from keras.optimizers import Adam
        model.compile(..., optimizer=DataParallelOptimizer(Adam()))
    """

    def __init__(self, optimizer):
        # NOTE(review): Optimizer.__init__ is deliberately not called. It
        # presumably assigns `self.weights`, which is a read-only property on
        # this wrapper. Verify this against the Keras version in use.
        self.optimizer = keras.optimizers.get(optimizer)

    def _clip_setting(self, name):
        # Return the clipping setting `name` ('clipnorm' / 'clipvalue').
        # A value set on the wrapper itself wins; otherwise the wrapped
        # optimizer's value is used. 0 means "disabled".
        if hasattr(self, name):
            return getattr(self, name)
        return getattr(self.optimizer, name, 0)

    def get_gradients(self, losses, params):
        """
        Compute (averaged) gradients of `losses` w.r.t. `params`.

        Args:
            losses: a list of per-tower loss tensors, or a single loss tensor.
            params: list of weight variables.

        Returns:
            A list with one gradient tensor per parameter. With a list of
            losses, each gradient is the mean of the per-tower gradients.
            Norm/value clipping is applied if configured.
        """
        if isinstance(losses, (list, tuple)):
            # Gradients for each tower loss.
            # NOTE: K.gradients calls tf.gradients with
            # colocate_gradients_with_ops=True, so each gradient operation
            # should be collocated with its respective loss. The losses are
            # assumed to live on different devices.
            tower_grads = [K.gradients(loss, params) for loss in losses]
            # Average the gradients per parameter. zip(*...) groups the
            # gradients of the same parameter across towers, so shapes agree
            # within each stack. This is the synchronization point (sync
            # SGD), and it is placed by the enclosing device scope, which
            # should be the parameter server device.
            grads = [K.mean(K.stack(param_grads, 0), axis=0)
                     for param_grads in zip(*tower_grads)]
        else:
            grads = K.gradients(losses, params)
        clipnorm = self._clip_setting('clipnorm')
        if clipnorm > 0:
            norm = K.sqrt(sum([K.sum(K.square(g)) for g in grads]))
            grads = [clip_norm(g, clipnorm, norm) for g in grads]
        clipvalue = self._clip_setting('clipvalue')
        if clipvalue > 0:
            grads = [K.clip(g, -clipvalue, clipvalue) for g in grads]
        return grads

    def get_updates(self, params, constraints, loss):
        """
        Return the wrapped optimizer's update ops.

        Gradients are routed through this wrapper's get_gradients().
        """
        # The wrapped optimizer obtains gradients via its own
        # get_gradients(loss, params). The stock Keras implementation hands
        # the whole list of tower losses to K.gradients / tf.gradients, which
        # sums the per-tower gradients instead of averaging them. Shadowing
        # the method on the instance makes the averaging above take effect.
        self.optimizer.get_gradients = self.get_gradients
        return self.optimizer.get_updates(params, constraints, loss)

    @property
    def weights(self):
        # Optimizer state variables live on the wrapped optimizer.
        return self.optimizer.weights

    def get_config(self):
        return self.optimizer.get_config()

    def from_config(self, config):
        return self.optimizer.from_config(config)
# so far just an incomplete sketch...
class DataParallelModel(Model):
    """
    Data-parallel wrapper around a single-device Keras model.

    Each input batch is split along axis 0 across several replicas of
    `basic_model` (one per GPU, sharing weights). Their outputs are
    concatenated back into a single batch. Build instances with
    `DataParallelModel.create(basic_model, gpu_count)`.
    """

    def __init__(self, inputs, outputs, basic_model, replicas, name=None):
        super(DataParallelModel, self).__init__(inputs, outputs, name)
        # The original single-device model whose weights the replicas share.
        self.basic_model = basic_model
        # One Model per GPU, mapping the full inputs to that GPU's slice outputs.
        self.replicas = replicas

    @classmethod
    def create(cls, basic_model, gpu_count=2):
        """
        Build a data-parallel model from `basic_model`.

        Args:
            basic_model: Keras Model to replicate; its weights are shared.
            gpu_count: number of replicas/GPUs (devices gpu:0 .. gpu:N-1);
                must be at least 2.

        Returns:
            A DataParallelModel with the same inputs as `basic_model`. Its
            outputs are the per-replica outputs concatenated on gpu:0.
        """
        assert gpu_count >= 2, "At least 2 GPUs"

        def get_slice(data, idx, parts):
            # Take slice `idx` of `parts` along the batch axis. The last slice
            # also takes the remainder, so no samples are dropped when the
            # batch size is not divisible by `parts`. Otherwise the merged
            # output would have fewer rows than the targets.
            shape = tf.shape(data)
            batch_size = shape[:1]
            step = batch_size // parts
            if idx == parts - 1:
                slice_len = batch_size - step * idx
            else:
                slice_len = step
            size = tf.concat([slice_len, shape[1:]], axis=0)
            start = tf.concat([step * idx, shape[1:] * 0], axis=0)
            return tf.slice(data, start, size)

        # outputs_all[k] collects output k of every replica, in GPU order.
        outputs_all = [[] for _ in basic_model.outputs]
        replicas = []
        for gpu_id in range(gpu_count):
            # Place the operations of each replica on a separate device.
            with tf.device("gpu:%d" % gpu_id):
                with tf.name_scope('replica_%d' % gpu_id):
                    # Slice each input into the piece processed on this GPU.
                    slices = []
                    for x in basic_model.inputs:
                        input_shape = tuple(x.get_shape().as_list())[1:]
                        x_slice = Lambda(
                            get_slice, output_shape=input_shape,
                            arguments={'idx': gpu_id, 'parts': gpu_count})(x)
                        slices.append(x_slice)
                    outputs = basic_model(slices)
                    replica = Model(inputs=basic_model.inputs, outputs=outputs)
                    replicas.append(replica)
                    if not isinstance(outputs, list):
                        outputs = [outputs]
                    # Save all the outputs for merging back together later.
                    for collected, output in zip(outputs_all, outputs):
                        collected.append(output)

        with tf.device("gpu:0"):
            merged = [concatenate(outputs, axis=0) for outputs in outputs_all]

        return cls(inputs=basic_model.inputs, outputs=merged,
                   basic_model=basic_model, replicas=replicas)

    def compile(self, optimizer, loss, metrics=None, loss_weights=None,
                sample_weight_mode=None, **kwargs):
        """
        Compile the replicas and the merged model.

        Args:
            optimizer: identifier or instance of an optimizer. The merged
                model wraps it in a DataParallelOptimizer.
            loss: identifier or instance of a loss function.
            metrics, loss_weights, sample_weight_mode: as for
                keras.models.Model.compile; forwarded to replicas and merged model.
            **kwargs: forwarded to the merged model's compile().
        """
        replica_total_losses = []
        # Place the loss and gradient operations of each replica on its own device.
        for gpu_id, replica in enumerate(self.replicas):
            with tf.device("gpu:%d" % gpu_id):
                with tf.name_scope('replica_%d' % gpu_id):
                    replica.compile(optimizer, loss, metrics, loss_weights,
                                    sample_weight_mode=sample_weight_mode)
                    replica_total_losses.append(replica.total_loss)
        super(DataParallelModel, self).compile(
            DataParallelOptimizer(optimizer), loss, metrics, loss_weights,
            sample_weight_mode=sample_weight_mode, **kwargs)
        # Separate losses whose gradients can be computed in parallel.
        # NOTE(review): replica.compile() presumably creates each replica's own
        # target/sample-weight placeholders. train_function only feeds this
        # model's _feed_targets/_feed_sample_weights, so verify the replica
        # losses are evaluable. They may need to be rebuilt from slices of
        # this model's targets.
        self.replica_total_losses = replica_total_losses
        # Redefine total_loss as the average of the replica losses.
        self.total_loss = K.mean(K.stack(replica_total_losses, 0))

    def _make_train_function(self):
        """
        Build the Keras train function.

        The optimizer receives the list of per-replica losses instead of the
        single total loss, so it can compute gradients per replica and
        average them.
        """
        if not hasattr(self, 'train_function'):
            raise RuntimeError('You must compile your model before using it.')
        if self.train_function is None:
            inputs = (self._feed_inputs + self._feed_targets +
                      self._feed_sample_weights)
            if self.uses_learning_phase and not isinstance(K.learning_phase(), int):
                inputs += [K.learning_phase()]
            # Only DataParallelOptimizer accepts a list of losses.
            assert isinstance(self.optimizer, DataParallelOptimizer)
            training_updates = self.optimizer.get_updates(
                self._collected_trainable_weights,
                self.constraints,
                self.replica_total_losses)
            updates = self.updates + training_updates
            # Gets loss and metrics. Updates weights at each call.
            self.train_function = K.function(inputs,
                                             [self.total_loss] + self.metrics_tensors,
                                             updates=updates,
                                             name='train_function',
                                             **self._function_kwargs)
# TODO: in ModelCheckpointer save the basic_model
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Model
from keras.layers import Dense, Dropout, Flatten, Conv2D, Input
import os
import tensorflow as tf
from data_parallel import DataParallelModel, DataParallelOptimizer
def load_data():
    """
    Load MNIST for training.

    Images are reshaped to (samples, 28, 28, 1), cast to float32 and
    divided by 255. Labels are one-hot encoded over 10 classes.

    Returns:
        (x_train, y_train, x_test, y_test)
    """
    img_rows, img_cols, num_classes = 28, 28, 10

    def prepare_images(images):
        # Add a trailing channel axis, convert to float32 and scale by 1/255.
        with_channel = images.reshape(images.shape[0], img_rows, img_cols, 1)
        return with_channel.astype('float32') / 255

    # The data, shuffled and split between train and test sets.
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = prepare_images(x_train)
    x_test = prepare_images(x_test)
    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')
    # Convert class vectors to binary class matrices.
    y_train, y_test = [keras.utils.to_categorical(labels, num_classes)
                       for labels in (y_train, y_test)]
    return x_train, y_train, x_test, y_test
def make_basic_model(input_shape, num_classes):
    """
    Build a small convolutional classifier and print its summary.

    Architecture: three 3x3 SELU convolutions with 16, 32 and 64 filters,
    each followed by Dropout(0.3), then Flatten and a softmax Dense layer.

    Args:
        input_shape: shape of a single sample (without the batch axis).
        num_classes: number of output units/classes.

    Returns:
        An uncompiled keras Model.
    """
    # Named `inputs` rather than `input` to avoid shadowing the builtin.
    inputs = Input(shape=input_shape)
    x = inputs
    # Layers are created in the same order as before, so auto-generated
    # layer names are unchanged.
    for filters in (16, 32, 64):
        x = Conv2D(filters, 3, activation='selu')(x)
        x = Dropout(0.3)(x)
    x = Flatten()(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=inputs, outputs=outputs)
    model.summary()
    return model
def train_single_gpu(batch_size=256, epochs=10):
    """
    Baseline: train the basic model on MNIST without data parallelism.

    Args:
        batch_size: samples per training step.
        epochs: number of passes over the training set.
    """
    x_train, y_train, x_test, y_test = load_data()
    model = make_basic_model(x_train.shape[1:], y_train.shape[-1])
    model.compile(optimizer='adam',
                  loss=keras.losses.categorical_crossentropy,
                  metrics=['accuracy'])
    model.fit(x_train, y_train,
              batch_size=batch_size, epochs=epochs, verbose=1,
              validation_data=(x_test, y_test))
def train_multi_gpu(subbatch_size=256, epochs=10, gpus=2):
    """
    Train the basic model on MNIST with data parallelism across GPUs.

    The number of replicas comes from CUDA_VISIBLE_DEVICES when it lists any
    devices; otherwise `gpus` is used. The global batch size is
    `subbatch_size * gpu_count`, so each replica processes `subbatch_size`
    samples per step.

    Args:
        subbatch_size: samples per replica per training step.
        epochs: number of passes over the training set.
        gpus: replica count used when CUDA_VISIBLE_DEVICES is unset/empty.
    """
    x_train, y_train, x_test, y_test = load_data()
    input_shape = x_train.shape[1:]
    num_classes = y_train.shape[-1]
    visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES', '')
    detected = len([dev for dev in visible_devices.split(',') if dev.strip()])
    # Without CUDA_VISIBLE_DEVICES the detected count is 0, which
    # DataParallelModel.create() rejects, so fall back to `gpus`.
    gpu_count = detected or gpus
    # Size the global batch by the number of replicas actually built.
    batch_size = subbatch_size * gpu_count
    print('CUDA_VISIBLE_DEVICES', visible_devices, 'gpu_count:', gpu_count)
    basic_model = make_basic_model(input_shape, num_classes)
    parallel_model = DataParallelModel.create(basic_model, gpu_count)
    parallel_model.compile(loss=keras.losses.categorical_crossentropy,
                           optimizer='adam',
                           metrics=['accuracy'])
    parallel_model.fit(x_train, y_train,
                       batch_size=batch_size,
                       epochs=epochs,
                       verbose=1,
                       validation_data=(x_test, y_test))
if __name__ == '__main__':
    # Script entry point: run the data-parallel MNIST experiment with its
    # default settings.
    train_multi_gpu()

# Basic model timings (333,066 params):
#   batch_size=128 -> 9s / epoch
#   batch_size=256 -> 7.5s / epoch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment