Last active
August 8, 2023 15:50
-
-
Save thomwolf/7e2407fbd5945f07821adae3d9fd1312 to your computer and use it in GitHub Desktop.
Data Parallelism in PyTorch for modules and losses
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ | |
## Created by: Hang Zhang, Rutgers University, Email: [email protected] | |
## Modified by Thomas Wolf, HuggingFace Inc., Email: [email protected] | |
## Copyright (c) 2017-2018 | |
## | |
## This source code is licensed under the MIT-style license found in the | |
## LICENSE file in the root directory of this source tree | |
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ | |
"""Encoding Data Parallel""" | |
import functools
import threading

import torch
import torch.cuda.comm as comm
from torch.autograd import Variable, Function
from torch.nn.parallel import DistributedDataParallel
from torch.nn.parallel._functions import ReduceAddCoalesced, Broadcast
from torch.nn.parallel.data_parallel import DataParallel
from torch.nn.parallel.parallel_apply import get_a_var
from torch.nn.parallel.scatter_gather import gather
# "major.minor" of the installed torch (e.g. "0.4", "1.10"). Splitting on
# dots instead of slicing three characters keeps two-digit minors intact.
torch_ver = ".".join(torch.__version__.split(".")[:2])
# Public names of this module.
__all__ = ['allreduce', 'DataParallelModel', 'DataParallelCriterion',
           'DistributedDataParallelModel', 'patch_replication_callback']
def allreduce(*inputs):
    """Cross-GPU all-reduce autograd operation.

    Used to calculate the mean and variance in SyncBN. All positional
    arguments are forwarded unchanged to ``AllReduce.apply``.

    Args:
        *inputs: The number of tensors held per device, followed by the
            tensors themselves laid out device after device (i.e. the
            ``(num_inputs, *inputs)`` arguments of ``AllReduce.forward``).

    Returns:
        The result of ``AllReduce.apply``: a flat tuple of reduced tensors.
    """
    return AllReduce.apply(*inputs)
class AllReduce(Function):
    """Autograd function that sums groups of tensors across GPUs.

    The flat tensor sequence is read as consecutive groups of
    ``num_inputs`` tensors, one group per device. The groups are summed
    element-wise and the sums are broadcast back to every device.
    """

    @staticmethod
    def forward(ctx, num_inputs, *inputs):
        """Sum matching tensors over all groups and broadcast the totals.

        Args:
            ctx: Autograd context; ``num_inputs`` and ``target_gpus`` are
                stored on it for the backward pass.
            num_inputs: Number of tensors in each per-device group.
            *inputs: Tensors laid out group after group.

        Returns:
            tuple: One copy of the summed tensors per entry of
            ``ctx.target_gpus``, flattened in that order.
        """
        ctx.num_inputs = num_inputs
        group_starts = range(0, len(inputs), num_inputs)
        # Device of the first tensor of each group, in the caller's order.
        ctx.target_gpus = [inputs[start].get_device() for start in group_starts]
        groups = [inputs[start:start + num_inputs] for start in group_starts]
        # Sort the groups by device before the reduce-sum.
        groups = sorted(groups, key=lambda group: group[0].get_device())
        summed = comm.reduce_add_coalesced(groups, ctx.target_gpus[0])
        copies = comm.broadcast_coalesced(summed, ctx.target_gpus)
        return tuple(tensor for per_device in copies for tensor in per_device)

    @staticmethod
    def backward(ctx, *inputs):
        """All-reduce the incoming gradients the same way as the forward pass.

        Args:
            ctx: Autograd context populated by ``forward``.
            *inputs: Gradients w.r.t. the forward outputs.

        Returns:
            tuple: ``None`` (gradient slot of the non-tensor ``num_inputs``
            argument) followed by the summed gradients, one copy per entry
            of ``ctx.target_gpus``.
        """
        grads = [grad.data for grad in inputs]
        # NOTE(review): unlike forward, the groups are not sorted by device
        # here -- confirm that this is intended.
        groups = [grads[start:start + ctx.num_inputs]
                  for start in range(0, len(grads), ctx.num_inputs)]
        summed = comm.reduce_add_coalesced(groups, ctx.target_gpus[0])
        copies = comm.broadcast_coalesced(summed, ctx.target_gpus)
        return (None,) + tuple(Variable(tensor)
                               for per_device in copies
                               for tensor in per_device)
class Reduce(Function):
    """Autograd function that sums one tensor per GPU into a single tensor."""

    @staticmethod
    def forward(ctx, *inputs):
        """Sum ``inputs`` with ``comm.reduce_add``.

        Args:
            ctx: Autograd context; the device of every input is stored on
                it as ``target_gpus`` (in the caller's order).
            *inputs: Tensors to be summed.

        Returns:
            The sum produced by ``comm.reduce_add`` (no destination is
            passed, so that function's default destination applies).
        """
        ctx.target_gpus = [tensor.get_device() for tensor in inputs]
        # Sort by device before the reduce-sum.
        ordered = sorted(inputs, key=lambda tensor: tensor.get_device())
        return comm.reduce_add(ordered)

    @staticmethod
    def backward(ctx, gradOutput):
        """Broadcast the output gradient to every device recorded in forward."""
        return Broadcast.apply(ctx.target_gpus, gradOutput)
class DistributedDataParallelModel(DistributedDataParallel):
    """``DistributedDataParallel`` variant that does not gather its outputs.

    The only change with respect to the base class is the ``gather`` hook,
    which hands the per-device outputs back untouched.

    Note that the outputs are not gathered, please use a compatible
    :class:`DataParallelCriterion`.

    The batch size should be larger than the number of GPUs used. It should
    also be an integer multiple of the number of GPUs so that each chunk is
    the same size (so that each GPU processes the same number of samples).

    Args:
        module: module to be parallelized
        device_ids: CUDA devices (default: all devices)

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang,
        Ambrish Tyagi, Amit Agrawal. "Context Encoding for Semantic
        Segmentation." *The IEEE Conference on Computer Vision and Pattern
        Recognition (CVPR) 2018*

    Example::

        >>> net = DistributedDataParallelModel(model, device_ids=[0, 1, 2])
        >>> y = net(x)
    """

    def gather(self, outputs, output_device):
        """Return ``outputs`` unchanged; ``output_device`` is ignored."""
        return outputs
class DataParallelModel(DataParallel):
    """Implements data parallelism at the module level.

    This container parallelizes the application of the given module by
    splitting the input across the specified devices by chunking in the
    batch dimension.
    In the forward pass, the module is replicated on each device,
    and each replica handles a portion of the input. During the backwards
    pass, gradients from each replica are summed into the original module.

    Note that the outputs are not gathered, please use a compatible
    :class:`DataParallelCriterion`.

    The batch size should be larger than the number of GPUs used. It should
    also be an integer multiple of the number of GPUs so that each chunk is
    the same size (so that each GPU processes the same number of samples).

    Args:
        module: module to be parallelized
        device_ids: CUDA devices (default: all devices)

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang,
        Ambrish Tyagi, Amit Agrawal. "Context Encoding for Semantic
        Segmentation." *The IEEE Conference on Computer Vision and Pattern
        Recognition (CVPR) 2018*

    Example::

        >>> net = DataParallelModel(model, device_ids=[0, 1, 2])
        >>> y = net(x)
    """

    def gather(self, outputs, output_device):
        """Return ``outputs`` unchanged; ``output_device`` is ignored."""
        return outputs

    def replicate(self, module, device_ids):
        """Replicate ``module`` and run the replication callbacks.

        Delegates to ``DataParallel.replicate`` and then passes the
        replicas to ``execute_replication_callbacks`` before returning them.
        """
        replicas = super(DataParallelModel, self).replicate(module, device_ids)
        execute_replication_callbacks(replicas)
        return replicas
class DataParallelCriterion(DataParallel):
    """Compute the loss on multiple GPUs, which balances the memory usage.

    The targets are split across the specified devices by chunking in
    the batch dimension. Please use together with
    :class:`DataParallelModel`.

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang,
        Ambrish Tyagi, Amit Agrawal. "Context Encoding for Semantic
        Segmentation." *The IEEE Conference on Computer Vision and Pattern
        Recognition (CVPR) 2018*

    Example::

        >>> net = DataParallelModel(model, device_ids=[0, 1, 2])
        >>> criterion = DataParallelCriterion(criterion, device_ids=[0, 1, 2])
        >>> y = net(x)
        >>> loss = criterion(y, target)
    """

    def forward(self, inputs, *targets, **kwargs):
        """Apply the wrapped criterion to already-scattered model outputs.

        Args:
            inputs: Model outputs. They are expected to be scattered
                already (one entry per device); only the targets are
                scattered here.
            *targets: Targets, scattered across ``self.device_ids``.
            **kwargs: Keyword arguments, scattered together with the targets.

        Returns:
            The criterion output. With several devices, the per-device
            outputs are passed through ``self.gather`` towards
            ``self.output_device``; they are not averaged here, so reduce
            the result (e.g. with ``.mean()``) in the caller if needed.
        """
        if not self.device_ids:
            return self.module(inputs, *targets, **kwargs)
        targets, kwargs = self.scatter(targets, kwargs, self.device_ids)
        if len(self.device_ids) == 1:
            return self.module(inputs, *targets[0], **kwargs[0])
        # Only as many replicas as there are scattered inputs.
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = _criterion_parallel_apply(replicas, inputs, targets, kwargs)
        return self.gather(outputs, self.output_device)
def _criterion_parallel_apply(modules, inputs, targets, kwargs_tup=None, devices=None):
    """Run each criterion replica on its own input/target pair.

    With more than one replica every call runs in its own thread; a single
    replica is evaluated in the calling thread.

    Args:
        modules: Criterion replicas, one per device.
        inputs: Per-replica model outputs. Each entry is either a single
            value or a list/tuple of positional arguments.
        targets: Per-replica targets, same conventions as ``inputs``.
        kwargs_tup: Optional per-replica keyword-argument dicts. When
            omitted or empty, every replica gets no keyword arguments.
        devices: Optional per-replica CUDA device ids. Where an entry is
            ``None``, the device is taken from a tensor found in the
            corresponding input via ``get_a_var``.

    Returns:
        list: The replica outputs, ordered like ``modules``.

    Raises:
        AssertionError: If the argument sequences differ in length.
        Exception: The exception captured from the lowest-index failing
            replica is re-raised in the calling thread.
    """
    assert len(modules) == len(inputs)
    assert len(targets) == len(inputs)
    if kwargs_tup:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)

    lock = threading.Lock()
    results = {}
    # torch 0.3 has no grad-mode API; detect the feature rather than parse
    # the version string.
    has_grad_mode = hasattr(torch, "is_grad_enabled")
    grad_enabled = torch.is_grad_enabled() if has_grad_mode else None

    def _worker(i, module, inp, tgt, kwargs, device=None):
        if has_grad_mode:
            # Make the worker thread use the caller's grad mode.
            torch.set_grad_enabled(grad_enabled)
        try:
            # Inside the ``try`` so that a failed device lookup is reported
            # as this replica's error instead of a missing result.
            if device is None:
                device = get_a_var(inp).get_device()
            with torch.cuda.device(device):
                # Wrapping also avoids accidental slicing of a bare Tensor.
                if not isinstance(inp, (list, tuple)):
                    inp = (inp,)
                if not isinstance(tgt, (list, tuple)):
                    tgt = (tgt,)
                # tuple() lets a list input be combined with a tuple target.
                output = module(*(tuple(inp) + tuple(tgt)), **kwargs)
            with lock:
                results[i] = output
        except Exception as e:
            with lock:
                results[i] = e

    jobs = list(enumerate(zip(modules, inputs, targets, kwargs_tup, devices)))
    if len(jobs) > 1:
        threads = [threading.Thread(target=_worker, args=(i,) + job)
                   for i, job in jobs]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        # Zero or one replica: run inline (the target is passed explicitly).
        for i, job in jobs:
            _worker(i, *job)

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, Exception):
            raise output
        outputs.append(output)
    return outputs
########################################################################### | |
# Adapted from Synchronized-BatchNorm-PyTorch. | |
# https://github.com/vacancy/Synchronized-BatchNorm-PyTorch | |
# | |
class CallbackContext(object):
    """Empty attribute holder shared by all copies of one sub-module."""


def execute_replication_callbacks(modules):
    """Invoke ``__data_parallel_replicate__`` on every replicated sub-module.

    The callback is invoked as ``__data_parallel_replicate__(ctx, copy_id)``.

    All copies have the same structure, so each sub-module position gets one
    context object that is shared among the copies of that sub-module on the
    different devices. Through this context, the copies can share
    information.

    The callbacks of the first copy are called before the callbacks of any
    other copy.

    Args:
        modules: The copies created by replication; each must provide a
            ``modules()`` iterator over its sub-modules. An empty sequence
            is a no-op.
    """
    if not modules:
        return
    first_copy = modules[0]
    # One shared context per sub-module position.
    ctxs = [CallbackContext() for _ in first_copy.modules()]
    for copy_id, module in enumerate(modules):
        for position, sub_module in enumerate(module.modules()):
            if hasattr(sub_module, '__data_parallel_replicate__'):
                sub_module.__data_parallel_replicate__(ctxs[position], copy_id)
def patch_replication_callback(data_parallel):
    """Monkey-patch an existing ``DataParallel`` object with the callback.

    The object's ``replicate`` attribute is replaced by a wrapper that calls
    the previous ``replicate`` and then passes the replicas to
    ``execute_replication_callbacks``. Useful when you have a customized
    ``DataParallel`` implementation.

    Args:
        data_parallel: The ``DataParallel`` instance to patch in place.

    Raises:
        AssertionError: If ``data_parallel`` is not a ``DataParallel``.

    Examples:
        > sync_bn = SynchronizedBatchNorm1d(10, eps=1e-5, affine=False)
        > sync_bn = DataParallel(sync_bn, device_ids=[0, 1])
        > patch_replication_callback(sync_bn)
        # this is equivalent to
        > sync_bn = SynchronizedBatchNorm1d(10, eps=1e-5, affine=False)
        > sync_bn = DataParallelModel(sync_bn, device_ids=[0, 1])
        # except that DataParallelModel additionally skips gathering outputs
    """
    assert isinstance(data_parallel, DataParallel)

    old_replicate = data_parallel.replicate

    @functools.wraps(old_replicate)
    def new_replicate(module, device_ids):
        replicas = old_replicate(module, device_ids)
        execute_replication_callbacks(replicas)
        return replicas

    data_parallel.replicate = new_replicate
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I was able to solve this by adding a `gather` method to the class `DataParallelCriterion(DataParallel)`.
For some reason, `return self.gather(outputs, self.output_device)` was moving all losses to `cuda:0`.