2d mesh algorithm in horovod (based on NCCLHierarchicalAllreduce)
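The op below runs allreduce in three phases: an intra-node ncclReduce of the full fusion buffer to a node-local root rank, an MPI_Allreduce of that buffer across nodes over Horovod's CROSS communicator, and an ncclBcast from the root back to the other local ranks. A minimal standalone sketch of the same pattern follows the code.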
#if HAVE_MPI
Status
NCCL2DMesh::Execute(std::vector<TensorTableEntry>& entries,
                    const Response& response) {
  auto& first_entry = entries[0];

  // Determine GPU IDs of the devices participating in this communicator.
  std::vector<int32_t> nccl_device_map;
  nccl_device_map.reserve(
      global_state_->controller->GetLocalCommRanks().size());
  for (int rank : global_state_->controller->GetLocalCommRanks()) {
    nccl_device_map.push_back(response.devices()[rank]);
  }

  gpu_op_context_.InitGPU(entries);
  nccl_op_context_.InitNCCLComm(entries, nccl_device_map);
  gpu_op_context_.InitGPUQueue(entries, response);
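
  // All of the GPU work below is enqueued on gpu_op_context_.stream; the
  // RecordEvent calls mark the corresponding timeline activities when the
  // timeline is enabled.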
  const void* fused_input_data;
  void* buffer_data;
  size_t buffer_len;

  // Copy memory into the fusion buffer.
  if (entries.size() > 1) {
    MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len);

    if (global_state_->timeline.Initialized()) {
      gpu_context_->RecordEvent(gpu_op_context_.event_queue,
                                MEMCPY_IN_FUSION_BUFFER,
                                *gpu_op_context_.stream);
    }
  } else {
    fused_input_data = first_entry.tensor->data();
    buffer_data = (void*) first_entry.output->data();
    buffer_len = (size_t) first_entry.output->size();
  }

  int64_t num_elements =
      buffer_len / DataType_Size(first_entry.tensor->dtype());

  if (response.prescale_factor() != 1.0) {
    // Execute prescaling op
    ScaleBuffer(response.prescale_factor(), entries, fused_input_data,
                buffer_data, num_elements);
    fused_input_data = buffer_data; // for unfused, scale is done out of place
  }
  // Do allreduce.
  int element_size = mpi_context_->GetMPITypeSize(first_entry.tensor->dtype());
  int local_size = global_state_->controller->GetLocalSize();
  int local_rank = global_state_->controller->GetLocalRank();

  // If the cluster is homogeneous and we are using the fusion buffer, include
  // dummy elements from the buffer (if necessary) to make sure the data
  // is divisible by local_size. This is always possible since we
  // set the fusion buffer size divisible by local_size.
  if (global_state_->controller->IsHomogeneous() && entries.size() > 1) {
    // Make sure the number of elements is divisible by
    // FUSION_BUFFER_ATOMIC_UNIT for improved performance.
    int div = local_size * FUSION_BUFFER_ATOMIC_UNIT;
    num_elements = ((num_elements + div - 1) / div) * div;
    buffer_len = num_elements * element_size;
  }

  // In the original hierarchical algorithm (NCCLHierarchicalAllreduce), the
  // elements are split into two groups: num_elements_per_rank*local_size and
  // num_elements_remaining. Cross-node reduction for the first group is done
  // by all local_ranks in parallel, while for the second group it is only
  // done by the root_rank. If the cluster is not homogeneous, the first group
  // is zero and root_rank is 0.
  // Homogeneous case:
  // For the part of data divisible by local_size, perform NCCL
  // ReduceScatter - Parallelized MPI Allreduce - NCCL Allgather. For the
  // non-divisible part (if any), do NCCL Reduce (at rank local_size-1),
  // MPI Allreduce (across rank (local_size-1)'s), and NCCL Bcast.
  // int64_t num_elements_per_rank = global_state_->controller->IsHomogeneous()
  //     ? num_elements / local_size
  //     : 0;
  // void* buffer_data_at_rank_offset =
  //     (uint8_t*)buffer_data + buffer_len_per_rank * local_rank;
  // size_t buffer_len_remaining = element_size * num_elements_remaining;
  int root_rank =
      global_state_->controller->IsHomogeneous() ? local_size - 1 : 0;
  bool is_root_rank = local_rank == root_rank;

  // int64_t total_num_elements =
  //     is_root_rank ? num_elements_per_rank + num_elements_remaining
  //                  : num_elements_per_rank;
  // int64_t total_buffer_len = is_root_rank
  //                                ? buffer_len_per_rank + buffer_len_remaining
  //                                : buffer_len_per_rank;
  auto& timeline = global_state_->timeline;
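
  // 2D mesh variant: instead of the ReduceScatter / parallel allreduce /
  // Allgather split described above, the full buffer is reduced to root_rank
  // with ncclReduce, allreduced across nodes, and broadcast back with
  // ncclBcast.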
  auto nccl_result = ncclReduce(fused_input_data, buffer_data,
                                (size_t) num_elements,
                                GetNCCLDataType(first_entry.tensor), ncclSum,
                                root_rank, *nccl_op_context_.nccl_comm_,
                                *gpu_op_context_.stream);
  nccl_context_->ErrorCheck("ncclReduce", nccl_result,
                            *nccl_op_context_.nccl_comm_);
  if (global_state_->timeline.Initialized()) {
    gpu_context_->RecordEvent(gpu_op_context_.event_queue, NCCL_REDUCE,
                              *gpu_op_context_.stream);
  }
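
  // Cross-node phase: stage buffer_data in a host buffer and MPI_Allreduce it
  // over the CROSS communicator (the processes that share this local_rank on
  // every node).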
  if (global_state_->controller->IsHomogeneous() || is_root_rank) {
    // cudaHostAlloc is significantly slower than malloc. Pre-allocating
    // a buffer is not safe since the tensor can be arbitrarily large.
    gpu_op_context_.host_buffer = malloc(buffer_len);

    // Synchronize.
    gpu_context_->WaitForEvents(gpu_op_context_.event_queue, entries, timeline,
                                nccl_op_context_.error_check_callback_);

    // According to https://docs.nvidia.com/cuda/cuda-runtime-api/
    // api-sync-behavior.html#api-sync-behavior__memcpy-async,
    // cudaMemcpyAsync is synchronous with respect to the host, so we
    // memcpy (effectively) synchronously to generate an accurate timeline
    timeline.ActivityStartAll(entries, MEMCPY_IN_HOST_BUFFER);
    gpu_context_->MemcpyAsyncD2H(gpu_op_context_.host_buffer, buffer_data,
                                 buffer_len, *gpu_op_context_.stream);
    timeline.ActivityEndAll(entries);

    timeline.ActivityStartAll(entries, MPI_ALLREDUCE);
    int op = MPI_Allreduce(MPI_IN_PLACE, gpu_op_context_.host_buffer,
                           (int) num_elements,
                           mpi_context_->GetMPIDataType(first_entry.tensor),
                           mpi_context_->GetMPISumOp(first_entry.tensor->dtype()),
                           mpi_context_->GetMPICommunicator(Communicator::CROSS));
    if (op != MPI_SUCCESS) {
      throw std::runtime_error("MPI_Allreduce failed, see MPI output for details.");
    }
    timeline.ActivityEndAll(entries);

    timeline.ActivityStartAll(entries, MEMCPY_OUT_HOST_BUFFER);
    gpu_context_->MemcpyAsyncH2D(buffer_data, gpu_op_context_.host_buffer,
                                 buffer_len, *gpu_op_context_.stream);
    timeline.ActivityEndAll(entries);
  }
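
  // Intra-node broadcast: root_rank distributes the cross-node result back to
  // all local ranks on the GPU.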
nccl_context_->ErrorCheck("ncclBcast",
ncclBcast(buffer_data,
(size_t) num_elements,
GetNCCLDataType(first_entry.tensor), root_rank,
*nccl_op_context_.nccl_comm_, *gpu_op_context_.stream),
*nccl_op_context_.nccl_comm_);
if (global_state_->timeline.Initialized()) {
gpu_context_->RecordEvent(gpu_op_context_.event_queue, NCCL_BCAST, *gpu_op_context_.stream);
}
if (response.postscale_factor() != 1.0) {
// Execute postscaling op
ScaleBuffer(response.postscale_factor(), entries, buffer_data, buffer_data, num_elements);
}
// Copy memory out of the fusion buffer.
if (entries.size() > 1) {
MemcpyOutFusionBuffer(buffer_data, entries);
if (global_state_->timeline.Initialized()) {
gpu_context_->RecordEvent(gpu_op_context_.event_queue, MEMCPY_OUT_FUSION_BUFFER, *gpu_op_context_.stream);
}
}
return gpu_op_context_.FinalizeGPUQueue(entries, true, nccl_op_context_.error_check_callback_);
}
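
// NCCL2DMesh reuses the NCCLAllreduce eligibility check and is selected only
// when hierarchical allreduce is enabled in the parameter manager.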
bool NCCL2DMesh::Enabled(const ParameterManager& param_manager,
                         const std::vector<TensorTableEntry>& entries,
                         const Response& response) const {
  if (!NCCLAllreduce::Enabled(param_manager, entries, response)) {
    return false;
  }
  return param_manager.HierarchicalAllreduce();
}
#endif
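
For reference, here is a minimal standalone sketch of the same three-phase pattern outside of Horovod: intra-node ncclReduce to a per-node root, cross-node MPI_Allreduce among the roots, and intra-node ncclBcast. It assumes one GPU per MPI process and equally sized (homogeneous) nodes, uses a float payload, lets only the node-local root take part in the cross-node step, and omits error checking; none of this is taken from the gist itself.

// three_phase_allreduce.cc -- illustrative sketch only, not Horovod code.
#include <cuda_runtime.h>
#include <mpi.h>
#include <nccl.h>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int world_rank, world_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);

  // Split COMM_WORLD into a per-node ("local") communicator and a "cross"
  // communicator that connects equal local ranks across nodes.
  MPI_Comm local_comm, cross_comm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, world_rank,
                      MPI_INFO_NULL, &local_comm);
  int local_rank, local_size;
  MPI_Comm_rank(local_comm, &local_rank);
  MPI_Comm_size(local_comm, &local_size);
  MPI_Comm_split(MPI_COMM_WORLD, local_rank, world_rank, &cross_comm);

  // One GPU per process; one NCCL communicator per node.
  cudaSetDevice(local_rank);
  ncclUniqueId id;
  if (local_rank == 0) ncclGetUniqueId(&id);
  MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, local_comm);
  ncclComm_t nccl_comm;
  ncclCommInitRank(&nccl_comm, local_size, id, local_rank);
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  const size_t count = 1 << 20;        // payload size, arbitrary
  const int root = local_size - 1;     // same root choice as the op above
  std::vector<float> h_buf(count, (float) world_rank);  // example payload
  float* d_buf;
  cudaMalloc(&d_buf, count * sizeof(float));
  cudaMemcpy(d_buf, h_buf.data(), count * sizeof(float),
             cudaMemcpyHostToDevice);

  // Phase 1: reduce every local rank's buffer onto the node-local root GPU.
  ncclReduce(d_buf, d_buf, count, ncclFloat, ncclSum, root, nccl_comm, stream);
  cudaStreamSynchronize(stream);

  // Phase 2: node roots stage the reduced buffer on the host and allreduce it
  // across nodes (over the communicator of same-local_rank processes).
  if (local_rank == root) {
    cudaMemcpy(h_buf.data(), d_buf, count * sizeof(float),
               cudaMemcpyDeviceToHost);
    MPI_Allreduce(MPI_IN_PLACE, h_buf.data(), (int) count, MPI_FLOAT, MPI_SUM,
                  cross_comm);
    cudaMemcpy(d_buf, h_buf.data(), count * sizeof(float),
               cudaMemcpyHostToDevice);
  }

  // Phase 3: broadcast the globally reduced buffer back to all local GPUs.
  // Every element on every GPU now holds the sum of world_rank over all
  // processes.
  ncclBcast(d_buf, count, ncclFloat, root, nccl_comm, stream);
  cudaStreamSynchronize(stream);

  cudaFree(d_buf);
  cudaStreamDestroy(stream);
  ncclCommDestroy(nccl_comm);
  MPI_Comm_free(&cross_comm);
  MPI_Comm_free(&local_comm);
  MPI_Finalize();
  return 0;
}

A typical build would compile this with an MPI C++ compiler against CUDA and NCCL (linking -lnccl -lcudart) and launch one process per GPU with mpirun.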