MPI Merge Sort in C++
#include <vector>
#include <algorithm>
#include <iostream>
#include <fstream>
#include <numeric>
#include <string>
#include <cstddef>
#include <cstdlib>
#include <ctime>
#include <omp.h>
#include <mpi.h>

#define WRITE_FILES 1
class DistributedMergeSort
{
public:
    DistributedMergeSort(
        std::vector<int> &local_array, int world_rank, const std::vector<size_t>& world_counts);
    void sort();

private:
    void merge_sort(int start_rank, size_t end_rank);
    void exchange_and_merge(int left_rank, int right_rank);
    template<bool front>
    void merge_algorithm(size_t mid_offset);

    std::vector<int>& local_array;
    int world_rank;
    std::vector<size_t> world_counts;
    std::vector<int> buffer;
};

template<>
void DistributedMergeSort::merge_algorithm<true>(size_t mid_offset);
template<>
void DistributedMergeSort::merge_algorithm<false>(size_t mid_offset);

/* utilities */
std::vector<size_t> compute_node_sizes(size_t size, int world_rank, int world_size);
void check(const std::vector<int> &local_array, int world_rank, int world_size,
           const std::vector<size_t>& world_counts, const std::string& name);
int main(int argc, char *argv[])
{
    if (argc == 1)
    {
        std::cerr << "Usage: " << argv[0] << " <array size>" << std::endl;
        return 0;
    }

    /********** Initialize MPI **********/
    int world_rank;
    int world_size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    /* Compute sizes for each MPI node */
    size_t global_size = std::atoi(argv[1]);
    std::vector<size_t> world_counts = compute_node_sizes(global_size, world_rank, world_size);
    size_t count = world_counts[world_rank];

    /* Fill subarray with random values */
    std::vector<int> local_array(count);
    srand(time(NULL) + world_rank);
    for (size_t c = 0; c < count; c++)
    {
        local_array[c] = rand() % (global_size * 2);
    }

#if WRITE_FILES
    /* check values (must be unsorted) */
    check(local_array, world_rank, world_size, world_counts, "/tmp/unsorted");
#endif

    MPI_Barrier(MPI_COMM_WORLD);
    const double start = omp_get_wtime();

    /* Apply the distributed merge sort */
    DistributedMergeSort merger(local_array, world_rank, world_counts);
    merger.sort();

    MPI_Barrier(MPI_COMM_WORLD);
    const double end = omp_get_wtime();
    if (world_rank == 0)
    {
        std::cout << "Time: " << (end - start) << std::endl;
    }

    /* check values (must be sorted) */
    check(local_array, world_rank, world_size, world_counts, "/tmp/sorted");

    MPI_Finalize();
}
std::vector<size_t> compute_node_sizes(size_t size, int world_rank, int world_size)
{
    std::vector<size_t> world_counts(world_size, size / world_size);
    for (int i = 0; i < (size % world_size); ++i)
    {
        world_counts[i] += 1;
    }
    return world_counts;
}
void check(const std::vector<int> &local_array, int world_rank, int world_size,
           const std::vector<size_t>& world_counts, const std::string& name)
{
    std::vector<int> buffer;
    int last = -1;
    bool ok = true;
    long long checksum = 0;
    for (int rank_id = 0; rank_id < world_size; rank_id++)
    {
        if (rank_id > 0)
        {
            if (world_rank > 0 && rank_id == world_rank)
            {
                MPI_Send(local_array.data(), local_array.size(), MPI_INT, 0, 0, MPI_COMM_WORLD);
            }
            else if (world_rank == 0)
            {
                buffer.assign(world_counts[rank_id], 0);
                MPI_Recv(buffer.data(), buffer.size(), MPI_INT, rank_id, 0, MPI_COMM_WORLD,
                         MPI_STATUS_IGNORE);
            }
        }
        else if (world_rank == 0)
        {
            buffer.assign(local_array.begin(), local_array.end());
        }
        std::for_each(buffer.begin(), buffer.end(), [&checksum](const int& v){ checksum += v; });
        if (world_rank == 0)
        {
            for (int v : buffer)
            {
                ok &= (v >= last);
                last = v;
            }
        }
    }
    if (world_rank == 0)
    {
        std::cout << (ok ? "array sorted" : "array unsorted") << " checksum: " << checksum
                  << std::endl;
    }
#if WRITE_FILES
    std::ofstream file(name + "_" + std::to_string(world_rank) + ".txt");
    std::for_each(local_array.begin(), local_array.end(), [&file](int v){ file << v << std::endl; });
    file.close();
#endif
    MPI_Barrier(MPI_COMM_WORLD);
}
DistributedMergeSort::DistributedMergeSort(
    std::vector<int> &local_array, int world_rank, const std::vector<size_t>& world_counts)
    : local_array(local_array), world_rank(world_rank), world_counts(world_counts)
{
    /* The merge buffer must hold this node's subarray plus the largest neighboring subarray */
    size_t max_near_size = std::max(
        world_rank > 0 ? world_counts[world_rank - 1] : 0,
        world_rank < world_counts.size() - 1 ? world_counts[world_rank + 1] : 0
    );
    this->buffer.resize(world_counts[world_rank] + max_near_size);
}
void DistributedMergeSort::sort()
{
    this->merge_sort(0, this->world_counts.size());
}
void DistributedMergeSort::merge_sort(int start_rank, size_t end_rank)
{
    if (end_rank - start_rank == 1)
    {
        std::sort(this->local_array.begin(), this->local_array.end());
        return;
    }
    const int mid_rank = (end_rank + start_rank) / 2;
    if (this->world_rank < mid_rank)
    {
        this->merge_sort(start_rank, mid_rank);
    }
    else
    {
        this->merge_sort(mid_rank, end_rank);
    }
    /* Bi-directional bubble sort: data floats from the middle of the section to its extremes */
    int left_end = start_rank;
    int right_end = end_rank - 1;
    while (left_end < mid_rank && right_end > mid_rank - 1)
    {
        if (this->world_rank < mid_rank) // right then left
        {
            for (int rank = mid_rank; rank > left_end; rank--)
            {
                if (this->world_rank == rank || this->world_rank == rank - 1)
                {
                    this->exchange_and_merge(rank - 1, rank);
                }
            }
        }
        else // left then right
        {
            for (int rank = mid_rank - 1; rank < right_end; rank++)
            {
                if (this->world_rank == rank || this->world_rank == rank + 1)
                {
                    this->exchange_and_merge(rank, rank + 1);
                }
            }
        }
        left_end++;
        right_end--;
    }
}
/* Left-side merge: keep the smallest elements (front of the merged sequence) */
template<>
void DistributedMergeSort::merge_algorithm<true>(size_t mid_offset)
{
    const auto& input = this->buffer;
    auto& output = this->local_array;
    size_t left_index = 0;
    size_t right_index = mid_offset;
    const size_t input_size = input.size();
    for (size_t i = 0; i < output.size(); i++)
    {
        const bool valid_left_index = left_index < mid_offset;
        const bool valid_right_index = right_index < input_size;
        if (valid_left_index && (!valid_right_index || input[left_index] < input[right_index]))
        {
            output[i] = input[left_index++];
        }
        else
        {
            output[i] = input[right_index++];
        }
    }
}
/* Right-side merge: keep the largest elements (back of the merged sequence) */
template<>
void DistributedMergeSort::merge_algorithm<false>(size_t mid_offset)
{
    const auto& input = this->buffer;
    auto& output = this->local_array;
    /* Signed indices: right_index must be able to reach -1 once the home half is exhausted
       (with size_t the check `right_index >= 0` would always be true). */
    std::ptrdiff_t left_index = static_cast<std::ptrdiff_t>(input.size()) - 1;
    std::ptrdiff_t right_index = static_cast<std::ptrdiff_t>(mid_offset) - 1;
    for (std::ptrdiff_t i = static_cast<std::ptrdiff_t>(output.size()) - 1; i >= 0; i--)
    {
        const bool valid_left_index = left_index >= static_cast<std::ptrdiff_t>(mid_offset);
        const bool valid_right_index = right_index >= 0;
        if (valid_left_index && (!valid_right_index || input[left_index] >= input[right_index]))
        {
            output[i] = input[left_index--];
        }
        else
        {
            output[i] = input[right_index--];
        }
    }
}
void DistributedMergeSort::exchange_and_merge(int left_rank, int right_rank)
{
    const bool on_left_side = this->world_rank == left_rank;
    const int near_rank = on_left_side ? right_rank : left_rank;
    const size_t home_array_size = this->world_counts[this->world_rank];
    const size_t near_array_size = this->world_counts[near_rank];
    if (home_array_size == 0 || near_array_size == 0)
    {
        return;
    }
    /* buffer = [home subarray | neighbor subarray] */
    this->buffer.resize(home_array_size + near_array_size);
    std::copy(this->local_array.begin(), this->local_array.end(), this->buffer.begin());
    MPI_Sendrecv(
        this->local_array.data(), home_array_size, MPI_INT, near_rank, 1,
        this->buffer.data() + home_array_size, near_array_size, MPI_INT, near_rank, 1,
        MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (on_left_side)
    {
        /* Already ordered across the pair: nothing to merge */
        if (this->buffer[home_array_size] >= this->local_array.back())
        {
            return;
        }
        merge_algorithm<true>(home_array_size);
    }
    else
    {
        if (this->local_array[0] >= this->buffer.back())
        {
            return;
        }
        merge_algorithm<false>(home_array_size);
    }
}

MPI Merge Sort in C++

Sorts an integer array that is filled with random numbers and distributed across MPI nodes.

Each node's subarray must fit in roughly a third of the memory available to that node: besides its own subarray, every node allocates a merge buffer large enough to hold its subarray plus the largest neighboring subarray.

The algorithm is very simple and not optimal:

  • Recursively subdivide the node range until a single node remains
  • Sort the subarray on this node
  • Recursively merge arrays from neighboring nodes
    • The merge phase works like a bubble sort, where data 'floats' from the center of the current node section to the extremes (see the trace sketch after this list)
    • Complexity in terms of communications is log(N) * 2 * (N/2)^2 (where N is the number of MPI nodes)
      • log(N): height of the recursion
      • 2 * (N/2)^2: bi-directional bubble sort applied to the two half-sections at each recursion level
    • Each node sends its full subarray to the neighboring node and receives the neighbor's full subarray. Each one applies the standard merge step from merge sort, then keeps only the relevant section of the merged array (the front on the left node, the back on the right node)
  • No explicit barrier synchronization is needed
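
The exchange pattern of the bubble phase can be illustrated with a small standalone C++ sketch (not part of the gist). It replays the two loops from DistributedMergeSort::merge_sort for a hypothetical run with 4 ranks over the full range [0, 4), printing which pair of ranks exchanges subarrays at each step:

#include <iostream>

int main()
{
    const int start_rank = 0, end_rank = 4;            // assume 4 MPI ranks
    const int mid_rank = (start_rank + end_rank) / 2;  // = 2
    int left_end = start_rank;
    int right_end = end_rank - 1;
    while (left_end < mid_rank && right_end > mid_rank - 1)
    {
        // left half: pairs float from the middle towards rank 0
        for (int rank = mid_rank; rank > left_end; rank--)
            std::cout << "left side:  exchange (" << rank - 1 << ", " << rank << ")\n";
        // right half: pairs float from the middle towards the last rank
        for (int rank = mid_rank - 1; rank < right_end; rank++)
            std::cout << "right side: exchange (" << rank << ", " << rank + 1 << ")\n";
        left_end++;
        right_end--;
    }
    return 0;
}

In the real program each rank only executes the exchanges that involve itself; the sketch just enumerates the pairings in the order the two halves visit them.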

Compile

Tested with g++ 10.2 and Open MPI 4.1.2. The -lgomp flag is only needed because the code calls omp_get_wtime for timing; no OpenMP pragmas are used.

mpic++ -std=c++17 mpi_merge_sort.cpp -lgomp -o mpi_merge_sort

Run

mpirun --use-hwthread-cpus -n <Number of Processes> mpi_merge_sort <Array Size>
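
For example, a hypothetical invocation sorting one million integers across 4 processes:

mpirun --use-hwthread-cpus -n 4 mpi_merge_sort 1000000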

Utility bash function to merge output files

function merge_files() { 
    name=$1
    max_id=$2
    rm -f /tmp/${name}.txt
    for i in `seq 0 ${max_id}`
    do 
        cat /tmp/${name}_${i}.txt >> /tmp/${name}.txt
    done
}

# use example:
# $ merge_files sorted <Number of Processes - 1>
# $ merge_files unsorted <Number of Processes - 1>