Created: July 12, 2017 07:28
Save roastduck/0ce9cb3178005f5fa5487741cc954188 to your computer and use it in GitHub Desktop.
Parallel odd-even transposition sort implemented with MPI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef DEBUG | |
#define NDEBUG | |
#endif | |
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <algorithm>
#include <vector>
#include <mpi.h>
#ifndef NDEBUG | |
#define assertSuccess(err) { if (err != MPI_SUCCESS) {\ | |
char errStr[100]; int strLen;\ | |
MPI_Error_string(err, errStr, &strLen);\ | |
printf("Err 0x%X in line %d : %s\n", int(err), __LINE__, errStr); abort();\ | |
}} | |
#else | |
#define assertSuccess(err) | |
#endif | |
#define CHKERR(func) {int _errCode = (func); assertSuccess(_errCode);} | |
void serialTrivial(const int n, const char *inName, const char *outName) | |
{ | |
float *data = new float[n]; | |
#ifndef FAKEINPUT | |
FILE *in = fopen(inName, "rb"); | |
FILE *out = fopen(outName, "wb"); | |
fread(data, sizeof(float), n, in); | |
#else | |
for (int i = 0; i < n; i++) | |
data[i] = rand(); | |
#endif | |
std::sort(data, data + n); | |
#ifndef FAKEINPUT | |
fwrite(data, sizeof(float), n, out); | |
fclose(in); | |
fclose(out); | |
#endif | |
delete [] data; | |
} | |
class Worker | |
{ | |
private: | |
const int n, processNum, rank, blockLen, prevLen, nextLen, IOOffset; | |
float *data, *swp, *bufL, *bufR; | |
bool orderedL, orderedR; | |
MPI_Request recvLReq, recvRReq, sendLReq, sendRReq; | |
MPI_File inFile, outFile; | |
public: | |
Worker(int _n, int _processNum, int _rank) | |
: n(_n), processNum(_processNum), rank(_rank), | |
blockLen(n / processNum + (rank < n % processNum)), | |
prevLen(n / processNum + (rank - 1 < n % processNum)), | |
nextLen(n / processNum + (rank + 1 < n % processNum)), | |
IOOffset(n / processNum * rank + std::min(rank, n % processNum)), | |
data(new float[blockLen]), swp(new float[blockLen]), | |
bufL(rank ? new float[prevLen] : NULL), // Can use `bufL` or `bufR` to determine whether at left or right end | |
bufR(rank < processNum - 1 ? new float[nextLen] : NULL), | |
orderedL(!bufL), orderedR(!bufR) | |
{ | |
recvLReq = recvRReq = sendLReq = sendRReq = MPI_REQUEST_NULL; | |
} | |
~Worker() | |
{ | |
delete[] data; | |
delete[] swp; | |
if (bufL) delete[] bufL; | |
if (bufR) delete[] bufR; | |
} | |
private: | |
void sendL() | |
{ | |
if (!bufL) return; | |
#ifndef NDEBUG | |
printf("Process %d: call sendL\n", rank); | |
#endif | |
assert(sendLReq == MPI_REQUEST_NULL /* Immediately after sendL */); | |
CHKERR(MPI_Isend(data, blockLen, MPI_FLOAT, rank - 1, 0, MPI_COMM_WORLD, &sendLReq)); | |
} | |
void sendR() | |
{ | |
if (!bufR) return; | |
#ifndef NDEBUG | |
printf("Process %d: call sendR\n", rank); | |
#endif | |
assert(sendRReq == MPI_REQUEST_NULL /* Immediately after sendR */); | |
CHKERR(MPI_Isend(data, blockLen, MPI_FLOAT, rank + 1, 0, MPI_COMM_WORLD, &sendRReq)); | |
} | |
void recvL() | |
{ | |
if (!bufL) return; | |
#ifndef NDEBUG | |
printf("Process %d: call recvL\n", rank); | |
#endif | |
assert(recvLReq == MPI_REQUEST_NULL /* Immediately after mergeL */); | |
CHKERR(MPI_Irecv(bufL, prevLen, MPI_FLOAT, rank - 1, MPI_ANY_TAG, MPI_COMM_WORLD, &recvLReq)); | |
} | |
void recvR() | |
{ | |
if (!bufR) return; | |
#ifndef NDEBUG | |
printf("Process %d: call recvR\n", rank); | |
#endif | |
assert(recvRReq == MPI_REQUEST_NULL /* Immediately after mergeR */); | |
CHKERR(MPI_Irecv(bufR, nextLen, MPI_FLOAT, rank + 1, MPI_ANY_TAG, MPI_COMM_WORLD, &recvRReq)); | |
} | |
void mergeL() | |
{ | |
if (bufL) | |
{ | |
#ifndef NDEBUG | |
printf("Process %d: call mergeL\n", rank); | |
#endif | |
CHKERR(MPI_Wait(&recvLReq, MPI_STATUS_IGNORE)); | |
CHKERR(MPI_Wait(&sendLReq, MPI_STATUS_IGNORE)); // Or it will pollute the sending buffer | |
orderedL = (bufL[prevLen - 1] <= data[0]); | |
if (!orderedL) | |
{ | |
for (int i = prevLen - 1, j = blockLen - 1, k = blockLen - 1; k >= 0; k--) | |
swp[k] = (j < 0 || (i >= 0 && bufL[i] > data[j])) ? bufL[i--] : data[j--]; | |
std::swap(data, swp); | |
} | |
recvL(); // prepare for next mergeL | |
} | |
sendR(); // send out result | |
} | |
void mergeR() | |
{ | |
if (bufR) | |
{ | |
#ifndef NDEBUG | |
printf("Process %d: call mergeR\n", rank); | |
#endif | |
CHKERR(MPI_Wait(&recvRReq, MPI_STATUS_IGNORE)); | |
CHKERR(MPI_Wait(&sendRReq, MPI_STATUS_IGNORE)); // Or it will pollute the sending buffer | |
orderedR = (data[blockLen - 1] <= bufR[0]); | |
if (!orderedR) | |
{ | |
for (int i = 0, j = 0, k = 0; k < blockLen; k++) | |
swp[k] = (j == nextLen || (i < blockLen && data[i] < bufR[j])) ? data[i++] : bufR[j++]; | |
std::swap(data, swp); | |
} | |
recvR(); // prepare for next mergeR | |
} | |
sendL(); // send out result | |
} | |
public: | |
void input(const char *inName) | |
{ | |
#ifndef NDEBUG | |
printf("Process %d handles [%d, %d)\n", rank, IOOffset, IOOffset + blockLen); | |
#endif | |
#ifndef FAKEINPUT | |
CHKERR(MPI_File_open(MPI_COMM_WORLD, inName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inFile)); | |
CHKERR(MPI_File_read_at_all(inFile, IOOffset * sizeof(float), (void*)data, blockLen, MPI_FLOAT, MPI_STATUS_IGNORE)); | |
CHKERR(MPI_File_close(&inFile)); | |
#else | |
srand(rank ^ (rank << 1)); | |
for (int i = 0; i < blockLen; i++) | |
data[i] = rand(); | |
#endif | |
std::sort(data, data + blockLen); | |
if (rank & 1) | |
sendL(); | |
else | |
sendR(); | |
recvL(); | |
recvR(); | |
} | |
void output(const char *outName) | |
{ | |
#ifndef FAKEINPUT | |
CHKERR(MPI_File_open(MPI_COMM_WORLD, outName, MPI_MODE_WRONLY | MPI_MODE_CREATE, MPI_INFO_NULL, &outFile)); | |
CHKERR(MPI_File_write_at_all(outFile, IOOffset * sizeof(float), (void*)data, blockLen, MPI_FLOAT, MPI_STATUS_IGNORE)); | |
CHKERR(MPI_File_set_size(outFile, n * sizeof(float))); | |
CHKERR(MPI_File_close(&outFile)); | |
#else | |
MPI_Barrier(MPI_COMM_WORLD); // Make sure sending/receiving avalible for un-done processes | |
#endif | |
#ifndef NDEBUG | |
printf("Process %d: output ended\n", rank); | |
#endif | |
} | |
void sort() | |
{ | |
int done(false), processDone(false); | |
MPI_Request orderedReq = MPI_REQUEST_NULL; | |
while (true) | |
{ | |
#ifndef NDEBUG | |
static int round = 0; | |
printf("Process %d: ====== Round %d ======\n", rank, round++); | |
#endif | |
CHKERR(MPI_Wait(&orderedReq, MPI_STATUS_IGNORE)); | |
if (done) break; | |
processDone = (orderedL && orderedR); | |
CHKERR(MPI_Iallreduce(&processDone, &done, 1, MPI_INT, MPI_BAND, MPI_COMM_WORLD, &orderedReq)); | |
// The same message is sent in the same round. | |
// Each messages pass for a round. | |
if (rank & 1) | |
{ | |
mergeL(); | |
mergeR(); | |
} else | |
{ | |
mergeR(); | |
mergeL(); | |
} | |
} | |
#ifndef NDEBUG | |
printf("Process %d done\n", rank); | |
#endif | |
} | |
}; | |
int main(int argc, char **argv) | |
{ | |
MPI_Init(&argc, &argv); | |
int processNum, rank; | |
CHKERR(MPI_Comm_size(MPI_COMM_WORLD, &processNum)); | |
CHKERR(MPI_Comm_rank(MPI_COMM_WORLD, &rank)); | |
if (argc != 4) | |
{ | |
if (!rank) | |
printf("Usage: ./main <number_count> <input_file> <output_file>\n"); | |
MPI_Finalize(); | |
return 1; | |
} | |
const int n = atoi(argv[1]); | |
const char *inName = argv[2]; | |
const char *outName = argv[3]; | |
if (processNum == 1 || n < processNum) | |
{ | |
if (!rank) | |
serialTrivial(n, inName, outName); | |
MPI_Finalize(); | |
return 0; | |
} | |
Worker *worker = new Worker(n, processNum, rank); | |
worker->input(inName); | |
worker->sort(); | |
worker->output(outName); | |
delete worker; | |
#ifndef NDEBUG | |
printf("Process %d: finalize\n", rank); | |
#endif | |
MPI_Finalize(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment