Created
March 30, 2017 08:38
-
-
Save Gelio/e929f842a958028ee61f8d85afef4998 to your computer and use it in GitHub Desktop.
Pseudo number computing in a cluster created with POSIX message queues in C for an Operating Systems course
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Compiler and flags; no explicit targets are visible here, so these are
# presumably picked up by make's built-in rules.
CC=gcc
CFLAGS=-Wall -std=gnu99
# The program uses the POSIX mq_* API; on Linux/glibc that is linked via librt.
LDLIBS=-lrt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define _GNU_SOURCE
#include <errno.h>
#include <limits.h>
#include <mqueue.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Upper bound for a queue name; not referenced by the code visible in this file. */
#define MAX_QUEUE_NAME_LENGTH 100
/* Size in bytes of every message sent/received; also the queue's mq_msgsize. */
#define MESSAGE_LENGTH 100
/* mq_maxmsg requested when the node creates its own queue. */
#define QUEUE_CAPACITY 10
/* Number of slots in each node's table of known cluster queue paths. */
#define CLUSTER_CAPACITY 45
/* The message priority doubles as the message type. */
#define PRIORITY_JOIN 3     /* payload: queue path of a node asking to join */
#define PRIORITY_NEW_NODE 2 /* payload: queue path of a node to remember */
#define PRIORITY_ACTION 1   /* payload: an int that is XOR-ed into the local k */
/*
 * Fatal-error helper: prints the file and line of the failure, then the
 * perror() text for `source`, sends SIGKILL to the caller's whole process
 * group (kill with pid 0) and finally calls exit(EXIT_FAILURE).
 * Kept as a single comma expression so it can be used anywhere an
 * expression statement is allowed.
 */
#define ERR(source) (fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), \
                     perror(source), kill(0, SIGKILL),               \
                     exit(EXIT_FAILURE))
/*
 * Installs `handler` as the action for `signal`.
 *
 * Any sigaction() failure is fatal (ERR).
 */
void setHandler(int signal, void (*handler)(int))
{
    struct sigaction action;
    memset(&action, 0, sizeof(action));
    action.sa_handler = handler;
    /*
     * The one-argument sa_handler field is used, so SA_SIGINFO (which selects
     * the three-argument sa_sigaction field) must not be set.  SA_RESTART is
     * deliberately left off as well: main() relies on scanf failing with
     * EINTR when the signal arrives.
     */
    action.sa_flags = 0;
    if (sigaction(signal, &action, NULL) < 0)
        ERR("sigaction");
}
/*
 * Prints the command-line synopsis to stderr and terminates the process
 * with EXIT_FAILURE.  `fileName` is the program name shown in the synopsis.
 */
void usage(char *fileName)
{
    fprintf(stderr, "Usage: %s ownQueueName k [otherQueueName]\n", fileName);
    fprintf(stderr, "k - initial seed\n");
    exit(EXIT_FAILURE);
}
/*
 * Builds the queue path "/<baseName>" in freshly allocated memory.
 *
 * Returns a heap string that the caller must free().  Allocation failure is
 * fatal (ERR).
 */
char *getQueueName(char *baseName)
{
    /* +2: one byte for the leading '/', one for the terminating NUL. */
    size_t queueNameSize = strlen(baseName) + 2;
    char *queueName = malloc(queueNameSize);
    if (queueName == NULL)
        ERR("malloc");
    snprintf(queueName, queueNameSize, "/%s", baseName);
    return queueName;
}
/*
 * Asks for SIGUSR1 to be delivered as the message-arrival notification for
 * `queueDes` (mq_notify with SIGEV_SIGNAL).
 *
 * main() calls this again after every notification because a POSIX
 * mq_notify registration is removed once it has fired.  Failure is fatal
 * (ERR).
 */
void setupQueueNotification(mqd_t queueDes)
{
    struct sigevent notification;
    /* memset rather than an initializer so padding and union members are zeroed too. */
    memset(&notification, 0, sizeof(notification));
    notification.sigev_notify = SIGEV_SIGNAL;
    notification.sigev_signo = SIGUSR1;
    if (mq_notify(queueDes, &notification) < 0)
        ERR("mq_notify");
    printf("notifications setup\n");
}
int findFreeNodeIndex(char *clusterQueuePaths[CLUSTER_CAPACITY]) | |
{ | |
for (int i=0; i < CLUSTER_CAPACITY; i++) | |
if (clusterQueuePaths[i] == NULL) | |
return i; | |
return -1; | |
} | |
/*
 * Opens the queue named by *queuePath for writing.
 *
 * Returns the descriptor on success.  If no queue with that name exists
 * (ENOENT), the path is treated as stale: *queuePath is free()d, set to NULL
 * and -1 is returned, so *queuePath must point to heap memory.  Any other
 * mq_open failure is fatal (ERR).
 */
mqd_t openQueueWithCheck(char **queuePath)
{
    mqd_t queueDes = TEMP_FAILURE_RETRY(mq_open(*queuePath, O_WRONLY));
    if (queueDes < 0)
    {
        /* A missing queue is reported as ENOENT (the literal 2 used before). */
        if (errno == ENOENT)
        {
            free(*queuePath);
            *queuePath = NULL;
            return -1;
        }
        ERR("mq_open");
    }
    return queueDes;
}
/*
 * XORs the int stored in the first sizeof(int) bytes of `buffer` into *k
 * and prints the new value.
 */
void changeK(int *k, char *buffer)
{
    int c;
    /* memcpy instead of *(int *)buffer: no alignment or aliasing assumptions. */
    memcpy(&c, buffer, sizeof(c));
    *k = *k ^ c;
    printf("Changing k, k = %d\n", *k);
}
void handleNodeClusterJoining(char **buffer, char *ownQueuePath, char *clusterQueuePaths[CLUSTER_CAPACITY]) | |
{ | |
printf("Node wants to join a cluster (it's queue path: %s)\n", *buffer); | |
// Send messages to the node and add it to the cluster | |
int newNodeIndex = findFreeNodeIndex(clusterQueuePaths); | |
if (newNodeIndex == -1) | |
{ | |
printf("No place in the cluster\n"); | |
return; | |
} | |
mqd_t queueDes = openQueueWithCheck(buffer); | |
if (queueDes == -1) | |
return; | |
for (int i=0; i < CLUSTER_CAPACITY; i++) | |
{ | |
if (clusterQueuePaths[i] == NULL) | |
continue; | |
// Send to all nodes that this node joined | |
mqd_t clusterQueueDes = openQueueWithCheck(clusterQueuePaths + i); | |
if (clusterQueueDes == -1) | |
continue; | |
if (TEMP_FAILURE_RETRY(mq_send(clusterQueueDes, *buffer, MESSAGE_LENGTH, PRIORITY_NEW_NODE)) < 0) | |
ERR("mq_send"); | |
if (TEMP_FAILURE_RETRY(mq_close(clusterQueueDes)) < 0) | |
ERR("mq_close"); | |
// Send all existing cluster node paths | |
if (TEMP_FAILURE_RETRY(mq_send(queueDes, clusterQueuePaths[i], MESSAGE_LENGTH, PRIORITY_NEW_NODE)) < 0) | |
ERR("mq_send"); | |
} | |
// Send this node paths | |
if (TEMP_FAILURE_RETRY(mq_send(queueDes, ownQueuePath, MESSAGE_LENGTH, PRIORITY_NEW_NODE)) < 0) | |
ERR("mq_send"); | |
if (TEMP_FAILURE_RETRY(mq_close(queueDes)) < 0) | |
ERR("mq_close"); | |
// Add the new node to the internal list | |
clusterQueuePaths[newNodeIndex] = malloc(MESSAGE_LENGTH * sizeof(char)); | |
if (clusterQueuePaths[newNodeIndex] == NULL) | |
ERR("malloc"); | |
strncpy(clusterQueuePaths[newNodeIndex], *buffer, MESSAGE_LENGTH); | |
printf("Node at %s successfully added\n", *buffer); | |
} | |
void handleNewNode(char *clusterQueuePaths[CLUSTER_CAPACITY], char *buffer) | |
{ | |
printf("New node in the cluster (%s), adding it to the internal list\n", buffer); | |
int newNodeIndex = findFreeNodeIndex(clusterQueuePaths); | |
if (newNodeIndex == -1) | |
{ | |
printf("No place in the cluster\n"); | |
return; | |
} | |
// Add the new node to the internal list | |
clusterQueuePaths[newNodeIndex] = malloc(MESSAGE_LENGTH * sizeof(char)); | |
if (clusterQueuePaths[newNodeIndex] == NULL) | |
ERR("malloc"); | |
strncpy(clusterQueuePaths[newNodeIndex], buffer, MESSAGE_LENGTH); | |
printf("Node at %s successfully added\n", buffer); | |
} | |
void readMessagesFromQueue(mqd_t ownQueueDes, char *ownQueuePath, int *k, char *clusterQueuePaths[CLUSTER_CAPACITY]) | |
{ | |
char *buffer = malloc(MESSAGE_LENGTH * sizeof(char)); | |
if (buffer == NULL) | |
ERR("malloc"); | |
// Read messages from the queue | |
unsigned messagePriority; | |
while (1) | |
{ | |
if (TEMP_FAILURE_RETRY(mq_receive(ownQueueDes, buffer, MESSAGE_LENGTH, &messagePriority)) < 0) | |
{ | |
if (errno == EAGAIN) | |
return; | |
ERR("mq_receive"); | |
} | |
switch (messagePriority) | |
{ | |
case PRIORITY_ACTION: | |
changeK(k, buffer); | |
break; | |
case PRIORITY_JOIN: | |
handleNodeClusterJoining(&buffer, ownQueuePath, clusterQueuePaths); | |
break; | |
case PRIORITY_NEW_NODE: | |
handleNewNode(clusterQueuePaths, buffer); | |
break; | |
default: | |
printf("Received a message with unknown priority, ignoring\n"); | |
break; | |
} | |
} | |
free(buffer); | |
} | |
/*
 * Intentionally empty signal handler.  main() installs it for SIGUSR1 so
 * that delivery of the signal interrupts the blocking scanf (EINTR) rather
 * than triggering the signal's default action.
 */
void signalHandler(int signal)
{
    (void)signal; /* unused on purpose */
}
void joinExistingCluster(char *otherQueueName, char *ownQueuePath) | |
{ | |
// Join an existing cluster | |
char buffer[MESSAGE_LENGTH]; | |
char *clusterQueuePath = getQueueName(otherQueueName); | |
printf("Trying to join cluster at %s\n", clusterQueuePath); | |
// Nothing is connected yet, so there's no need to handle signals | |
mqd_t clusterQueueDes = mq_open(clusterQueuePath, O_WRONLY); | |
if (clusterQueueDes < 0) | |
ERR("mq_open"); | |
snprintf(buffer, MESSAGE_LENGTH, "%s", ownQueuePath); | |
if (mq_send(clusterQueueDes, buffer, MESSAGE_LENGTH, PRIORITY_JOIN) < 0) | |
ERR("mq_send"); | |
if (mq_close(clusterQueueDes) < 0) | |
ERR("mq_close"); | |
printf("Sent join message to %s\n", clusterQueuePath); | |
free(clusterQueuePath); | |
} | |
int main(int argc, char **argv) | |
{ | |
if (argc < 3 || argc > 4) | |
usage(argv[0]); | |
char *ownQueuePath = getQueueName(argv[1]); | |
int k = atoi(argv[2]); | |
char *clusterQueuePaths[CLUSTER_CAPACITY]; | |
for (int i=0; i < CLUSTER_CAPACITY; i++) | |
clusterQueuePaths[i] = NULL; | |
struct mq_attr queueAttributes; | |
memset(&queueAttributes, 0, sizeof(struct mq_attr)); | |
queueAttributes.mq_maxmsg = QUEUE_CAPACITY; | |
queueAttributes.mq_msgsize = MESSAGE_LENGTH; | |
setHandler(SIGUSR1, signalHandler); | |
// Nothing is connected yet, so there's no need to handle signals | |
mqd_t ownQueueDes = mq_open(ownQueuePath, O_RDONLY | O_CREAT | O_NONBLOCK, 0600, &queueAttributes); | |
if (ownQueueDes < 0) | |
ERR("mq_open"); | |
printf("Own queue created at %s\n", ownQueuePath); | |
setupQueueNotification(ownQueueDes); | |
if (argc == 4) | |
joinExistingCluster(argv[3], ownQueuePath); | |
// Powinienem to zrefaktorować i przerzucić do oddzielnych funkcji, ale nie zdążyłem, brakowało mi 10 minut | |
while (1) | |
{ | |
char userAction; | |
int scanRes = scanf("%c", &userAction); | |
if (scanRes == EOF) | |
{ | |
if (errno != EINTR) | |
ERR("scanf"); | |
setupQueueNotification(ownQueueDes); | |
readMessagesFromQueue(ownQueueDes, ownQueuePath, &k, clusterQueuePaths); | |
} | |
else | |
{ | |
// Act upon action | |
int shouldQuit = 0; | |
switch (userAction) | |
{ | |
case 'k': | |
printf("k = %d\n", k); | |
break; | |
case 'q': | |
shouldQuit = 1; | |
break; | |
case 'm': | |
{ | |
char buffer[MESSAGE_LENGTH]; | |
for (int i=0; i < CLUSTER_CAPACITY; i++) | |
{ | |
if (clusterQueuePaths[i] == NULL) | |
continue; | |
mqd_t queueDes = openQueueWithCheck(clusterQueuePaths + i); | |
if (queueDes == -1) | |
continue; | |
int c = k + i; | |
*(int*)buffer = c; | |
if (TEMP_FAILURE_RETRY(mq_send(queueDes, buffer, MESSAGE_LENGTH, PRIORITY_ACTION)) < 0) | |
ERR("mq_send"); | |
if (TEMP_FAILURE_RETRY(mq_close(queueDes)) < 0) | |
ERR("mq_close"); | |
} | |
printf("Change k message sent\n"); | |
} | |
break; | |
case '\n': | |
break; | |
default: | |
printf("Unknown command: %c\n", userAction); | |
printf("Known commands: k, q, m\n"); | |
break; | |
} | |
if (shouldQuit) | |
break; | |
} | |
} | |
printf("Unsubscribing from notifications, closing and removing own queue\n"); | |
if (mq_notify(ownQueueDes, NULL) < 0) | |
ERR("mq_notify"); | |
if (mq_close(ownQueueDes) < 0) | |
ERR("mq_close"); | |
if (mq_unlink(ownQueuePath) < 0) | |
ERR("mq_unlink"); | |
free(ownQueuePath); | |
for (int i=0; i < CLUSTER_CAPACITY; i++) | |
if (clusterQueuePaths[i] != NULL) | |
free(clusterQueuePaths[i]); | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment