Skip to content

Instantly share code, notes, and snippets.

@jtgi
Created April 11, 2015 23:17
Show Gist options
  • Save jtgi/1f2287dd0af42ff7c9f0 to your computer and use it in GitHub Desktop.
Save jtgi/1f2287dd0af42ff7c9f0 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdarg.h>
#include <arpa/inet.h>
#include "mpi.h"
#include "fgmpi.h"
/* Conf */
#define INFO_LOG_LVL 1
#define CLOCK_ID 0
#define NUM_STATES 4
/* Messages */
#define TICK 19
#define ELECTION_ID 20
#define AYA_ID 21
#define IAA_ID 22
#define COORD_ID 23
#define ANSWER_ID 24
#define IAM_DEAD_JIM 998
#define TERMINATE 999
#define UNKNOWN -1
/* Events */
#define ELECTION 1000
#define DEAD 1001
#define ALIVE 1002
#define LEADER 1003
#define LEADERDEAD 1004
/* Macros */
/* Larger of a and b. Each argument is parenthesized so that operators inside
 * the caller's expressions (e.g. `x | y`) cannot re-associate with the
 * comparison. Arguments may be evaluated twice: do not pass side effects. */
#define max( a, b ) ( ( (a) > (b) ) ? (a) : (b) )
/* From the file mpi.h, this is the definition of MPI_Status, with my comments on the
fields:
typedef struct MPI_Status {
int count; // Seems to be the count, in bytes, of the data received
int cancelled; // Indication of whether or not the operation was cancelled.
// This probably only applies if you are doing non-blocking
// operations
int MPI_SOURCE; // ID of the source
int MPI_TAG; // The actual message tag
int MPI_ERROR; // The error code if there was one
} MPI_Status;
*/
/*
 * States of the election state machine. The values double as indices
 * into node_state.handlers, so they must stay dense and start at 0.
 */
enum node_states {
    ST_SLAVE = 0,   /* has a coordinator */
    ST_CAND  = 1,   /* started an election, awaiting results */
    ST_WAIT  = 2,   /* got an answer, awaiting a coordinator announcement */
    ST_COORD = 3    /* is the coordinator */
};
struct node_state; // forward declaration: the handlers take a node_state pointer
/*
 * State interface implemented by all states.
 * One instance exists per state (see node_state.handlers); the node's current
 * instance is reached through node_state.st. The int argument of the message
 * handlers is the MPI rank of the sender.
 */
struct node_handlers {
// Called by change_state() right after this state becomes current.
void (*on_enter)(struct node_state *n);
// ELECTION_ID message received.
void (*on_elect)(struct node_state *n, int);
// COORD_ID message received.
void (*on_coord)(struct node_state *n, int);
// ANSWER_ID message received.
void (*on_answer)(struct node_state *n, int);
// AYA_ID message received.
void (*on_aya)(struct node_state *n, int);
// IAA_ID message received.
void (*on_iaa)(struct node_state *n, int);
// TERMINATE message received.
void (*on_kill)(struct node_state *n);
// Clock message received while the node is alive.
void (*on_tick)(struct node_state *n);
// Called by change_state() right before this state stops being current.
void (*on_exit)(struct node_state *n);
// Clock message received while dead_time_remaining > 0.
void (*dead_tick)(struct node_state *n);
// Timeout for this state; bound by bind_states() but not called in the visible code.
int (*get_timeout)(struct node_state *n);
};
/*
 * Everything one bully process knows about itself: the per-state handler
 * tables, the current state, the command-line configuration and the
 * countdown timers. Timers are decremented once per clock message.
 */
struct node_state {
struct node_handlers handlers[NUM_STATES]; // indexed by enum node_states, filled by bind_states()
struct node_handlers *st; // handlers of the current state
int id; // rank in MPI_COMM_WORLD
long dlc; // logical clock: sent as the message payload, printed as "DLC"
int num_nodes; // size of MPI_COMM_WORLD (rank CLOCK_ID is the clock process)
int co_size; // value from MPIX_Get_collocated_size()
int s_rank; // value from MPIX_Get_collocated_startrank()
int log_level; // argv[1]; info() prints when >= INFO_LOG_LVL
int timeout_val; // argv[2]; base timeout used to reset state_time_remaining
int aya_time; // argv[3]; reload value of aya_time_rem
int aya_time_rem; // ticks left before a slave sends its next AYA
int send_failure_rate; // argv[4]; compared against rand() % 100 when a coordinator gets an AYA
int return_to_life; // argv[5]; number of ticks a killed node stays dead
int dead_time_remaining; // > 0 while dead: non-clock messages are dropped
int coord_id; // rank of the known coordinator, -1 when unknown
int tick; // not referenced in the visible code
int state_time_remaining; // ticks left before the current state times out
};
int bully( int argc, char** argv );
/*
 * FG-MPI binding callback. Every rank runs the same entry point, so the
 * arguments are ignored and bully() is always returned.
 */
FG_ProcessPtr_t binding_func(int argc __attribute__ ((unused)),
                             char** argv __attribute__ ((unused)),
                             int rank __attribute__ ((unused))){
    FG_ProcessPtr_t entry_point = &bully;
    return entry_point;
}
/*
 * FG-MPI map lookup callback. There is only one mapping function, so the
 * arguments are ignored and binding_func() is always returned.
 */
FG_MapPtr_t map_lookup(int argc __attribute__ ((unused)),
                       char** argv __attribute__ ((unused)),
                       char* str __attribute__ ((unused))){
    FG_MapPtr_t mapper = &binding_func;
    return mapper;
}
/*
 * Program entry: hands control to the FG-MPI launcher, which resolves the
 * per-rank entry point through map_lookup().
 */
int main( int argc, char *argv[]) {
    FGmpiexec(&argc, &argv, &map_lookup);

    return 0;
}
/*
 * Prints the expected command line on stdout and terminates the process
 * with exit status 1. Does not return.
 */
void usage() {
    static const char usage_text[] =
        "Usage mpirun -nfg X -n Y ./bully [log level] [timeout] [aya time] [send failure rate] [return to life]\n";

    printf("%s", usage_text);
    exit(1);
}
/*
 * Copies the printable name of event code `evt` into `str`.
 * Codes that are not one of the public events are reported as "PRIV_CUSTOM".
 * `str` must have room for the longest name plus the terminator
 * ("PRIV_CUSTOM", 12 bytes).
 */
void get_evt(int evt, char str[]) {
    const char *name;

    switch (evt) {
    case ELECTION:   name = "ELECTION";    break;
    case DEAD:       name = "DEAD";        break;
    case ALIVE:      name = "ALIVE";       break;
    case LEADER:     name = "LEADER";      break;
    case LEADERDEAD: name = "LEADERDEAD";  break;
    default:         name = "PRIV_CUSTOM"; break;
    }
    strcpy(str, name);
}
/*
 * Actually does the logging to the screen.
 *
 * Prints one line on stdout of the form
 *   "DLC: <clock>, Event: <name>, Node <id>, <formatted message>\n"
 * where <name> comes from get_evt(evt) and the message is `fmt` expanded
 * with `args`. The caller owns `args` (va_start / va_end).
 */
void log_evt_internal(struct node_state *n, int evt, const char *fmt, va_list args) {
    char evt_str[16];

    get_evt(evt, evt_str);
    /* dlc is a signed long, so the matching conversion is %ld (not %lu). */
    printf("DLC: %ld, Event: %s, Node %d, ", n->dlc, evt_str, n->id);
    vprintf(fmt, args);
    printf("\n");
}
/*
 * Unconditional logging: the line is printed whatever the node's log level.
 * `fmt` and the variadic arguments follow printf conventions.
 */
void log_evt(struct node_state *n, int evt, const char *fmt, ...) {
    va_list ap;

    va_start(ap, fmt);
    log_evt_internal(n, evt, fmt, ap);
    va_end(ap);
}
/*
 * Verbose logging: prints only when the node's log level is at least
 * INFO_LOG_LVL, otherwise does nothing. Arguments as for log_evt().
 */
void info(struct node_state *n, int evt, const char *fmt, ...) {
    if (n->log_level < INFO_LOG_LVL) {
        return;
    }

    va_list ap;
    va_start(ap, fmt);
    log_evt_internal(n, evt, fmt, ap);
    va_end(ap);
}
/*
* Helper to ensure exit/enter handlers are invoked
*/
void change_state(struct node_state *n, enum node_states state) {
n->st->on_exit(n);
n->st = &(n->handlers[state]);
n->st->on_enter(n);
}
/*
 * Targeted message for 1 recipient.
 *
 * Increments the logical clock, then sends it (one MPI_LONG) to rank `dst`
 * with tag `msg` using a non-blocking send, so the caller never blocks on
 * the recipient.
 *
 * MPI_Test only releases the request when the send has already completed;
 * if it has not, the handle is released explicitly with MPI_Request_free
 * (the transfer is still allowed to finish) so requests do not accumulate.
 *
 * NOTE(review): the send buffer is n->dlc itself, which may be changed by a
 * later send/receive before a pending transfer completes.
 */
void send_to(struct node_state *n, int msg, int dst) {
    int done = 0;
    MPI_Request req = MPI_REQUEST_NULL;

    n->dlc++;
    info(n, msg, "sending to node: %d", dst);

    MPI_Isend(&n->dlc, 1, MPI_LONG, dst, msg, MPI_COMM_WORLD, &req);
    MPI_Test(&req, &done, MPI_STATUS_IGNORE);
    if (!done && req != MPI_REQUEST_NULL) {
        MPI_Request_free(&req);
    }
}
/*
* Send to all nodes higher ranked than the current node.
*/
void send_up(struct node_state *n, int msg) {
int flag = 0;
n->dlc++;
info(n, msg, "sending up");
int i, j;
MPI_Request reqs[n->num_nodes - n->id];
for(j = 0, i = n->num_nodes - 1; i > n->id; i--, j++) {
MPI_Isend(&n->dlc, 1, MPI_LONG, i, msg, MPI_COMM_WORLD, &(reqs[j]));
}
// Dealloc's requests
MPI_Testall(j, reqs, &flag, MPI_STATUSES_IGNORE);
}
/*
* Send to all nodes lower ranked than the current node
*/
void send_down(struct node_state *n, int msg) {
int out = 0;
n->dlc++;
info(n, msg, "sending down");
int i, j;
MPI_Request reqs[n->id - 1];
for(i = 1, j = 0; i < n->id; i++, j++) {
MPI_Isend(&n->dlc, 1, MPI_LONG, i, msg, MPI_COMM_WORLD, &(reqs[j]));
}
MPI_Testall(j, reqs, &out, MPI_STATUSES_IGNORE);
}
/* Logs the ELECTION event and sends ELECTION_ID to every higher-ranked node. */
void send_election(struct node_state *n) {
    log_evt(n, ELECTION, "Starting Election");

    send_up(n, ELECTION_ID);
}
/* Logs the LEADER event and sends COORD_ID to every lower-ranked node. */
void send_coord(struct node_state *n) {
    log_evt(n, LEADER, "I am the new coordinator.");

    send_down(n, COORD_ID);
}
/*
 * Sends ANSWER_ID to every lower-ranked node (verbose log only).
 * No caller appears in the visible code; handlers answer the sender
 * directly with send_to().
 */
void send_answer(struct node_state *n) {
    info(n, -1, "answer");

    send_down(n, ANSWER_ID);
}
/*
 * Sends AYA_ID to the known coordinator. Does nothing while no
 * coordinator is known (coord_id == -1).
 */
void send_aya(struct node_state *n) {
    if (n->coord_id != -1) {
        send_to(n, AYA_ID, n->coord_id);
    }
}
/* Sends IAA_ID to rank `target` (verbose log only). */
void send_iaa(struct node_state *n, int target) {
    info(n, -1, "send iaa ");

    send_to(n, IAA_ID, target);
}
/*
 * Kill handler for non-coordinator states: logs at info level and starts
 * the dead period, which lasts `return_to_life` ticks.
 */
void generic_on_kill(struct node_state *n) {
    info(n, DEAD, "Regular Node Dying...");

    const int dead_ticks = n->return_to_life;
    n->dead_time_remaining = dead_ticks;
}
/*
 * Tick handler used while the node is dead: counts one tick off the dead
 * period and logs the ALIVE event on the tick where it reaches zero.
 */
void generic_dead_tick(struct node_state *n) {
    n->dead_time_remaining -= 1;

    if (n->dead_time_remaining != 0) {
        return;
    }
    log_evt(n, ALIVE, "I am back from the dead...");
}
/*
* State Machine implementation.
* We model the bully algorithm with 4 states
* Slave - You have a coordinator
* Candidate - You were unable to contact a coord and are awaiting election results
* Wait - You received a response and are awaiting confirmation.
* Coord - You are the coordinator
*
* Each of these states implement a handler for all possible messages that can be
* received.
*/
/* Candidate Handlers */
/* Candidate timeout: the configured base timeout. */
int cand_get_timeout(struct node_state *n) {
    int ticks = n->timeout_val;

    return ticks;
}
/*
 * Entering Candidate: logs LEADERDEAD, starts an election among the
 * higher-ranked nodes, then arms the state timer with the base timeout.
 */
void cand_on_enter(struct node_state *n) {
    log_evt(n, LEADERDEAD, "Coordinator not responding...");
    send_election(n);

    const int ticks = n->timeout_val;
    n->state_time_remaining = ticks;
}
/* Candidate got ELECTION from `from_id`: reply to that node with ANSWER_ID. */
void cand_on_elect(struct node_state *n, int from_id) {
    info(n, -1, "cand_on_elect \n");

    send_to(n, ANSWER_ID, from_id);
}
/* Candidate got COORD: record the sender as coordinator and become Slave. */
void cand_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "cand_on_coord \n");

    n->coord_id = from_id;
    change_state(n, ST_SLAVE);
}
/* Candidate got ANSWER: move to Wait. The sender's rank is not used. */
void cand_on_answer(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "cand_on_answer \n");
    change_state(n, ST_WAIT);
}
/* Candidate got AYA: logged at info level, otherwise ignored. */
void cand_on_aya(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "cand_on_aya \n");
}
/* Candidate got IAA: logged at info level, otherwise ignored. */
void cand_on_iaa(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "cand_on_iaa \n");
}
/*
 * Candidate tick: counts down the state timer. When it runs out without
 * any message having changed the state, the node becomes Coordinator.
 */
void cand_on_tick(struct node_state *n) {
    info(n, -1, "cand_on_tick \n");

    n->state_time_remaining--;
    if (n->state_time_remaining > 0) {
        return;
    }
    change_state(n, ST_COORD);
}
/* Leaving Candidate: reset the state timer to the base timeout. */
void cand_on_exit(struct node_state *n) {
    info(n, -1, "cand_on_exit \n");

    const int ticks = n->timeout_val;
    n->state_time_remaining = ticks;
}
/* Coord Handlers */
/* Coordinator timeout: the configured base timeout (logged at info level). */
int coord_get_timeout(struct node_state *n) {
    info(n, -1, "coord get_timeout \n");

    int ticks = n->timeout_val;
    return ticks;
}
/*
 * Kill handler for the Coordinator: always logs the DEAD event and starts
 * the dead period, which lasts `return_to_life` ticks.
 */
void coord_on_kill(struct node_state *n) {
    log_evt(n, DEAD, "Coordinator Dying...");

    const int dead_ticks = n->return_to_life;
    n->dead_time_remaining = dead_ticks;
}
/* Entering Coordinator: announce it to every lower-ranked node. */
void coord_on_enter(struct node_state *n) {
    info(n, -1, "coord_on_enter \n");

    send_coord(n);
}
/*
 * Coordinator got ELECTION from `from_id`.
 *
 * If the sender outranks this node, step down: record the sender as
 * coordinator and become Slave. Otherwise re-announce this node as
 * coordinator to all lower-ranked nodes.
 *
 * The two outcomes are exclusive: a node that has just stepped down must
 * not also broadcast (and log) that it is the new coordinator.
 */
void coord_on_elect(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_elect \n");

    if (from_id > n->id) {
        n->coord_id = from_id;
        change_state(n, ST_SLAVE);
    } else {
        send_coord(n);
    }
}
/*
 * Coordinator got COORD from `from_id`: step down to Slave only when the
 * sender outranks this node; a lower or equal rank is ignored.
 */
void coord_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_coord \n");

    if (from_id <= n->id) {
        return;
    }
    n->coord_id = from_id;
    change_state(n, ST_SLAVE);
}
/* Coordinator got ANSWER: record the sender as coordinator and become Slave. */
void coord_on_answer(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_answer \n");

    n->coord_id = from_id;
    change_state(n, ST_SLAVE);
}
/*
 * Coordinator got AYA from `from_id`.
 * Draws rand() % 100; when the draw is below send_failure_rate the
 * coordinator dies (coord_on_kill) instead of answering, otherwise it
 * replies with IAA.
 */
void coord_on_aya(struct node_state *n, int from_id) {
    info(n, -1, "coord_on_aya \n");

    const int roll = rand() % 100;
    if (roll >= n->send_failure_rate) {
        send_iaa(n, from_id);
    } else {
        coord_on_kill(n);
    }
}
/* Coordinator got IAA: logged at info level, otherwise ignored. */
void coord_on_iaa(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "coord_on_iaa \n");
}
/* Coordinator tick: no timers to run, only logged at info level. */
void coord_on_tick(struct node_state *n) {
    info(n, -1, "coord on_tick \n");
    return;
}
/* Leaving Coordinator: nothing to reset, only logged at info level. */
void coord_on_exit(struct node_state *n) {
    info(n, -1, "coord_cleanup \n");
    return;
}
/* Slave Handlers */
/* Slave timeout: the configured AYA interval (logged at info level). */
int slave_get_timeout(struct node_state *n) {
    info(n, -1, "slave_get_timeout \n");

    int ticks = n->aya_time;
    return ticks;
}
/*
 * Entering Slave: arm both countdowns, the AYA interval and the state
 * timer (base timeout).
 */
void slave_on_enter(struct node_state *n) {
    info(n, -1, "slave_on_enter \n");

    n->aya_time_rem = n->aya_time;
    n->state_time_remaining = n->timeout_val;
}
/* Slave got ELECTION from `from_id`: reply to that node with ANSWER_ID. */
void slave_on_elect(struct node_state *n, int from_id) {
    info(n, -1, "slave_on_elect \n");

    send_to(n, ANSWER_ID, from_id);
}
/*
 * Slave got COORD: adopt the sender as coordinator and restart the state
 * timer. The node stays in Slave.
 */
void slave_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "slave_on_coord \n");

    n->coord_id = from_id;
    n->state_time_remaining = n->timeout_val;
}
/* Slave got ANSWER: logged at info level, otherwise ignored. */
void slave_on_answer(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "slave_on_answer \n");
}
/* Slave got AYA: logged at info level, otherwise ignored. */
void slave_on_aya(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "slave_on_aya \n");
}
/*
 * Slave got IAA: the coordinator is alive, so restart both the AYA
 * interval and the state timer. The sender's rank is not checked.
 */
void slave_on_iaa(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "slave_on_iaa \n");
    n->aya_time_rem = n->aya_time;
    n->state_time_remaining = n->timeout_val;
}
/*
 * Slave tick. Both countdowns lose one tick, then:
 *  - if the state timer has run out, the coordinator is presumed gone and
 *    the node becomes Candidate (no AYA is sent on that tick);
 *  - otherwise, if the AYA interval has run out, an AYA is sent and the
 *    interval is re-armed.
 */
void slave_on_tick(struct node_state *n) {
    n->state_time_remaining--;
    n->aya_time_rem--;
    info(n, -1, "tick() aya_rem: %d, state_time_rem: %d", n->aya_time_rem, n->state_time_remaining);

    if (n->state_time_remaining <= 0) {
        change_state(n, ST_CAND);
    } else if (n->aya_time_rem <= 0) {
        send_aya(n);
        n->aya_time_rem = n->aya_time;
    }
}
/* Leaving Slave: reset the state timer to the base timeout. */
void slave_on_exit(struct node_state *n) {
    info(n, -1, "slave_on_exit \n");

    const int ticks = n->timeout_val;
    n->state_time_remaining = ticks;
}
/* Wait Handlers */
/* Wait timeout: base timeout multiplied by the number of nodes. */
int wait_get_timeout(struct node_state *n) {
    info(n, -1, "wait_get_timeout \n");

    int ticks = n->timeout_val * n->num_nodes;
    return ticks;
}
/* Entering Wait: arm the state timer with base timeout times node count. */
void wait_on_enter(struct node_state *n) {
    info(n, -1, "wait on_enter \n");

    const int ticks = n->timeout_val * n->num_nodes;
    n->state_time_remaining = ticks;
}
/* Wait got ELECTION: logged at info level, otherwise ignored. */
void wait_on_elect(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "wait on_elect \n");
}
/* Wait got COORD: record the sender as coordinator and become Slave. */
void wait_on_coord(struct node_state *n, int from_id) {
    info(n, -1, "wait on_coord \n");

    n->coord_id = from_id;
    change_state(n, ST_SLAVE);
}
/* Wait got ANSWER: logged at info level, otherwise ignored. */
void wait_on_answer(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "wait on_answer \n");
}
/* Wait got AYA: logged at info level, otherwise ignored. */
void wait_on_aya(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "wait on_aya \n");
}
/* Wait got IAA: logged at info level, otherwise ignored. */
void wait_on_iaa(struct node_state *n, int from_id) {
    (void)from_id;

    info(n, -1, "wait on_iaa \n");
}
/*
 * Wait tick: counts down the state timer. When it runs out without a
 * COORD having arrived, the node goes back to Candidate.
 */
void wait_on_tick(struct node_state *n) {
    info(n, -1, "wait on_tick \n");

    n->state_time_remaining--;
    if (n->state_time_remaining > 0) {
        return;
    }
    change_state(n, ST_CAND);
}
/* Leaving Wait: reset the state timer to the base timeout. */
void wait_on_exit(struct node_state *n) {
    info(n, -1, "wait cleanup \n");

    const int ticks = n->timeout_val;
    n->state_time_remaining = ticks;
}
/*
* The special clock process loops forever sending a tick
* to each process at the least every second.
*/
void start_clock(struct node_state *n) {
double curr_time, end_time;
for(;;) {
int i;
for (i = 1; i < n->num_nodes; i++) {
int buff_size = 1;
int buffer[buff_size];
buffer[0] = TICK;
MPI_Send(&buffer, buff_size, MPI_INT, i, TICK, MPI_COMM_WORLD);
}
// Busy wait
curr_time = MPI_Wtime();
end_time = curr_time + 1;
while(MPI_Wtime() < end_time);
}
MPI_Finalize();
}
/*
* Glue code to bind all state handlers.
* This is basically implementing polymorphism in C.
* Inspired by CPSC 415 device drivers
*/
void bind_states(struct node_state *n) {
n->handlers[ST_SLAVE].on_enter = slave_on_enter;
n->handlers[ST_SLAVE].on_elect = slave_on_elect;
n->handlers[ST_SLAVE].on_coord = slave_on_coord;
n->handlers[ST_SLAVE].on_answer = slave_on_answer;
n->handlers[ST_SLAVE].on_aya = slave_on_aya;
n->handlers[ST_SLAVE].on_iaa = slave_on_iaa;
n->handlers[ST_SLAVE].on_tick = slave_on_tick;
n->handlers[ST_SLAVE].on_exit = slave_on_exit;
n->handlers[ST_SLAVE].get_timeout = slave_get_timeout;
n->handlers[ST_SLAVE].on_kill = generic_on_kill;
n->handlers[ST_SLAVE].dead_tick = generic_dead_tick;
n->handlers[ST_CAND].on_enter = cand_on_enter;
n->handlers[ST_CAND].on_elect = cand_on_elect;
n->handlers[ST_CAND].on_coord = cand_on_coord;
n->handlers[ST_CAND].on_answer = cand_on_answer;
n->handlers[ST_CAND].on_aya = cand_on_aya;
n->handlers[ST_CAND].on_iaa = cand_on_iaa;
n->handlers[ST_CAND].on_tick = cand_on_tick;
n->handlers[ST_CAND].on_exit = cand_on_exit;
n->handlers[ST_CAND].get_timeout = cand_get_timeout;
n->handlers[ST_CAND].on_kill = generic_on_kill;
n->handlers[ST_CAND].dead_tick = generic_dead_tick;
n->handlers[ST_COORD].on_enter = coord_on_enter;
n->handlers[ST_COORD].on_elect = coord_on_elect;
n->handlers[ST_COORD].on_coord = coord_on_coord;
n->handlers[ST_COORD].on_answer = coord_on_answer;
n->handlers[ST_COORD].on_aya = coord_on_aya;
n->handlers[ST_COORD].on_iaa = coord_on_iaa;
n->handlers[ST_COORD].on_tick = coord_on_tick;
n->handlers[ST_COORD].on_exit = coord_on_exit;
n->handlers[ST_COORD].get_timeout = coord_get_timeout;
n->handlers[ST_COORD].on_kill = coord_on_kill;
n->handlers[ST_COORD].dead_tick = generic_dead_tick;
n->handlers[ST_WAIT].on_enter = wait_on_enter;
n->handlers[ST_WAIT].on_elect = wait_on_elect;
n->handlers[ST_WAIT].on_coord = wait_on_coord;
n->handlers[ST_WAIT].on_answer = wait_on_answer;
n->handlers[ST_WAIT].on_aya = wait_on_aya;
n->handlers[ST_WAIT].on_iaa = wait_on_iaa;
n->handlers[ST_WAIT].on_tick = wait_on_tick;
n->handlers[ST_WAIT].on_exit = wait_on_exit;
n->handlers[ST_WAIT].get_timeout = wait_get_timeout;
n->handlers[ST_WAIT].on_kill = generic_on_kill;
n->handlers[ST_WAIT].dead_tick = generic_dead_tick;
}
/*
 * Entry point run by every FG-MPI process.
 *
 * argv[1..5] are, in order: log level, timeout, AYA time, send failure
 * rate and return-to-life time (see usage()). With fewer arguments the
 * process prints the usage text and exits.
 *
 * Rank CLOCK_ID becomes the clock and never leaves start_clock(). Every
 * other rank runs the election state machine: the highest rank starts as
 * Coordinator (its on_enter hook is not run), all others start as Slave.
 * The receive loop never terminates, so the trailing MPI_Finalize/return
 * are not reached.
 */
int bully( int argc, char** argv ) {
    /* Validate the command line before allocating anything. */
    if(argc < 6) {
        printf("Not enough arguments\n");
        usage();
    }

    /* calloc zero-fills the state; the result must be checked before use. */
    struct node_state *n = calloc(1, sizeof *n);
    if(n == NULL) {
        fprintf(stderr, "Out of memory allocating node state\n");
        exit(1);
    }

    n->log_level = atoi(argv[1]);
    n->timeout_val = atoi(argv[2]);
    n->aya_time = atoi(argv[3]);
    n->send_failure_rate = atoi(argv[4]);
    n->return_to_life = atoi(argv[5]);
    n->dlc = 1;
    n->coord_id = -1; /* no coordinator known yet */

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &(n->num_nodes));
    MPI_Comm_rank(MPI_COMM_WORLD, &(n->id));
    MPIX_Get_collocated_size(&(n->co_size));
    MPIX_Get_collocated_startrank(&(n->s_rank));
    printf("Hello! I am rank %d of %d co-located size is %d, start rank %d \n", n->id, n->num_nodes, n->co_size, n->s_rank);

    // Clock Node
    if(n->id == CLOCK_ID) {
        start_clock(n);
        return(0);
    }

    // Bind state machine
    bind_states(n);
    if(n->id == n->num_nodes - 1) {
        n->st = &(n->handlers[ST_COORD]);
    } else {
        n->st = &(n->handlers[ST_SLAVE]);
        n->st->on_enter(n);
    }

    /*
     * Because of our state machine implementation
     * we can just call generic methods based on the message
     * received. The current state will determine which
     * handler is actually invoked.
     */
    for(;;) {
        MPI_Status status;
        long rcv_dlc;

        MPI_Recv(&rcv_dlc, 1, MPI_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        if(status.MPI_SOURCE == CLOCK_ID) {
            /*
             * Keep track of how long we are dead for
             * if we've been killed. Ticks do not touch the
             * logical clock.
             */
            if(n->dead_time_remaining > 0) {
                n->st->dead_tick(n);
            } else {
                n->st->on_tick(n);
            }
            continue;
        }

        /* A dead node drops every non-clock message. */
        if(n->dead_time_remaining > 0) {
            continue;
        }

        /* Logical clock on receive: the larger of (local + 1) and the sender's value. */
        n->dlc++;
        n->dlc = n->dlc >= rcv_dlc ? n->dlc : rcv_dlc;
        info(n, status.MPI_TAG, "Received cmd from %d", status.MPI_SOURCE);

        switch(status.MPI_TAG) {
        case ELECTION_ID:
            n->st->on_elect(n, status.MPI_SOURCE);
            break;
        case AYA_ID:
            n->st->on_aya(n, status.MPI_SOURCE);
            break;
        case IAA_ID:
            n->st->on_iaa(n, status.MPI_SOURCE);
            break;
        case COORD_ID:
            n->st->on_coord(n, status.MPI_SOURCE);
            break;
        case ANSWER_ID:
            n->st->on_answer(n, status.MPI_SOURCE);
            break;
        case TERMINATE:
            n->st->on_kill(n);
            break;
        default:
            info(n, -1, "Unknown Command. Ignoring...");
        }
    }

    MPI_Finalize();
    return(0);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment