2D mesh allreduce algorithm for Horovod (based on NCCLHierarchicalAllreduce): an NCCL reduce to a local root rank on each node, a cross-node MPI allreduce through a host buffer, and an NCCL broadcast back to the local ranks, with optional pre/post scaling and fusion-buffer packing.
#if HAVE_MPI
Status
NCCL2DMesh::Execute(std::vector<TensorTableEntry>& entries,
                    const Response& response) {
  auto& first_entry = entries[0];

  // Determine GPU IDs of the devices participating in this communicator.
  std::vector<int32_t> nccl_device_map;
  nccl_device_map.reserve(
      global_state_->controller->GetLocalCommRanks().size());
  for (int rank : global_state_->controller->GetLocalCommRanks()) {
    nccl_device_map.push_back(response.devices()[rank]);
  }

  gpu_op_context_.InitGPU(entries);
  nccl_op_context_.InitNCCLComm(entries, nccl_device_map);
  gpu_op_context_.InitGPUQueue(entries, response);

  const void* fused_input_data;
  void* buffer_data;
  size_t buffer_len;

  // Copy memory into the fusion buffer.
  if (entries.size() > 1) {
    MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len);

    if (global_state_->timeline.Initialized()) {
      gpu_context_->RecordEvent(gpu_op_context_.event_queue,
                                MEMCPY_IN_FUSION_BUFFER,
                                *gpu_op_context_.stream);
    }
  } else {
    fused_input_data = first_entry.tensor->data();
    buffer_data = (void*) first_entry.output->data();
    buffer_len = (size_t) first_entry.output->size();
  }

  int64_t num_elements =
      buffer_len / DataType_Size(first_entry.tensor->dtype());

  if (response.prescale_factor() != 1.0) {
    // Execute prescaling op
    ScaleBuffer(response.prescale_factor(), entries, fused_input_data,
                buffer_data, num_elements);
    fused_input_data = buffer_data; // for unfused, scale is done out of place
  }
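  // Illustrative note (not in the original): a prescale_factor of
  // 1.0 / (number of ranks) combined with the ncclSum reduction below turns
  // the final result into an average instead of a plain sum.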
  // Do allreduce.
  int element_size = mpi_context_->GetMPITypeSize(first_entry.tensor->dtype());
  int local_size = global_state_->controller->GetLocalSize();
  int local_rank = global_state_->controller->GetLocalRank();

  // If the cluster is homogeneous and we are using the fusion buffer,
  // include dummy elements from the buffer (if necessary) to make sure the
  // data is divisible by local_size. This is always possible since we set
  // the fusion buffer size to be divisible by local_size.
  if (global_state_->controller->IsHomogeneous() && entries.size() > 1) {
    // Make sure the number of elements is divisible by
    // FUSION_BUFFER_ATOMIC_UNIT for improved performance.
    int div = local_size * FUSION_BUFFER_ATOMIC_UNIT;
    num_elements = ((num_elements + div - 1) / div) * div;
    buffer_len = num_elements * element_size;
  }
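  // Illustrative example (numbers are made up): with local_size = 4 and
  // FUSION_BUFFER_ATOMIC_UNIT = 64, div is 256, so num_elements = 1000 is
  // rounded up to 1024 and buffer_len becomes 1024 * element_size; the
  // padding stays inside the fusion buffer allocation per the comment above.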
  // In NCCLHierarchicalAllreduce the elements are split into two groups:
  // num_elements_per_rank * local_size and num_elements_remaining. Cross-node
  // reduction for the first group is done by all local ranks in parallel
  // (NCCL ReduceScatter - parallelized MPI Allreduce - NCCL Allgather), while
  // the second group is handled only by root_rank (NCCL Reduce at rank
  // local_size - 1, MPI Allreduce across the rank (local_size - 1)'s, and
  // NCCL Bcast). If the cluster is not homogeneous, the first group is empty
  // and root_rank is 0.
  //
  // This 2D mesh variant skips the split and sends the whole buffer down the
  // Reduce - Allreduce - Bcast path; the per-rank bookkeeping from the
  // original is kept below, commented out, for reference.
  // int64_t num_elements_per_rank = global_state_->controller->IsHomogeneous()
  //                                     ? num_elements / local_size
  //                                     : 0;
  // void* buffer_data_at_rank_offset =
  //     (uint8_t*) buffer_data + buffer_len_per_rank * local_rank;
  // size_t buffer_len_remaining = element_size * num_elements_remaining;

  int root_rank =
      global_state_->controller->IsHomogeneous() ? local_size - 1 : 0;
  bool is_root_rank = local_rank == root_rank;

  // int64_t total_num_elements =
  //     is_root_rank ? num_elements_per_rank + num_elements_remaining
  //                  : num_elements_per_rank;
  // int64_t total_buffer_len = is_root_rank
  //                                ? buffer_len_per_rank + buffer_len_remaining
  //                                : buffer_len_per_rank;
  auto& timeline = global_state_->timeline;

  auto nccl_result = ncclReduce(fused_input_data, buffer_data,
                                (size_t) num_elements,
                                GetNCCLDataType(first_entry.tensor), ncclSum,
                                root_rank, *nccl_op_context_.nccl_comm_,
                                *gpu_op_context_.stream);
  nccl_context_->ErrorCheck("ncclReduce", nccl_result,
                            *nccl_op_context_.nccl_comm_);
  if (global_state_->timeline.Initialized()) {
    gpu_context_->RecordEvent(gpu_op_context_.event_queue, NCCL_REDUCE,
                              *gpu_op_context_.stream);
  }
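  // ncclReduce leaves the fully reduced node-local sum only in root_rank's
  // buffer_data; the other local ranks get the final result from the
  // ncclBcast further below.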
  if (global_state_->controller->IsHomogeneous() || is_root_rank) {
    // cudaHostAlloc is significantly slower than malloc. Pre-allocating
    // a buffer is not safe since the tensor can be arbitrarily large.
    gpu_op_context_.host_buffer = malloc(buffer_len);

    // Synchronize.
    gpu_context_->WaitForEvents(gpu_op_context_.event_queue, entries, timeline,
                                nccl_op_context_.error_check_callback_);

    // According to https://docs.nvidia.com/cuda/cuda-runtime-api/
    // api-sync-behavior.html#api-sync-behavior__memcpy-async,
    // cudaMemcpyAsync is synchronous with respect to the host, so we
    // memcpy (effectively) synchronously to generate an accurate timeline.
    timeline.ActivityStartAll(entries, MEMCPY_IN_HOST_BUFFER);
    gpu_context_->MemcpyAsyncD2H(gpu_op_context_.host_buffer, buffer_data,
                                 buffer_len, *gpu_op_context_.stream);
    timeline.ActivityEndAll(entries);

    timeline.ActivityStartAll(entries, MPI_ALLREDUCE);
    int op = MPI_Allreduce(MPI_IN_PLACE, gpu_op_context_.host_buffer,
                           (int) num_elements,
                           mpi_context_->GetMPIDataType(first_entry.tensor),
                           mpi_context_->GetMPISumOp(first_entry.tensor->dtype()),
                           mpi_context_->GetMPICommunicator(Communicator::CROSS));
    if (op != MPI_SUCCESS) {
      throw std::runtime_error("MPI_Allreduce failed, see MPI output for details.");
    }
    timeline.ActivityEndAll(entries);

    timeline.ActivityStartAll(entries, MEMCPY_OUT_HOST_BUFFER);
    gpu_context_->MemcpyAsyncH2D(buffer_data, gpu_op_context_.host_buffer,
                                 buffer_len, *gpu_op_context_.stream);
    timeline.ActivityEndAll(entries);
  }
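  // In Horovod the CROSS communicator links the processes that share the same
  // local rank across nodes, so in the homogeneous case every local rank runs
  // a cross-node allreduce here; only root_rank's result feeds the broadcast
  // below.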
  nccl_context_->ErrorCheck("ncclBcast",
                            ncclBcast(buffer_data,
                                      (size_t) num_elements,
                                      GetNCCLDataType(first_entry.tensor),
                                      root_rank,
                                      *nccl_op_context_.nccl_comm_,
                                      *gpu_op_context_.stream),
                            *nccl_op_context_.nccl_comm_);
  if (global_state_->timeline.Initialized()) {
    gpu_context_->RecordEvent(gpu_op_context_.event_queue, NCCL_BCAST,
                              *gpu_op_context_.stream);
  }

  if (response.postscale_factor() != 1.0) {
    // Execute postscaling op
    ScaleBuffer(response.postscale_factor(), entries, buffer_data, buffer_data,
                num_elements);
  }

  // Copy memory out of the fusion buffer.
  if (entries.size() > 1) {
    MemcpyOutFusionBuffer(buffer_data, entries);

    if (global_state_->timeline.Initialized()) {
      gpu_context_->RecordEvent(gpu_op_context_.event_queue,
                                MEMCPY_OUT_FUSION_BUFFER,
                                *gpu_op_context_.stream);
    }
  }

  return gpu_op_context_.FinalizeGPUQueue(entries, true,
                                          nccl_op_context_.error_check_callback_);
}

bool NCCL2DMesh::Enabled(const ParameterManager& param_manager,
                         const std::vector<TensorTableEntry>& entries,
                         const Response& response) const {
  if (!NCCLAllreduce::Enabled(param_manager, entries, response)) {
    return false;
  }
  return param_manager.HierarchicalAllreduce();
}
#endif
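For orientation, here is a minimal standalone sketch of the same communication pattern using plain MPI only: reduce within each node to a local root, allreduce across nodes among the local roots, then broadcast back within the node. It is not part of the gist and leaves out NCCL, GPU streams, fusion buffers, and Horovod state; the file name and buffer size are made up, and it assumes every node runs the same number of ranks (the homogeneous case above).

// pattern_sketch.cc -- illustrative only, not from the gist.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);

  // Node-local communicator (all ranks on the same machine).
  MPI_Comm local_comm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0,
                      MPI_INFO_NULL, &local_comm);
  int local_rank, local_size;
  MPI_Comm_rank(local_comm, &local_rank);
  MPI_Comm_size(local_comm, &local_size);

  // Cross communicator: processes with the same local rank across nodes
  // (this mirrors Horovod's Communicator::CROSS).
  int world_rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  MPI_Comm cross_comm;
  MPI_Comm_split(MPI_COMM_WORLD, local_rank, world_rank, &cross_comm);

  std::vector<float> data(1024, 1.0f);  // stand-in for the fusion buffer
  int count = (int) data.size();
  int root_rank = local_size - 1;       // same local root as the gist

  // 1) Reduce within the node to the local root (ncclReduce in the gist).
  if (local_rank == root_rank) {
    MPI_Reduce(MPI_IN_PLACE, data.data(), count, MPI_FLOAT, MPI_SUM,
               root_rank, local_comm);
    // 2) Allreduce across nodes (the MPI_Allreduce over CROSS in the gist).
    // The gist also runs this step on the other local ranks in the
    // homogeneous case, but only the root's result is broadcast.
    MPI_Allreduce(MPI_IN_PLACE, data.data(), count, MPI_FLOAT, MPI_SUM,
                  cross_comm);
  } else {
    MPI_Reduce(data.data(), nullptr, count, MPI_FLOAT, MPI_SUM,
               root_rank, local_comm);
  }

  // 3) Broadcast the result back within the node (ncclBcast in the gist).
  MPI_Bcast(data.data(), count, MPI_FLOAT, root_rank, local_comm);

  if (world_rank == 0) {
    std::printf("data[0] = %f\n", data[0]);  // equals the world size
  }

  MPI_Comm_free(&cross_comm);
  MPI_Comm_free(&local_comm);
  MPI_Finalize();
  return 0;
}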