#include <vector>
#include <algorithm>
#include <iostream>
#include <fstream>
#include <numeric>
#include <string>
#include <cstddef>
#include <cstdlib>
#include <ctime>

#include <omp.h>
#include <mpi.h>

#define WRITE_FILES 1
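
/*
 * Distributed merge sort: each MPI rank owns one subarray, and the rank range
 * is split recursively. Once a single rank remains it sorts locally with
 * std::sort; the two sorted halves are then combined by repeated neighbour
 * exchange-and-merge sweeps (see merge_sort and exchange_and_merge below).
 */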

class DistributedMergeSort
{
public:
    DistributedMergeSort(
        std::vector<int> &local_array, int world_rank, const std::vector<size_t>& world_counts);

    void sort();

private:
    /* Recursively sort the rank range [start_rank, end_rank) */
    void merge_sort(int start_rank, int end_rank);
    /* Swap subarrays with a neighbour; the left rank keeps the smaller half,
       the right rank the larger half */
    void exchange_and_merge(int left_rank, int right_rank);

    /* front == true keeps the smallest elements, front == false the largest */
    template<bool front>
    void merge_algorithm(size_t mid_offset);

    std::vector<int>& local_array;
    int world_rank;
    std::vector<size_t> world_counts;
    std::vector<int> buffer;
};

template<>
void DistributedMergeSort::merge_algorithm<true>(size_t mid_offset);

template<>
void DistributedMergeSort::merge_algorithm<false>(size_t mid_offset);

/* utilities */
std::vector<size_t> compute_node_sizes(size_t size, int world_size);
void check(const std::vector<int> &local_array, int world_rank, int world_size,
    const std::vector<size_t>& world_counts, const std::string& name);

int main(int argc, char *argv[])
{
    if (argc == 1)
    {
        std::cerr << "usage: " << argv[0] << " <global_size>" << std::endl;
        return 1;
    }

    /********** Initialize MPI **********/
    int world_rank;
    int world_size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    /* Compute sizes for each MPI node */
    size_t global_size = std::atoi(argv[1]);
    std::vector<size_t> world_counts = compute_node_sizes(global_size, world_size);
    size_t count = world_counts[world_rank];

    /* Fill subarray with random non-negative values (check() relies on this) */
    std::vector<int> local_array(count);
    srand(time(NULL) + world_rank);
    for (size_t c = 0; c < count; c++)
    {
        local_array[c] = static_cast<int>(rand() % (global_size * 2));
    }

#if WRITE_FILES
    /* check values (must be unsorted) */
    check(local_array, world_rank, world_size, world_counts, "/tmp/unsorted");
#endif

    MPI_Barrier(MPI_COMM_WORLD);
    const double start = omp_get_wtime();

    /* Apply the distributed merge sort */
    DistributedMergeSort merger(local_array, world_rank, world_counts);
    merger.sort();

    MPI_Barrier(MPI_COMM_WORLD);
    const double end = omp_get_wtime();
    if (world_rank == 0)
    {
        std::cout << "Time: " << (end - start) << std::endl;
    }

    /* check values (must be sorted) */
    check(local_array, world_rank, world_size, world_counts, "/tmp/sorted");

    MPI_Finalize();
}
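
/*
 * Example build and run, assuming an MPI toolchain with OpenMP support
 * (mpicxx/mpirun are the usual wrapper names; adjust for your installation,
 * and the source file name is illustrative):
 *   mpicxx -O2 -fopenmp merge_sort.cpp -o merge_sort
 *   mpirun -np 4 ./merge_sort 1000000
 */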

std::vector<size_t> compute_node_sizes(size_t size, int world_size)
{
    /* Every rank gets size / world_size elements; the remainder is spread
       over the first size % world_size ranks */
    std::vector<size_t> world_counts(world_size, size / world_size);
    for (size_t i = 0; i < (size % world_size); ++i)
    {
        world_counts[i] += 1;
    }
    return world_counts;
}
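
/*
 * For example, size = 10 and world_size = 4 gives counts {3, 3, 2, 2}:
 * 10 / 4 = 2 per rank, and the remainder 10 % 4 = 2 goes to ranks 0 and 1.
 */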

void check(const std::vector<int> &local_array, int world_rank, int world_size,
    const std::vector<size_t>& world_counts, const std::string& name)
{
    /* Rank 0 gathers every subarray in rank order and verifies that the
       concatenation is globally sorted; the checksum detects lost or
       duplicated values. Values are non-negative, so last = -1 is a safe
       initial sentinel. */
    std::vector<int> buffer;
    int last = -1;
    bool ok = true;
    long long checksum = 0;
    for (int rank_id = 0; rank_id < world_size; rank_id++)
    {
        if (rank_id > 0)
        {
            if (world_rank > 0 && rank_id == world_rank)
            {
                MPI_Send(local_array.data(), static_cast<int>(local_array.size()), MPI_INT,
                    0, 0, MPI_COMM_WORLD);
            }
            else if (world_rank == 0)
            {
                buffer.assign(world_counts[rank_id], 0);
                MPI_Recv(buffer.data(), static_cast<int>(buffer.size()), MPI_INT,
                    rank_id, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            }
        }
        else if (world_rank == 0)
        {
            buffer.assign(local_array.begin(), local_array.end());
        }
        checksum = std::accumulate(buffer.begin(), buffer.end(), checksum);

        if (world_rank == 0)
        {
            for (int v : buffer)
            {
                ok &= (v >= last);
                last = v;
            }
        }
    }
    if (world_rank == 0)
    {
        std::cout << (ok ? "array sorted" : "array unsorted") << " checksum: " << checksum
            << std::endl;
    }
#if WRITE_FILES
    std::ofstream file(name + "_" + std::to_string(world_rank) + ".txt");
    std::for_each(local_array.begin(), local_array.end(), [&file](int v){ file << v << "\n"; });
    file.close();
#endif
    MPI_Barrier(MPI_COMM_WORLD);
}

DistributedMergeSort::DistributedMergeSort(
    std::vector<int> &local_array, int world_rank, const std::vector<size_t>& world_counts)
    : local_array(local_array), world_rank(world_rank), world_counts(world_counts)
{
    /* Preallocate a merge buffer that can hold this rank's subarray plus the
       larger of its two neighbours' subarrays */
    const size_t max_near_size = std::max(
        world_rank > 0 ? world_counts[world_rank - 1] : 0,
        world_rank + 1 < static_cast<int>(world_counts.size()) ? world_counts[world_rank + 1] : 0
    );
    this->buffer.resize(world_counts[world_rank] + max_near_size);
}
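
/*
 * With counts {3, 3, 2, 2}, rank 1 preallocates 3 + max(3, 2) = 6 ints:
 * enough to merge with either neighbour without reallocating.
 */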

void DistributedMergeSort::sort()
{
    this->merge_sort(0, static_cast<int>(this->world_counts.size()));
}

void DistributedMergeSort::merge_sort(int start_rank, int end_rank)
{
    if (end_rank - start_rank == 1)
    {
        /* Only this rank is left in the range: sort the local subarray */
        std::sort(this->local_array.begin(), this->local_array.end());
        return;
    }

    const int mid_rank = (end_rank + start_rank) / 2;

    if (this->world_rank < mid_rank)
    {
        this->merge_sort(start_rank, mid_rank);
    }
    else
    {
        this->merge_sort(mid_rank, end_rank);
    }

    /* Merge the two sorted halves with bidirectional bubble-sort sweeps:
       starting at the boundary, neighbouring ranks exchange and merge their
       subarrays, moving outwards, until both halves are globally ordered */
    int left_end = start_rank;
    int right_end = end_rank - 1;
    while (left_end < mid_rank && right_end > mid_rank - 1)
    {
        if (this->world_rank < mid_rank) // right then left
        {
            for (int rank = mid_rank; rank > left_end; rank--)
            {
                if (this->world_rank == rank || this->world_rank == rank - 1)
                {
                    this->exchange_and_merge(rank - 1, rank);
                }
            }
        }
        else // left then right
        {
            for (int rank = mid_rank - 1; rank < right_end; rank++)
            {
                if (this->world_rank == rank || this->world_rank == rank + 1)
                {
                    this->exchange_and_merge(rank, rank + 1);
                }
            }
        }
        left_end++;
        right_end--;
    }
}
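
/*
 * Sweep order for ranks {0, 1, 2, 3} (mid_rank = 2): the first pass performs
 * the boundary exchange (1, 2), then (0, 1) on the left half and (2, 3) on
 * the right half; the second pass repeats (1, 2), after which both halves
 * are ordered.
 */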

template<>
void DistributedMergeSort::merge_algorithm<true>(size_t mid_offset)
{
    /* Keep the smallest elements: standard forward merge of
       input[0, mid_offset) and input[mid_offset, input.size()) */
    const auto& input = this->buffer;
    auto& output = this->local_array;

    size_t left_index = 0;
    size_t right_index = mid_offset;
    const size_t input_size = input.size();
    for (size_t i = 0; i < output.size(); i++)
    {
        const bool valid_left_index = left_index < mid_offset;
        const bool valid_right_index = right_index < input_size;
        if (valid_left_index && (!valid_right_index || input[left_index] < input[right_index]))
        {
            output[i] = input[left_index++];
        }
        else
        {
            output[i] = input[right_index++];
        }
    }
}

template<>
void DistributedMergeSort::merge_algorithm<false>(size_t mid_offset)
{
    /* Keep the largest elements: merge backwards from the end of both halves.
       Signed indices are required so that -1 can mark an exhausted half; an
       unsigned index could never fail the right_index >= 0 test. */
    const auto& input = this->buffer;
    auto& output = this->local_array;

    std::ptrdiff_t left_index = static_cast<std::ptrdiff_t>(input.size()) - 1;
    std::ptrdiff_t right_index = static_cast<std::ptrdiff_t>(mid_offset) - 1;
    for (std::ptrdiff_t i = static_cast<std::ptrdiff_t>(output.size()) - 1; i >= 0; i--)
    {
        const bool valid_left_index = left_index >= static_cast<std::ptrdiff_t>(mid_offset);
        const bool valid_right_index = right_index >= 0;
        if (valid_left_index && (!valid_right_index || input[left_index] >= input[right_index]))
        {
            output[i] = input[left_index--];
        }
        else
        {
            output[i] = input[right_index--];
        }
    }
}
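
/*
 * For example, if the left rank holds {1, 4, 7} and the right rank holds
 * {2, 3, 9}, the exchange below leaves the left rank with {1, 2, 3} and the
 * right rank with {4, 7, 9}: each side computes only its own half of the
 * merged sequence {1, 2, 3, 4, 7, 9}.
 */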

void DistributedMergeSort::exchange_and_merge(int left_rank, int right_rank)
{
    const bool on_left_side = this->world_rank == left_rank;
    const int near_rank = on_left_side ? right_rank : left_rank;
    const size_t home_array_size = this->world_counts[this->world_rank];
    const size_t near_array_size = this->world_counts[near_rank];

    if (home_array_size == 0 || near_array_size == 0)
    {
        return;
    }

    /* Buffer layout after the exchange: [own data | neighbour's data] */
    this->buffer.resize(home_array_size + near_array_size);
    std::copy(this->local_array.begin(), this->local_array.end(), this->buffer.begin());

    MPI_Sendrecv(
        this->local_array.data(), static_cast<int>(home_array_size), MPI_INT, near_rank, 1,
        this->buffer.data() + home_array_size, static_cast<int>(near_array_size), MPI_INT,
        near_rank, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    if (on_left_side)
    {
        /* Already ordered if the neighbour's smallest element is not smaller
           than our largest: skip the merge */
        if (this->buffer[home_array_size] >= this->local_array.back())
        {
            return;
        }
        merge_algorithm<true>(home_array_size);
    }
    else
    {
        /* Already ordered if our smallest element is not smaller than the
           neighbour's largest: skip the merge */
        if (this->local_array[0] >= this->buffer.back())
        {
            return;
        }
        merge_algorithm<false>(home_array_size);
    }
}