#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdarg.h>
#include <arpa/inet.h>
#include "mpi.h"
#include "fgmpi.h"

/* Conf */
#define INFO_LOG_LVL 1
#define CLOCK_ID 0
#define NUM_STATES 4

/* Messages */
#define TICK 19
#define ELECTION_ID 20
#define AYA_ID 21
#define IAA_ID 22
#define COORD_ID 23
#define ANSWER_ID 24
#define IAM_DEAD_JIM 998
#define TERMINATE 999
#define UNKNOWN -1

/* Events */
#define ELECTION 1000
#define DEAD 1001
#define ALIVE 1002
#define LEADER 1003
#define LEADERDEAD 1004

/* Macros */
#define max( a, b ) ( ( (a) > (b) ) ? (a) : (b) )

/* From the file mpi.h, this is the definition of MPI_Status, with my comments on
   the fields:

   typedef struct MPI_Status {
       int count;      // Appears to be the count, in bytes, of the data received
       int cancelled;  // Indicates whether or not the operation was cancelled;
                       // this probably only applies if you are doing non-blocking
                       // operations
       int MPI_SOURCE; // ID of the source
       int MPI_TAG;    // The actual message tag
       int MPI_ERROR;  // The error code, if there was one
   } MPI_Status;
*/
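/*
 * Of the MPI_Status fields above, this program only reads MPI_SOURCE and
 * MPI_TAG; see the receive loop in bully() below. All other receives pass
 * MPI_STATUS_IGNORE.
 */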
enum node_states {
    ST_SLAVE,
    ST_CAND,
    ST_WAIT,
    ST_COORD
};

struct node_state; // fwd decl

// State interface implemented by all states
struct node_handlers {
    void (*on_enter)(struct node_state *n);
    void (*on_elect)(struct node_state *n, int);
    void (*on_coord)(struct node_state *n, int);
    void (*on_answer)(struct node_state *n, int);
    void (*on_aya)(struct node_state *n, int);
    void (*on_iaa)(struct node_state *n, int);
    void (*on_kill)(struct node_state *n);
    void (*on_tick)(struct node_state *n);
    void (*on_exit)(struct node_state *n);
    void (*dead_tick)(struct node_state *n);
    int (*get_timeout)(struct node_state *n);
};

struct node_state {
    struct node_handlers handlers[NUM_STATES];
    struct node_handlers *st;
    int id;
    long dlc;
    int num_nodes;
    int co_size;
    int s_rank;
    int log_level;
    int timeout_val;
    int aya_time;
    int aya_time_rem;
    int send_failure_rate;
    int return_to_life;
    int dead_time_remaining;
    int coord_id;
    int tick;
    int state_time_remaining;
};

int bully( int argc, char** argv );

/*
 * Binds bully function for MPI
 */
FG_ProcessPtr_t binding_func(int argc __attribute__ ((unused)),
                             char** argv __attribute__ ((unused)),
                             int rank __attribute__ ((unused))){
    return (&bully);
}

FG_MapPtr_t map_lookup(int argc __attribute__ ((unused)),
                       char** argv __attribute__ ((unused)),
                       char* str __attribute__ ((unused))){
    return (&binding_func);
}

int main( int argc, char *argv[]) {
    FGmpiexec(&argc, &argv, &map_lookup);
    return (0);
}
void usage() {
    printf("Usage: mpirun -nfg X -n Y ./bully [log level] [timeout] [aya time] [send failure rate] [return to life]\n");
    exit(1);
}
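/*
 * Example invocation (a sketch; the exact FG-MPI mpirun flags depend on your
 * installation). Rank 0 is always the clock process, and the remaining ranks
 * run the bully state machine. The positional arguments are, in order:
 * log level, timeout (ticks), AYA interval (ticks), send failure rate (%),
 * and return-to-life delay (ticks). For instance:
 *
 *   mpirun -nfg 4 -n 2 ./bully 1 5 2 10 20
 *
 * would give 8 ranks (assuming -nfg sets the number of fine-grain processes
 * per OS process), with the coordinator failing to answer an AYA 10% of the
 * time and a dead node reviving after 20 ticks. The numeric values here are
 * only illustrative.
 */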
void get_evt(int evt, char str[]) {
    switch(evt) {
        case ELECTION:
            strcpy(str, "ELECTION");
            break;
        case DEAD:
            strcpy(str, "DEAD");
            break;
        case ALIVE:
            strcpy(str, "ALIVE");
            break;
        case LEADER:
            strcpy(str, "LEADER");
            break;
        case LEADERDEAD:
            strcpy(str, "LEADERDEAD");
            break;
        default:
            strcpy(str, "PRIV_CUSTOM");
    }
}

/*
 * Actually does the logging to the screen
 */
void log_evt_internal(struct node_state *n, int evt, const char *fmt, va_list args) {
    char evt_str[16];
    get_evt(evt, evt_str);
    printf("DLC: %ld, Event: %s, Node %d", n->dlc, evt_str, n->id);
    printf(", ");
    vprintf(fmt, args);
    printf("\n");
}

/*
 * Writes to the log for must-print events
 */
void log_evt(struct node_state *n, int evt, const char *fmt, ...) {
    va_list args;
    va_start(args, fmt);
    log_evt_internal(n, evt, fmt, args);
    va_end(args);
}

/*
 * Writes to the log for non-critical events (log level 1 or higher)
 */
void info(struct node_state *n, int evt, const char *fmt, ...) {
    if(n->log_level >= INFO_LOG_LVL) {
        va_list args;
        va_start(args, fmt);
        log_evt_internal(n, evt, fmt, args);
        va_end(args);
    }
}
/*
 * Helper to ensure exit/enter handlers are invoked
 */
void change_state(struct node_state *n, enum node_states state) {
    n->st->on_exit(n);
    n->st = &(n->handlers[state]);
    n->st->on_enter(n);
}

/*
 * Targeted message for 1 recipient
 */
void send_to(struct node_state *n, int msg, int dst) {
    int flag = 0;
    n->dlc++;
    info(n, msg, "sending to node: %d", dst);
    MPI_Request req = MPI_REQUEST_NULL;
    MPI_Isend(&n->dlc, 1, MPI_LONG, dst, msg, MPI_COMM_WORLD, &req);
    MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
}

/*
 * Send to all nodes higher ranked than the current node.
 */
void send_up(struct node_state *n, int msg) {
    int flag = 0;
    n->dlc++;
    info(n, msg, "sending up");
    int i, j;
    MPI_Request reqs[n->num_nodes - n->id];
    for(j = 0, i = n->num_nodes - 1; i > n->id; i--, j++) {
        MPI_Isend(&n->dlc, 1, MPI_LONG, i, msg, MPI_COMM_WORLD, &(reqs[j]));
    }
    // Dealloc's requests
    MPI_Testall(j, reqs, &flag, MPI_STATUSES_IGNORE);
}

/*
 * Send to all nodes lower ranked than the current node
 */
void send_down(struct node_state *n, int msg) {
    int out = 0;
    n->dlc++;
    info(n, msg, "sending down");
    int i, j;
    MPI_Request reqs[n->id - 1];
    for(i = 1, j = 0; i < n->id; i++, j++) {
        MPI_Isend(&n->dlc, 1, MPI_LONG, i, msg, MPI_COMM_WORLD, &(reqs[j]));
    }
    MPI_Testall(j, reqs, &out, MPI_STATUSES_IGNORE);
}
void send_election(struct node_state *n) {
    log_evt(n, ELECTION, "Starting Election");
    send_up(n, ELECTION_ID);
}

void send_coord(struct node_state *n) {
    log_evt(n, LEADER, "I am the new coordinator.");
    send_down(n, COORD_ID);
}

void send_answer(struct node_state *n) {
    info(n, -1, "answer");
    send_down(n, ANSWER_ID);
}

void send_aya(struct node_state *n) {
    if(n->coord_id == UNKNOWN)
        return;
    send_to(n, AYA_ID, n->coord_id);
}

void send_iaa(struct node_state *n, int target) {
    info(n, -1, "send iaa ");
    send_to(n, IAA_ID, target);
}

void generic_on_kill(struct node_state *n) {
    info(n, DEAD, "Regular Node Dying...");
    n->dead_time_remaining = n->return_to_life;
}

void generic_dead_tick(struct node_state *n) {
    n->dead_time_remaining--;
    if(n->dead_time_remaining == 0) {
        log_evt(n, ALIVE, "I am back from the dead...");
    }
}
/*
 * State Machine implementation.
 * We model the bully algorithm with 4 states:
 * Slave     - You have a coordinator.
 * Candidate - You were unable to contact a coordinator and are awaiting election results.
 * Wait      - You received an ANSWER and are awaiting a COORD announcement.
 * Coord     - You are the coordinator.
 *
 * Each of these states implements a handler for every message that can be
 * received.
 */
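/*
 * Quick reference, derived from the handlers below:
 *
 *   SLAVE     -- no IAA/COORD for timeout_val ticks ---------> CANDIDATE
 *   CANDIDATE -- ANSWER received ----------------------------> WAIT
 *   CANDIDATE -- COORD received -----------------------------> SLAVE
 *   CANDIDATE -- timeout with no ANSWER ---------------------> COORD
 *   WAIT      -- COORD received -----------------------------> SLAVE
 *   WAIT      -- timeout ------------------------------------> CANDIDATE
 *   COORD     -- ELECTION/COORD from a higher id, or ANSWER --> SLAVE
 *
 * Slaves also send an AYA to the coordinator every aya_time ticks; the
 * coordinator replies with an IAA or, with probability send_failure_rate%,
 * simulates its own death.
 */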
/* Candidate Handlers */
int cand_get_timeout(struct node_state *n) {
    return n->timeout_val;
}

void cand_on_enter(struct node_state *n) {
    log_evt(n, LEADERDEAD, "Coordinator not responding...");
    send_election(n);
    n->state_time_remaining = n->timeout_val;
}

void cand_on_elect(struct node_state *n, int from_id) {
    info(n, -1, "cand_on_elect \n");
    send_to(n, ANSWER_ID, from_id);
}

void cand_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "cand_on_coord \n");
    n->coord_id = from_id;
    change_state(n, ST_SLAVE);
}

void cand_on_answer(struct node_state *n, int from_id) {
    info(n, -1, "cand_on_answer \n");
    change_state(n, ST_WAIT);
}

void cand_on_aya(struct node_state *n, int from_id) {
    info(n, -1, "cand_on_aya \n");
}

void cand_on_iaa(struct node_state *n, int from_id) {
    info(n, -1, "cand_on_iaa \n");
}

void cand_on_tick(struct node_state *n) {
    info(n, -1, "cand_on_tick \n");
    n->state_time_remaining -= 1;
    if(n->state_time_remaining <= 0) {
        change_state(n, ST_COORD);
    }
}

void cand_on_exit(struct node_state *n) {
    info(n, -1, "cand_on_exit \n");
    n->state_time_remaining = n->timeout_val;
}
/* Coord Handlers */
int coord_get_timeout(struct node_state *n) {
    info(n, -1, "coord get_timeout \n");
    return n->timeout_val;
}

void coord_on_kill(struct node_state *n) {
    log_evt(n, DEAD, "Coordinator Dying...");
    n->dead_time_remaining = n->return_to_life;
}

void coord_on_enter(struct node_state *n) {
    info(n, -1, "coord_on_enter \n");
    send_coord(n);
}

void coord_on_elect(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_elect \n");
    if(from_id > n->id) {
        n->coord_id = from_id;
        change_state(n, ST_SLAVE);
    }
    send_coord(n);
}

void coord_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_coord \n");
    if(from_id > n->id) {
        n->coord_id = from_id;
        change_state(n, ST_SLAVE);
    }
}

void coord_on_answer(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_answer \n");
    n->coord_id = from_id;
    change_state(n, ST_SLAVE);
}

void coord_on_aya(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_aya \n");
    int r = (rand() % 100);
    if(r < n->send_failure_rate) {
        coord_on_kill(n);
        return;
    }
    send_iaa(n, from_id);
}

void coord_on_iaa(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_iaa \n");
}

void coord_on_tick(struct node_state *n) {
    info(n, -1, "coord on_tick \n");
}

void coord_on_exit(struct node_state *n) {
    info(n, -1, "coord_cleanup \n");
}
/* Slave Handlers */
int slave_get_timeout(struct node_state *n) {
    info(n, -1, "slave_get_timeout \n");
    return n->aya_time;
}

void slave_on_enter(struct node_state *n) {
    info(n, -1, "slave_on_enter \n");
    n->state_time_remaining = n->timeout_val;
    n->aya_time_rem = n->aya_time;
}

void slave_on_elect(struct node_state *n, int from_id) {
    info(n, -1, "slave_on_elect \n");
    send_to(n, ANSWER_ID, from_id);
}

void slave_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "slave_on_coord \n");
    n->state_time_remaining = n->timeout_val;
    n->coord_id = from_id;
}

void slave_on_answer(struct node_state *n, int from_id) {
    info(n, -1, "slave_on_answer \n");
}

void slave_on_aya(struct node_state *n, int from_id) {
    info(n, -1, "slave_on_aya \n");
}

void slave_on_iaa(struct node_state *n, int from_id) {
    info(n, -1, "slave_on_iaa \n");
    n->state_time_remaining = n->timeout_val;
    n->aya_time_rem = n->aya_time;
}
/*
 * Send an AYA to the coordinator every `aya_time` ticks; if
 * `state_time_remaining` elapses without hearing back, assume the
 * coordinator is dead and become a candidate.
 */
void slave_on_tick(struct node_state *n) {
    n->state_time_remaining -= 1;
    n->aya_time_rem -= 1;
    info(n, -1, "tick() aya_rem: %d, state_time_rem: %d", n->aya_time_rem, n->state_time_remaining);
    if(n->state_time_remaining <= 0) {
        change_state(n, ST_CAND);
        return;
    }
    if(n->aya_time_rem <= 0) {
        send_aya(n);
        n->aya_time_rem = n->aya_time;
    }
}

void slave_on_exit(struct node_state *n) {
    info(n, -1, "slave_on_exit \n");
    n->state_time_remaining = n->timeout_val;
}
/* Wait Handlers */
int wait_get_timeout(struct node_state *n) {
    info(n, -1, "wait_get_timeout \n");
    return n->timeout_val * n->num_nodes;
}

void wait_on_enter(struct node_state *n) {
    info(n, -1, "wait on_enter \n");
    n->state_time_remaining = n->timeout_val * n->num_nodes;
}

void wait_on_elect(struct node_state *n, int from_id) {
    info(n, -1, "wait on_elect \n");
}

void wait_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "wait on_coord \n");
    n->coord_id = from_id;
    change_state(n, ST_SLAVE);
}

void wait_on_answer(struct node_state *n, int from_id) {
    info(n, -1, "wait on_answer \n");
}

void wait_on_aya(struct node_state *n, int from_id) {
    info(n, -1, "wait on_aya \n");
}

void wait_on_iaa(struct node_state *n, int from_id) {
    info(n, -1, "wait on_iaa \n");
}

void wait_on_tick(struct node_state *n) {
    info(n, -1, "wait on_tick \n");
    n->state_time_remaining -= 1;
    if(n->state_time_remaining <= 0) {
        change_state(n, ST_CAND);
    }
}

void wait_on_exit(struct node_state *n) {
    info(n, -1, "wait cleanup \n");
    n->state_time_remaining = n->timeout_val;
}
/*
 * The special clock process loops forever, sending a TICK
 * to every other process roughly once per second.
 */
void start_clock(struct node_state *n) {
    double curr_time, end_time;
    for(;;) {
        int i;
        for (i = 1; i < n->num_nodes; i++) {
            // Send the tick as a long so the datatype matches the MPI_LONG
            // receive in bully()'s main loop.
            long buffer = TICK;
            MPI_Send(&buffer, 1, MPI_LONG, i, TICK, MPI_COMM_WORLD);
        }
        // Busy wait for one second
        curr_time = MPI_Wtime();
        end_time = curr_time + 1;
        while(MPI_Wtime() < end_time);
    }
    MPI_Finalize(); // never reached
}
/*
 * Glue code to bind all state handlers.
 * This is basically implementing polymorphism in C.
 * Inspired by CPSC 415 device drivers
 */
void bind_states(struct node_state *n) {
    n->handlers[ST_SLAVE].on_enter = slave_on_enter;
    n->handlers[ST_SLAVE].on_elect = slave_on_elect;
    n->handlers[ST_SLAVE].on_coord = slave_on_coord;
    n->handlers[ST_SLAVE].on_answer = slave_on_answer;
    n->handlers[ST_SLAVE].on_aya = slave_on_aya;
    n->handlers[ST_SLAVE].on_iaa = slave_on_iaa;
    n->handlers[ST_SLAVE].on_tick = slave_on_tick;
    n->handlers[ST_SLAVE].on_exit = slave_on_exit;
    n->handlers[ST_SLAVE].get_timeout = slave_get_timeout;
    n->handlers[ST_SLAVE].on_kill = generic_on_kill;
    n->handlers[ST_SLAVE].dead_tick = generic_dead_tick;

    n->handlers[ST_CAND].on_enter = cand_on_enter;
    n->handlers[ST_CAND].on_elect = cand_on_elect;
    n->handlers[ST_CAND].on_coord = cand_on_coord;
    n->handlers[ST_CAND].on_answer = cand_on_answer;
    n->handlers[ST_CAND].on_aya = cand_on_aya;
    n->handlers[ST_CAND].on_iaa = cand_on_iaa;
    n->handlers[ST_CAND].on_tick = cand_on_tick;
    n->handlers[ST_CAND].on_exit = cand_on_exit;
    n->handlers[ST_CAND].get_timeout = cand_get_timeout;
    n->handlers[ST_CAND].on_kill = generic_on_kill;
    n->handlers[ST_CAND].dead_tick = generic_dead_tick;

    n->handlers[ST_COORD].on_enter = coord_on_enter;
    n->handlers[ST_COORD].on_elect = coord_on_elect;
    n->handlers[ST_COORD].on_coord = coord_on_coord;
    n->handlers[ST_COORD].on_answer = coord_on_answer;
    n->handlers[ST_COORD].on_aya = coord_on_aya;
    n->handlers[ST_COORD].on_iaa = coord_on_iaa;
    n->handlers[ST_COORD].on_tick = coord_on_tick;
    n->handlers[ST_COORD].on_exit = coord_on_exit;
    n->handlers[ST_COORD].get_timeout = coord_get_timeout;
    n->handlers[ST_COORD].on_kill = coord_on_kill;
    n->handlers[ST_COORD].dead_tick = generic_dead_tick;

    n->handlers[ST_WAIT].on_enter = wait_on_enter;
    n->handlers[ST_WAIT].on_elect = wait_on_elect;
    n->handlers[ST_WAIT].on_coord = wait_on_coord;
    n->handlers[ST_WAIT].on_answer = wait_on_answer;
    n->handlers[ST_WAIT].on_aya = wait_on_aya;
    n->handlers[ST_WAIT].on_iaa = wait_on_iaa;
    n->handlers[ST_WAIT].on_tick = wait_on_tick;
    n->handlers[ST_WAIT].on_exit = wait_on_exit;
    n->handlers[ST_WAIT].get_timeout = wait_get_timeout;
    n->handlers[ST_WAIT].on_kill = generic_on_kill;
    n->handlers[ST_WAIT].dead_tick = generic_dead_tick;
}
int bully( int argc, char** argv ) {
    struct node_state *n = malloc(sizeof(struct node_state));
    memset(n, 0, sizeof(struct node_state));

    if(argc < 6) {
        printf("Not enough arguments\n");
        usage();
    }
    n->log_level = atoi(argv[1]);
    n->timeout_val = atoi(argv[2]);
    n->aya_time = atoi(argv[3]);
    n->send_failure_rate = atoi(argv[4]);
    n->return_to_life = atoi(argv[5]);
    n->dlc = 1;
    n->coord_id = UNKNOWN;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &(n->num_nodes));
    MPI_Comm_rank(MPI_COMM_WORLD, &(n->id));
    MPIX_Get_collocated_size(&(n->co_size));
    MPIX_Get_collocated_startrank(&(n->s_rank));
    printf("Hello! I am rank %d of %d co-located size is %d, start rank %d \n", n->id, n->num_nodes, n->co_size, n->s_rank);

    // Clock Node
    if(n->id == CLOCK_ID) {
        start_clock(n);
        return(0);
    }

    // Bind state machine
    bind_states(n);
    if(n->id == n->num_nodes - 1) {
        n->st = &(n->handlers[ST_COORD]);
    } else {
        n->st = &(n->handlers[ST_SLAVE]);
        n->st->on_enter(n);
    }

    /*
     * Because of our state machine implementation
     * we can just call generic methods based on the message
     * received. The current state will determine which
     * handler is actually invoked.
     */
    for(;;) {
        MPI_Status status;
        long rcv_dlc;
        MPI_Recv(&rcv_dlc, 1, MPI_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        if(status.MPI_SOURCE == CLOCK_ID) {
            /*
             * Keep track of how long we are dead for
             * if we've been killed
             */
            if(n->dead_time_remaining > 0) {
                n->st->dead_tick(n);
            } else {
                n->st->on_tick(n);
            }
            continue;
        }

        if(n->dead_time_remaining > 0) {
            continue;
        }
        // Logical clock update: take the max of the local and received
        // clocks, then increment.
        n->dlc = max(n->dlc, rcv_dlc) + 1;
        info(n, status.MPI_TAG, "Received cmd from %d", status.MPI_SOURCE);

        switch(status.MPI_TAG) {
            case ELECTION_ID:
                n->st->on_elect(n, status.MPI_SOURCE);
                break;
            case AYA_ID:
                n->st->on_aya(n, status.MPI_SOURCE);
                break;
            case IAA_ID:
                n->st->on_iaa(n, status.MPI_SOURCE);
                break;
            case COORD_ID:
                n->st->on_coord(n, status.MPI_SOURCE);
                break;
            case ANSWER_ID:
                n->st->on_answer(n, status.MPI_SOURCE);
                break;
            case TERMINATE:
                n->st->on_kill(n);
                break;
            default:
                info(n, -1, "Unknown Command. Ignoring...");
        }
    }

    MPI_Finalize();
    return(0);
}