@roastduck
Created July 12, 2017 07:28
Parallel odd-even sort implemented with MPI
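// Each rank reads one contiguous block of the input with MPI collective I/O,
// sorts it locally, and then repeatedly merge-splits its block with its left
// and right neighbours in alternating odd/even phases until a global
// all-reduce reports that every block boundary is already ordered.
// Input and output files are raw arrays of native-endian `float`.
//
// A typical build/run might look like the following (assuming an
// mpicxx/mpirun-style toolchain; the source file name here is arbitrary):
//   mpicxx -O2 -o main odd_even_sort.cpp
//   mpirun -np 4 ./main <number_count> <input_file> <output_file>
// Compiling with -DDEBUG enables the asserts and per-round traces below;
// -DFAKEINPUT replaces file I/O with locally generated random data.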
#ifndef DEBUG
#define NDEBUG
#endif

#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <algorithm>
#include <mpi.h>

#ifndef NDEBUG
#define assertSuccess(err) { if (err != MPI_SUCCESS) {\
    char errStr[100]; int strLen;\
    MPI_Error_string(err, errStr, &strLen);\
    printf("Err 0x%X in line %d : %s\n", int(err), __LINE__, errStr); abort();\
}}
#else
#define assertSuccess(err)
#endif

#define CHKERR(func) {int _errCode = (func); assertSuccess(_errCode);}
void serialTrivial(const int n, const char *inName, const char *outName)
{
    float *data = new float[n];
#ifndef FAKEINPUT
    FILE *in = fopen(inName, "rb");
    FILE *out = fopen(outName, "wb");
    fread(data, sizeof(float), n, in);
#else
    for (int i = 0; i < n; i++)
        data[i] = rand();
#endif
    std::sort(data, data + n);
#ifndef FAKEINPUT
    fwrite(data, sizeof(float), n, out);
    fclose(in);
    fclose(out);
#endif
    delete [] data;
}
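
// Worker: one MPI rank's share of the parallel sort. Each rank owns a block of
// roughly n / processNum elements (ranks below n % processNum get one extra),
// keeps it locally sorted, and repeatedly exchanges whole blocks with its left
// and right neighbours, keeping either the smaller or the larger half after
// each merge.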
class Worker
{
private:
    const int n, processNum, rank, blockLen, prevLen, nextLen, IOOffset;
    float *data, *swp, *bufL, *bufR;
    bool orderedL, orderedR;
    MPI_Request recvLReq, recvRReq, sendLReq, sendRReq;
    MPI_File inFile, outFile;

public:
    Worker(int _n, int _processNum, int _rank)
        : n(_n), processNum(_processNum), rank(_rank),
          blockLen(n / processNum + (rank < n % processNum)),
          prevLen(n / processNum + (rank - 1 < n % processNum)),
          nextLen(n / processNum + (rank + 1 < n % processNum)),
          IOOffset(n / processNum * rank + std::min(rank, n % processNum)),
          data(new float[blockLen]), swp(new float[blockLen]),
          bufL(rank ? new float[prevLen] : NULL), // Can use `bufL` or `bufR` to determine whether at left or right end
          bufR(rank < processNum - 1 ? new float[nextLen] : NULL),
          orderedL(!bufL), orderedR(!bufR)
    {
        recvLReq = recvRReq = sendLReq = sendRReq = MPI_REQUEST_NULL;
    }

    ~Worker()
    {
        delete[] data;
        delete[] swp;
        if (bufL) delete[] bufL;
        if (bufR) delete[] bufR;
    }
private:
    void sendL()
    {
        if (!bufL) return;
#ifndef NDEBUG
        printf("Process %d: call sendL\n", rank);
#endif
        assert(sendLReq == MPI_REQUEST_NULL /* Immediately after sendL */);
        CHKERR(MPI_Isend(data, blockLen, MPI_FLOAT, rank - 1, 0, MPI_COMM_WORLD, &sendLReq));
    }

    void sendR()
    {
        if (!bufR) return;
#ifndef NDEBUG
        printf("Process %d: call sendR\n", rank);
#endif
        assert(sendRReq == MPI_REQUEST_NULL /* Immediately after sendR */);
        CHKERR(MPI_Isend(data, blockLen, MPI_FLOAT, rank + 1, 0, MPI_COMM_WORLD, &sendRReq));
    }

    void recvL()
    {
        if (!bufL) return;
#ifndef NDEBUG
        printf("Process %d: call recvL\n", rank);
#endif
        assert(recvLReq == MPI_REQUEST_NULL /* Immediately after mergeL */);
        CHKERR(MPI_Irecv(bufL, prevLen, MPI_FLOAT, rank - 1, MPI_ANY_TAG, MPI_COMM_WORLD, &recvLReq));
    }

    void recvR()
    {
        if (!bufR) return;
#ifndef NDEBUG
        printf("Process %d: call recvR\n", rank);
#endif
        assert(recvRReq == MPI_REQUEST_NULL /* Immediately after mergeR */);
        CHKERR(MPI_Irecv(bufR, nextLen, MPI_FLOAT, rank + 1, MPI_ANY_TAG, MPI_COMM_WORLD, &recvRReq));
    }
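
    // mergeL merges the block received from the left neighbour with the local
    // block and keeps the LARGEST blockLen elements (merging from the back);
    // mergeR keeps the SMALLEST blockLen elements (merging from the front).
    // Together with the symmetric merge on the neighbour, this realises one
    // compare-exchange step of odd-even transposition sort at block granularity.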
    void mergeL()
    {
        if (bufL)
        {
#ifndef NDEBUG
            printf("Process %d: call mergeL\n", rank);
#endif
            CHKERR(MPI_Wait(&recvLReq, MPI_STATUS_IGNORE));
            CHKERR(MPI_Wait(&sendLReq, MPI_STATUS_IGNORE)); // Otherwise merging would overwrite a buffer that is still being sent
            orderedL = (bufL[prevLen - 1] <= data[0]);
            if (!orderedL)
            {
                for (int i = prevLen - 1, j = blockLen - 1, k = blockLen - 1; k >= 0; k--)
                    swp[k] = (j < 0 || (i >= 0 && bufL[i] > data[j])) ? bufL[i--] : data[j--];
                std::swap(data, swp);
            }
            recvL(); // prepare for next mergeL
        }
        sendR(); // send out result
    }

    void mergeR()
    {
        if (bufR)
        {
#ifndef NDEBUG
            printf("Process %d: call mergeR\n", rank);
#endif
            CHKERR(MPI_Wait(&recvRReq, MPI_STATUS_IGNORE));
            CHKERR(MPI_Wait(&sendRReq, MPI_STATUS_IGNORE)); // Otherwise merging would overwrite a buffer that is still being sent
            orderedR = (data[blockLen - 1] <= bufR[0]);
            if (!orderedR)
            {
                for (int i = 0, j = 0, k = 0; k < blockLen; k++)
                    swp[k] = (j == nextLen || (i < blockLen && data[i] < bufR[j])) ? data[i++] : bufR[j++];
                std::swap(data, swp);
            }
            recvR(); // prepare for next mergeR
        }
        sendL(); // send out result
    }
public:
    void input(const char *inName)
    {
#ifndef NDEBUG
        printf("Process %d handles [%d, %d)\n", rank, IOOffset, IOOffset + blockLen);
#endif
#ifndef FAKEINPUT
        CHKERR(MPI_File_open(MPI_COMM_WORLD, inName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inFile));
        CHKERR(MPI_File_read_at_all(inFile, IOOffset * sizeof(float), (void*)data, blockLen, MPI_FLOAT, MPI_STATUS_IGNORE));
        CHKERR(MPI_File_close(&inFile));
#else
        srand(rank ^ (rank << 1));
        for (int i = 0; i < blockLen; i++)
            data[i] = rand();
#endif
        std::sort(data, data + blockLen);
        if (rank & 1)
            sendL();
        else
            sendR();
        recvL();
        recvR();
    }
    void output(const char *outName)
    {
#ifndef FAKEINPUT
        CHKERR(MPI_File_open(MPI_COMM_WORLD, outName, MPI_MODE_WRONLY | MPI_MODE_CREATE, MPI_INFO_NULL, &outFile));
        CHKERR(MPI_File_write_at_all(outFile, IOOffset * sizeof(float), (void*)data, blockLen, MPI_FLOAT, MPI_STATUS_IGNORE));
        CHKERR(MPI_File_set_size(outFile, n * sizeof(float)));
        CHKERR(MPI_File_close(&outFile));
#else
        MPI_Barrier(MPI_COMM_WORLD); // Keep sends/receives available for processes that have not finished yet
#endif
#ifndef NDEBUG
        printf("Process %d: output ended\n", rank);
#endif
    }
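
    // sort() runs odd-even transposition rounds: odd ranks merge with the left
    // neighbour first while even ranks merge with the right, so both sides of a
    // boundary work on it in the same half of a round. Termination is detected
    // with a non-blocking MPI_Iallreduce of the per-rank "both boundaries
    // ordered" flag; the result is waited on at the top of the next round.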
    void sort()
    {
        int done(false), processDone(false);
        MPI_Request orderedReq = MPI_REQUEST_NULL;
        while (true)
        {
#ifndef NDEBUG
            static int round = 0;
            printf("Process %d: ====== Round %d ======\n", rank, round++);
#endif
            CHKERR(MPI_Wait(&orderedReq, MPI_STATUS_IGNORE));
            if (done) break;
            processDone = (orderedL && orderedR);
            CHKERR(MPI_Iallreduce(&processDone, &done, 1, MPI_INT, MPI_BAND, MPI_COMM_WORLD, &orderedReq));
            // Exactly one message flows in each direction between neighbours per round,
            // so MPI's in-order matching keeps the rounds aligned without needing tags.
            if (rank & 1)
            {
                mergeL();
                mergeR();
            } else
            {
                mergeR();
                mergeL();
            }
        }
#ifndef NDEBUG
        printf("Process %d done\n", rank);
#endif
    }
};
int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int processNum, rank;
    CHKERR(MPI_Comm_size(MPI_COMM_WORLD, &processNum));
    CHKERR(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
    if (argc != 4)
    {
        if (!rank)
            printf("Usage: ./main <number_count> <input_file> <output_file>\n");
        MPI_Finalize();
        return 1;
    }
    const int n = atoi(argv[1]);
    const char *inName = argv[2];
    const char *outName = argv[3];
    if (processNum == 1 || n < processNum)
    {
        if (!rank)
            serialTrivial(n, inName, outName);
        MPI_Finalize();
        return 0;
    }
    Worker *worker = new Worker(n, processNum, rank);
    worker->input(inName);
    worker->sort();
    worker->output(outName);
    delete worker;
#ifndef NDEBUG
    printf("Process %d: finalize\n", rank);
#endif
    MPI_Finalize();
    return 0;
}
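
For local testing without FAKEINPUT, <input_file> is expected to contain <number_count> raw native-endian floats. A minimal, hypothetical generator for such a file (the helper name, count, and value range are arbitrary and not part of the gist) might look like this:

// gen_input.cpp -- hypothetical helper: writes n random floats in raw binary
// form so the sorter can read them back with fread / MPI collective I/O.
#include <cstdio>
#include <cstdlib>
#include <vector>

int main(int argc, char **argv)
{
    if (argc != 3)
    {
        printf("Usage: ./gen_input <number_count> <output_file>\n");
        return 1;
    }
    const int n = atoi(argv[1]);
    std::vector<float> data(n);
    for (int i = 0; i < n; i++)
        data[i] = float(rand());                // same value distribution as the FAKEINPUT path
    FILE *out = fopen(argv[2], "wb");
    fwrite(data.data(), sizeof(float), n, out); // raw native-endian floats, as the sorter expects
    fclose(out);
    return 0;
}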