thomwolf/7e2407fbd5945f07821adae3d9fd1312
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
## Created by: Hang Zhang, Rutgers University, Email: [email protected]
## Modified by Thomas Wolf, HuggingFace Inc., Email: [email protected]
## Copyright (c) 2017-2018
##
## This source code is licensed under the MIT-style license found in the
## LICENSE file in the root directory of this source tree
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
"""Encoding Data Parallel"""
import threading
import functools
import torch
from torch.autograd import Variable, Function
import torch.cuda.comm as comm
from torch.nn.parallel import DistributedDataParallel  # needed by DistributedDataParallelModel below
from torch.nn.parallel.data_parallel import DataParallel
from torch.nn.parallel.parallel_apply import get_a_var
from torch.nn.parallel.scatter_gather import gather
from torch.nn.parallel._functions import ReduceAddCoalesced, Broadcast

torch_ver = torch.__version__[:3]

__all__ = ['allreduce', 'DataParallelModel', 'DataParallelCriterion',
           'patch_replication_callback']

def allreduce(*inputs):
    """Cross-GPU all-reduce autograd operation for calculating the mean and
    variance in SyncBN.
    """
    return AllReduce.apply(*inputs)

class AllReduce(Function):
    @staticmethod
    def forward(ctx, num_inputs, *inputs):
        ctx.num_inputs = num_inputs
        ctx.target_gpus = [inputs[i].get_device() for i in range(0, len(inputs), num_inputs)]
        inputs = [inputs[i:i + num_inputs]
                  for i in range(0, len(inputs), num_inputs)]
        # sort before reduce sum
        inputs = sorted(inputs, key=lambda i: i[0].get_device())
        results = comm.reduce_add_coalesced(inputs, ctx.target_gpus[0])
        outputs = comm.broadcast_coalesced(results, ctx.target_gpus)
        return tuple([t for tensors in outputs for t in tensors])

    @staticmethod
    def backward(ctx, *inputs):
        inputs = [i.data for i in inputs]
        inputs = [inputs[i:i + ctx.num_inputs]
                  for i in range(0, len(inputs), ctx.num_inputs)]
        results = comm.reduce_add_coalesced(inputs, ctx.target_gpus[0])
        outputs = comm.broadcast_coalesced(results, ctx.target_gpus)
        return (None,) + tuple([Variable(t) for tensors in outputs for t in tensors])

class Reduce(Function):
    @staticmethod
    def forward(ctx, *inputs):
        ctx.target_gpus = [inputs[i].get_device() for i in range(len(inputs))]
        inputs = sorted(inputs, key=lambda i: i.get_device())
        return comm.reduce_add(inputs)

    @staticmethod
    def backward(ctx, gradOutput):
        return Broadcast.apply(ctx.target_gpus, gradOutput)

class DistributedDataParallelModel(DistributedDataParallel):
    """Implements data parallelism at the module level for the DistributedDataParallel module.

    This container parallelizes the application of the given module by
    splitting the input across the specified devices by chunking in the
    batch dimension.
    In the forward pass, the module is replicated on each device,
    and each replica handles a portion of the input. During the backward pass,
    gradients from each replica are summed into the original module.
    Note that the outputs are not gathered; please use the compatible
    :class:`encoding.parallel.DataParallelCriterion`.

    The batch size should be larger than the number of GPUs used. It should
    also be an integer multiple of the number of GPUs so that each chunk is
    the same size (so that each GPU processes the same number of samples).

    Args:
        module: module to be parallelized
        device_ids: CUDA devices (default: all devices)

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang, Ambrish Tyagi,
        Amit Agrawal. "Context Encoding for Semantic Segmentation."
        *The IEEE Conference on Computer Vision and Pattern Recognition (CVPR) 2018*

    Example::

        >>> net = encoding.nn.DistributedDataParallelModel(model, device_ids=[0, 1, 2])
        >>> y = net(x)
    """
    def gather(self, outputs, output_device):
        return outputs

class DataParallelModel(DataParallel):
    """Implements data parallelism at the module level.

    This container parallelizes the application of the given module by
    splitting the input across the specified devices by chunking in the
    batch dimension.
    In the forward pass, the module is replicated on each device,
    and each replica handles a portion of the input. During the backward pass,
    gradients from each replica are summed into the original module.
    Note that the outputs are not gathered; please use the compatible
    :class:`encoding.parallel.DataParallelCriterion`.

    The batch size should be larger than the number of GPUs used. It should
    also be an integer multiple of the number of GPUs so that each chunk is
    the same size (so that each GPU processes the same number of samples).

    Args:
        module: module to be parallelized
        device_ids: CUDA devices (default: all devices)

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang, Ambrish Tyagi,
        Amit Agrawal. "Context Encoding for Semantic Segmentation."
        *The IEEE Conference on Computer Vision and Pattern Recognition (CVPR) 2018*

    Example::

        >>> net = encoding.nn.DataParallelModel(model, device_ids=[0, 1, 2])
        >>> y = net(x)
    """
    def gather(self, outputs, output_device):
        return outputs

    def replicate(self, module, device_ids):
        modules = super(DataParallelModel, self).replicate(module, device_ids)
        execute_replication_callbacks(modules)
        return modules

class DataParallelCriterion(DataParallel):
    """
    Calculates the loss on multiple GPUs, which balances the memory usage.

    The targets are split across the specified devices by chunking in
    the batch dimension. Please use together with :class:`encoding.parallel.DataParallelModel`.

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang, Ambrish Tyagi,
        Amit Agrawal. "Context Encoding for Semantic Segmentation."
        *The IEEE Conference on Computer Vision and Pattern Recognition (CVPR) 2018*

    Example::

        >>> net = encoding.nn.DataParallelModel(model, device_ids=[0, 1, 2])
        >>> criterion = encoding.nn.DataParallelCriterion(criterion, device_ids=[0, 1, 2])
        >>> y = net(x)
        >>> loss = criterion(y, target)
    """
    def forward(self, inputs, *targets, **kwargs):
        # the inputs should already be scattered (one chunk per GPU);
        # only the targets are scattered here
        if not self.device_ids:
            return self.module(inputs, *targets, **kwargs)
        targets, kwargs = self.scatter(targets, kwargs, self.device_ids)
        if len(self.device_ids) == 1:
            return self.module(inputs, *targets[0], **kwargs[0])
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = _criterion_parallel_apply(replicas, inputs, targets, kwargs)
        #return Reduce.apply(*outputs) / len(outputs)
        #return self.gather(outputs, self.output_device).mean()
        return self.gather(outputs, self.output_device)

def _criterion_parallel_apply(modules, inputs, targets, kwargs_tup=None, devices=None):
    assert len(modules) == len(inputs)
    assert len(targets) == len(inputs)
    if kwargs_tup:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)

    lock = threading.Lock()
    results = {}
    if torch_ver != "0.3":
        grad_enabled = torch.is_grad_enabled()

    def _worker(i, module, input, target, kwargs, device=None):
        if torch_ver != "0.3":
            torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                if not isinstance(target, (list, tuple)):
                    target = (target,)
                output = module(*(input + target), **kwargs)
            with lock:
                results[i] = output
        except Exception as e:
            with lock:
                results[i] = e

    if len(modules) > 1:
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, target,
                                          kwargs, device),)
                   for i, (module, input, target, kwargs, device) in
                   enumerate(zip(modules, inputs, targets, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        # single-module case: the target must be passed explicitly as well
        _worker(0, modules[0], inputs[0], targets[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, Exception):
            raise output
        outputs.append(output)
    return outputs

###########################################################################
# Adapted from Synchronized-BatchNorm-PyTorch.
# https://github.com/vacancy/Synchronized-BatchNorm-PyTorch
#
class CallbackContext(object):
    pass


def execute_replication_callbacks(modules):
    """
    Execute a replication callback `__data_parallel_replicate__` on each module created
    by the original replication.

    The callback will be invoked with arguments `__data_parallel_replicate__(ctx, copy_id)`.

    Note that, as all replicated modules are isomorphic, we assign each sub-module a context
    (shared among the copies of this module on different devices).
    Through this context, different copies can share some information.

    We guarantee that the callback on the master copy (the first copy) is called ahead
    of the callback on any of the other copies.
    """
    master_copy = modules[0]
    nr_modules = len(list(master_copy.modules()))
    ctxs = [CallbackContext() for _ in range(nr_modules)]

    for i, module in enumerate(modules):
        for j, m in enumerate(module.modules()):
            if hasattr(m, '__data_parallel_replicate__'):
                m.__data_parallel_replicate__(ctxs[j], i)

def patch_replication_callback(data_parallel):
    """
    Monkey-patch an existing `DataParallel` object to add the replication callback.
    Useful when you have a customized `DataParallel` implementation.

    Examples:
        > sync_bn = SynchronizedBatchNorm1d(10, eps=1e-5, affine=False)
        > sync_bn = DataParallel(sync_bn, device_ids=[0, 1])
        > patch_replication_callback(sync_bn)
        # this is equivalent to
        > sync_bn = SynchronizedBatchNorm1d(10, eps=1e-5, affine=False)
        > sync_bn = DataParallelWithCallback(sync_bn, device_ids=[0, 1])
    """
    assert isinstance(data_parallel, DataParallel)

    old_replicate = data_parallel.replicate

    @functools.wraps(old_replicate)
    def new_replicate(module, device_ids):
        modules = old_replicate(module, device_ids)
        execute_replication_callbacks(modules)
        return modules

    data_parallel.replicate = new_replicate
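
A minimal end-to-end usage sketch, assuming this gist is saved as parallel.py and at least two GPUs are visible; the toy linear model, SGD optimizer, and random batch below are placeholders for a real setup:

import torch
import torch.nn as nn
from parallel import DataParallelModel, DataParallelCriterion

model = nn.Linear(10, 5).cuda()                    # stand-in for any nn.Module
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

parallel_model = DataParallelModel(model, device_ids=[0, 1])
parallel_loss = DataParallelCriterion(criterion, device_ids=[0, 1])

inputs = torch.randn(8, 10).cuda()                 # the batch is split across the GPUs
labels = torch.randint(0, 5, (8,)).cuda()

predictions = parallel_model(inputs)               # list of output chunks, one per GPU
loss = parallel_loss(predictions, labels)          # per-GPU losses, gathered on device_ids[0]
if torch.is_tensor(loss) and loss.dim() > 0:       # one loss per GPU: reduce to a scalar first
    loss = loss.mean()
loss.backward()
optimizer.step()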
@sajidrahman were you able to fix your issue? I'm working with the huggingface code for BERT and getting the same error.
@Luonic, I think you are right. I wrapped the loss function in DataParallelCriterion and got the same error.
I'm running into this error:
File "train.py", line 159, in
    loss.backward()
File "/usr/hujh/anaconda3/lib/python3.7/site-packages/torch/tensor.py", line 118, in backward
    torch.autograd.backward(self, gradient, retain_graph, create_graph)
File "/usr/hujh/anaconda3/lib/python3.7/site-packages/torch/autograd/__init__.py", line 87, in backward
    grad_tensors = _make_grads(tensors, grad_tensors)
File "/usr/hujh/anaconda3/lib/python3.7/site-packages/torch/autograd/__init__.py", line 28, in _make_grads
    raise RuntimeError("grad can be implicitly created only for scalar outputs")
RuntimeError: grad can be implicitly created only for scalar outputs
I really don't know how to solve it. I think the issue is that loss.backward() does not distribute the loss scalar to each GPU.
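For what it's worth, the usual workaround is to reduce the per-GPU losses to a scalar before calling backward(); a sketch, where parallel_loss, predictions and labels are the same names as in the usage sketch above:

loss = parallel_loss(predictions, labels)  # with this gist as written, a vector with one loss per GPU
loss = loss.mean()                         # scalar, so backward() can create the gradient implicitly
loss.backward()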
@alexhuooxx were you able to fix your issue? I got the same error.
Hi, there is a user warning. I use pytorch v1.2.0.
torch/nn/parallel/_functions.py:61: UserWarning: Was asked to gather along dimension 0, but all input tensors were scalars; will instead unsqueeze and return a vector. warnings.warn('Was asked to gather along dimension 0, but all '
Also, if I use two GPUs, the loss is a list [loss_1, loss_2]. Before loss.backward(), I need to sum or average them.
@AlexHex7 I feel like if the loss list is reduced to a mean loss, this code will not balance the load on a multi-GPU machine 🤔
Not sure whether anyone has successfully run this code and solved the unbalanced memory usage problem.
When I run the code here following the guidance from this post, I get this error when I try to train:
line 345, in _conv_forward
return F.conv2d(input, weight, self.bias, self.stride,
RuntimeError: Expected tensor for argument #1 'input' to have the same device as tensor for argument #2 'weight'; but device 1 does not equal 0 (while checking arguments for cudnn_convolution)
This is how I'm using the code from here:
if torch.cuda.device_count() > 1:
    model = DataParallelModel(model, device_ids=[0, 1])
    loss_func = DataParallelCriterion(loss_func, device_ids=[0, 1])
@sajidrahman, were you able to fix your issue? I got the same error.
I get the same error, and I don't know how to solve it.
I was able to solve this by adding a gather function under the class DataParallelCriterion(DataParallel):

    def gather(self, outputs, output_device):
        return outputs

For some reason return self.gather(outputs, self.output_device) was moving all losses to cuda:0.
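In case it helps, that workaround can also be written as a subclass so the gist itself stays untouched; a sketch, where the class name is made up and criterion / predictions / labels are as in the usage sketch above. The per-GPU losses then have to be moved onto one device and reduced manually:

class UngatheredCriterion(DataParallelCriterion):
    def gather(self, outputs, output_device):
        return outputs                               # keep one loss per GPU instead of gathering on cuda:0

parallel_loss = UngatheredCriterion(criterion, device_ids=[0, 1])
losses = parallel_loss(predictions, labels)          # plain Python list of per-GPU losses
loss = sum(l.to(losses[0].device) for l in losses) / len(losses)
loss.backward()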
@sajidrahman I have not used this gist, but it looks strange to me that you are passing the model into the parallel criterion: parallel_loss = DataParallelCriterion(model, device_ids=[0,1]). I think a loss class inherited from nn.Module should go there instead.
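In other words, something along these lines (a sketch assuming a cross-entropy loss; the network goes into DataParallelModel, only the loss module goes into DataParallelCriterion):

model = DataParallelModel(model, device_ids=[0, 1])                    # wrap the network
criterion = torch.nn.CrossEntropyLoss()                                # any nn.Module loss
parallel_loss = DataParallelCriterion(criterion, device_ids=[0, 1])    # wrap the loss, not the model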