Data-parallel model in Keras - a sketch (@rossumai)
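The gist has two files: `data_parallel.py` with the `DataParallelOptimizer` and `DataParallelModel` sketch, and an MNIST example that trains on multiple GPUs. In a nutshell, usage looks roughly like this (a hedged sketch; the variable names mirror the MNIST example below):

# build a single-GPU model, then wrap it in a data-parallel model
basic_model = make_basic_model(input_shape, num_classes)
parallel_model = DataParallelModel.create(basic_model, gpu_count=2)
# compile() wraps the given optimizer in DataParallelOptimizer internally
parallel_model.compile(loss='categorical_crossentropy', optimizer='adam',
                       metrics=['accuracy'])
# the batch is sliced into one sub-batch per GPU
parallel_model.fit(x_train, y_train, batch_size=2 * 256, epochs=10)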
import keras.backend as K
from keras.layers import Lambda
from keras.layers.merge import concatenate
from keras.models import Model
import keras.optimizers
from keras.optimizers import clip_norm, Optimizer
import tensorflow as tf
# this should be fairly ready
class DataParallelOptimizer(Optimizer):
    """
    Wrapper class for data-parallel optimization. Multiple model replicas
    (towers) with shared weights operate on different batch slices and compute
    gradients in parallel on multiple GPUs. The gradients are then averaged on
    the parameter server (CPU or one of the GPUs) and the weights are updated.

    It accepts a list of losses (living on separate devices) instead of a
    single loss, computes gradients (collocated with the losses) for each loss,
    averages them on the PS device and provides the weight update operations.

    Usage:

        from keras.optimizers import Adam
        model.compile(..., optimizer=DataParallelOptimizer(Adam()))
    """

    def __init__(self, optimizer):
        self.optimizer = keras.optimizers.get(optimizer)
        # Route gradient computation of the wrapped optimizer through this
        # wrapper, so that its get_updates() sees the tower-averaged gradients.
        self.optimizer.get_gradients = self.get_gradients
    def get_gradients(self, losses, params):
        # NOTE: argument "losses" (list) instead of a single "loss"
        if isinstance(losses, list):
            # Gradients for each tower loss.
            # NOTE: K.gradients calls tf.gradients with
            # colocate_gradients_with_ops=True, thus each tf.gradients
            # operation should be collocated with its respective loss. We
            # assume the losses to be located on different devices.
            tower_grads = [K.gradients(loss, params) for loss in losses]
            # Average the gradients parameter by parameter across the towers.
            # This should be a synchronization point (for sync SGD) and this
            # operation will be placed according to the scope where the main
            # Model was defined - which should be the parameter server device.
            grads = [K.mean(K.stack(g, axis=0), axis=0)
                     for g in zip(*tower_grads)]
        else:
            grads = K.gradients(losses, params)
        if hasattr(self, 'clipnorm') and self.clipnorm > 0:
            norm = K.sqrt(sum([K.sum(K.square(g)) for g in grads]))
            grads = [clip_norm(g, self.clipnorm, norm) for g in grads]
        if hasattr(self, 'clipvalue') and self.clipvalue > 0:
            grads = [K.clip(g, -self.clipvalue, self.clipvalue) for g in grads]
        return grads
    def get_updates(self, params, constraints, loss):
        return self.optimizer.get_updates(params, constraints, loss)

    @property
    def weights(self):
        return self.optimizer.weights

    def get_config(self):
        return self.optimizer.get_config()

    def from_config(self, config):
        return self.optimizer.from_config(config)
# so far just an incomplete sketch...
class DataParallelModel(Model):
    def __init__(self, inputs, outputs, basic_model, replicas, name=None):
        super(DataParallelModel, self).__init__(inputs, outputs, name)
        self.basic_model = basic_model
        self.replicas = replicas

    @classmethod
    def create(cls, basic_model, gpu_count=2):
        assert gpu_count >= 2, "At least 2 GPUs"

        def get_slice(data, idx, parts):
            shape = tf.shape(data)
            size = tf.concat([shape[:1] // parts, shape[1:]], axis=0)
            stride = tf.concat([shape[:1] // parts, shape[1:] * 0], axis=0)
            start = stride * idx
            return tf.slice(data, start, size)

        outputs_all = []
        replicas = []
        # place the operations for each replica on a separate device
        for gpu_id in range(gpu_count):
            with tf.device("gpu:%d" % gpu_id):
                with tf.name_scope('replica_%d' % gpu_id):
                    slices = []
                    # Slice each input into a piece for processing on this GPU
                    for x in basic_model.inputs:
                        input_shape = tuple(x.get_shape().as_list())[1:]
                        x_slice = Lambda(get_slice, output_shape=input_shape,
                                         arguments={'idx': gpu_id, 'parts': gpu_count})(x)
                        slices.append(x_slice)

                    if gpu_id == 0:
                        for i in range(len(basic_model.outputs)):
                            outputs_all.append([])

                    outputs = basic_model(slices)
                    replica = Model(inputs=basic_model.inputs, outputs=outputs)
                    replicas.append(replica)

                    if not isinstance(outputs, list):
                        outputs = [outputs]

                    # Save all the outputs for merging them back together later
                    for l in range(len(outputs)):
                        outputs_all[l].append(outputs[l])

        with tf.device("gpu:0"):
            merged = []
            for outputs in outputs_all:
                merged.append(concatenate(outputs, axis=0))

        return cls(inputs=basic_model.inputs, outputs=merged,
                   basic_model=basic_model, replicas=replicas)
    def compile(self, optimizer, loss, metrics=None, loss_weights=None,
                sample_weight_mode=None, **kwargs):
        """
        optimizer - identifier or instance of an optimizer
        loss - identifier or instance of a loss function
        """
        replica_total_losses = []
        # place the loss and gradient operations of each replica on a separate device
        for gpu_id, replica in enumerate(self.replicas):
            with tf.device("gpu:%d" % gpu_id):
                with tf.name_scope('replica_%d' % gpu_id):
                    replica.compile(optimizer, loss, metrics, loss_weights)
                    replica_total_losses.append(replica.total_loss)

        super(DataParallelModel, self).compile(
            DataParallelOptimizer(optimizer), loss, metrics, loss_weights)

        # separate losses whose gradients can be computed in parallel
        self.replica_total_losses = replica_total_losses
        # redefine total_loss as the average of the replica losses
        self.total_loss = K.mean(K.stack(replica_total_losses, axis=0))
    def _make_train_function(self):
        if not hasattr(self, 'train_function'):
            raise RuntimeError('You must compile your model before using it.')
        if self.train_function is None:
            inputs = self._feed_inputs + self._feed_targets + self._feed_sample_weights
            if self.uses_learning_phase and not isinstance(K.learning_phase(), int):
                inputs += [K.learning_phase()]

            assert isinstance(self.optimizer, DataParallelOptimizer)
            training_updates = self.optimizer.get_updates(
                self._collected_trainable_weights,
                self.constraints,
                self.replica_total_losses)
            updates = self.updates + training_updates
            # Gets loss and metrics. Updates the weights at each call.
            self.train_function = K.function(inputs,
                                             [self.total_loss] + self.metrics_tensors,
                                             updates=updates,
                                             name='train_function',
                                             **self._function_kwargs)
# TODO: in a ModelCheckpoint-like callback, save the basic_model
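A minimal sketch of what that TODO might look like, assuming the standard Keras Callback API (the class name and file-name pattern here are made up for illustration):

from keras.callbacks import Callback

class BasicModelCheckpoint(Callback):
    """Hypothetical callback that saves the shared basic_model of a
    DataParallelModel instead of the parallel wrapper."""
    def __init__(self, basic_model, filepath):
        super(BasicModelCheckpoint, self).__init__()
        self.basic_model = basic_model
        self.filepath = filepath

    def on_epoch_end(self, epoch, logs=None):
        # Weights are shared between the replicas and basic_model,
        # so saving basic_model captures the trained parameters.
        self.basic_model.save(self.filepath.format(epoch=epoch))

# usage (illustrative):
# parallel_model.fit(..., callbacks=[BasicModelCheckpoint(basic_model, 'model.{epoch:02d}.h5')])

The second file below is an MNIST example that builds a small convnet and trains it with the data-parallel model.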
from __future__ import print_function

import keras
from keras.datasets import mnist
from keras.models import Model
from keras.layers import Dense, Dropout, Flatten, Conv2D, Input
import os
import tensorflow as tf

from data_parallel import DataParallelModel, DataParallelOptimizer
def load_data():
    # input image dimensions
    img_rows, img_cols = 28, 28
    num_classes = 10

    # the data, shuffled and split between train and test sets
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1).astype('float32') / 255
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1).astype('float32') / 255
    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')

    # convert class vectors to binary class matrices
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)
    return x_train, y_train, x_test, y_test
def make_basic_model(input_shape, num_classes):
    input = Input(shape=input_shape)
    x = Conv2D(16, 3, activation='selu')(input)
    x = Dropout(0.3)(x)
    x = Conv2D(32, 3, activation='selu')(x)
    x = Dropout(0.3)(x)
    x = Conv2D(64, 3, activation='selu')(x)
    x = Dropout(0.3)(x)
    x = Flatten()(x)
    output = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=input, outputs=output)
    model.summary()
    return model
def train_single_gpu(batch_size=256, epochs=10):
    x_train, y_train, x_test, y_test = load_data()
    input_shape = x_train.shape[1:]
    num_classes = y_train.shape[-1]

    model = make_basic_model(input_shape, num_classes)
    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer='adam',
                  metrics=['accuracy'])

    model.fit(x_train, y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=1,
              validation_data=(x_test, y_test))

    # score = model.evaluate(x_test, y_test, verbose=0)
    # print('Test loss:', score[0], 'accuracy:', score[1])
def train_multi_gpu(subbatch_size=256, epochs=10, gpus=2):
    x_train, y_train, x_test, y_test = load_data()
    input_shape = x_train.shape[1:]
    num_classes = y_train.shape[-1]

    # Number of GPUs - taken from CUDA_VISIBLE_DEVICES, falling back to the
    # `gpus` argument when the variable is not set.
    visible_devices = [dev for dev in os.environ.get('CUDA_VISIBLE_DEVICES', '').split(',')
                       if len(dev.strip()) > 0]
    gpu_count = len(visible_devices) or gpus
    # Each replica gets one sub-batch, so the whole batch is gpu_count times bigger.
    batch_size = subbatch_size * gpu_count
    print('CUDA_VISIBLE_DEVICES:', os.environ.get('CUDA_VISIBLE_DEVICES', ''), 'gpu_count:', gpu_count)

    basic_model = make_basic_model(input_shape, num_classes)
    parallel_model = DataParallelModel.create(basic_model, gpu_count)
    parallel_model.compile(loss=keras.losses.categorical_crossentropy,
                           optimizer='adam',
                           metrics=['accuracy'])

    parallel_model.fit(x_train, y_train,
                       batch_size=batch_size,
                       epochs=epochs,
                       verbose=1,
                       validation_data=(x_test, y_test))

    # score = parallel_model.evaluate(x_test, y_test, verbose=0)
    # print('Test loss:', score[0], 'accuracy:', score[1])
if __name__ == '__main__':
    train_multi_gpu()

## Basic model
# 333,066 params
# batch_size=128 -> 9s / epoch
# batch_size=256 -> 7.5s / epoch
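For reference, the batch slicing done by `get_slice` inside `DataParallelModel.create` can be checked in isolation. A small sketch (the constants are made up for illustration; TF 1.x session API assumed, matching the code above):

import numpy as np
import tensorflow as tf

# A batch of 8 samples with 3 features, to be split between 2 "GPUs".
data = tf.constant(np.arange(8 * 3).reshape(8, 3), dtype=tf.float32)
parts = 2
shape = tf.shape(data)
# Same arithmetic as in get_slice: each slice keeps all trailing dimensions
# and takes batch_size // parts consecutive samples along the batch axis.
size = tf.concat([shape[:1] // parts, shape[1:]], axis=0)
stride = tf.concat([shape[:1] // parts, shape[1:] * 0], axis=0)
slices = [tf.slice(data, stride * idx, size) for idx in range(parts)]

with tf.Session() as sess:
    sub_batches = sess.run(slices)  # rows 0-3 on replica 0, rows 4-7 on replica 1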