Skip to content

Instantly share code, notes, and snippets.

@Gelio
Created March 30, 2017 08:38
Show Gist options
  • Save Gelio/e929f842a958028ee61f8d85afef4998 to your computer and use it in GitHub Desktop.
Save Gelio/e929f842a958028ee61f8d85afef4998 to your computer and use it in GitHub Desktop.
Pseudo numer computing in a cluster created with POSIX message queues in C for an Operating Systems course
CC=gcc
CFLAGS=-Wall -std=gnu99
LDLIBS=-lrt
#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <mqueue.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#define MAX_QUEUE_NAME_LENGTH 100
#define MESSAGE_LENGTH 100
#define QUEUE_CAPACITY 10
#define CLUSTER_CAPACITY 45
#define PRIORITY_JOIN 3
#define PRIORITY_NEW_NODE 2
#define PRIORITY_ACTION 1
#define ERR(source) (fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), \
perror(source), kill(0, SIGKILL), \
exit(EXIT_FAILURE) )
void setHandler(int signal, void (*handler)(int))
{
struct sigaction action;
memset(&action, 0, sizeof(struct sigaction));
action.sa_handler = handler;
action.sa_flags = SA_SIGINFO;
if (sigaction(signal, &action, NULL) < 0)
ERR("sigaction");
}
void usage(char *fileName)
{
fprintf(stderr, "Usage: %s ownQueueName k [otherQueueName]\n", fileName);
fprintf(stderr, "k - initial seed\n");
exit(EXIT_FAILURE);
}
char *getQueueName(char *baseName)
{
int queueNameSize = strlen(baseName) + 2;
char *queueName = malloc(queueNameSize * sizeof(char));
if (queueName == NULL)
ERR("malloc");
snprintf(queueName, queueNameSize, "/%s", baseName);
return queueName;
}
void setupQueueNotification(mqd_t queueDes)
{
static struct sigevent notification;
memset(&notification, 0, sizeof(struct sigevent));
notification.sigev_notify = SIGEV_SIGNAL;
notification.sigev_signo = SIGUSR1;
if (mq_notify(queueDes, &notification) < 0)
ERR("mq_notify");
printf("notifications setup\n");
}
int findFreeNodeIndex(char *clusterQueuePaths[CLUSTER_CAPACITY])
{
for (int i=0; i < CLUSTER_CAPACITY; i++)
if (clusterQueuePaths[i] == NULL)
return i;
return -1;
}
mqd_t openQueueWithCheck(char **queuePath)
{
mqd_t queueDes = TEMP_FAILURE_RETRY(mq_open(*queuePath, O_WRONLY));
if (queueDes < 0)
{
if (errno == 2) // EACCES does not work
{
free(*queuePath);
*queuePath = NULL;
return -1;
}
ERR("mq_open");
}
return queueDes;
}
void changeK(int *k, char *buffer)
{
int c = *(int*)buffer;
*k = *k ^ c;
printf("Changing k, k = %d\n", *k);
}
void handleNodeClusterJoining(char **buffer, char *ownQueuePath, char *clusterQueuePaths[CLUSTER_CAPACITY])
{
printf("Node wants to join a cluster (it's queue path: %s)\n", *buffer);
// Send messages to the node and add it to the cluster
int newNodeIndex = findFreeNodeIndex(clusterQueuePaths);
if (newNodeIndex == -1)
{
printf("No place in the cluster\n");
return;
}
mqd_t queueDes = openQueueWithCheck(buffer);
if (queueDes == -1)
return;
for (int i=0; i < CLUSTER_CAPACITY; i++)
{
if (clusterQueuePaths[i] == NULL)
continue;
// Send to all nodes that this node joined
mqd_t clusterQueueDes = openQueueWithCheck(clusterQueuePaths + i);
if (clusterQueueDes == -1)
continue;
if (TEMP_FAILURE_RETRY(mq_send(clusterQueueDes, *buffer, MESSAGE_LENGTH, PRIORITY_NEW_NODE)) < 0)
ERR("mq_send");
if (TEMP_FAILURE_RETRY(mq_close(clusterQueueDes)) < 0)
ERR("mq_close");
// Send all existing cluster node paths
if (TEMP_FAILURE_RETRY(mq_send(queueDes, clusterQueuePaths[i], MESSAGE_LENGTH, PRIORITY_NEW_NODE)) < 0)
ERR("mq_send");
}
// Send this node paths
if (TEMP_FAILURE_RETRY(mq_send(queueDes, ownQueuePath, MESSAGE_LENGTH, PRIORITY_NEW_NODE)) < 0)
ERR("mq_send");
if (TEMP_FAILURE_RETRY(mq_close(queueDes)) < 0)
ERR("mq_close");
// Add the new node to the internal list
clusterQueuePaths[newNodeIndex] = malloc(MESSAGE_LENGTH * sizeof(char));
if (clusterQueuePaths[newNodeIndex] == NULL)
ERR("malloc");
strncpy(clusterQueuePaths[newNodeIndex], *buffer, MESSAGE_LENGTH);
printf("Node at %s successfully added\n", *buffer);
}
void handleNewNode(char *clusterQueuePaths[CLUSTER_CAPACITY], char *buffer)
{
printf("New node in the cluster (%s), adding it to the internal list\n", buffer);
int newNodeIndex = findFreeNodeIndex(clusterQueuePaths);
if (newNodeIndex == -1)
{
printf("No place in the cluster\n");
return;
}
// Add the new node to the internal list
clusterQueuePaths[newNodeIndex] = malloc(MESSAGE_LENGTH * sizeof(char));
if (clusterQueuePaths[newNodeIndex] == NULL)
ERR("malloc");
strncpy(clusterQueuePaths[newNodeIndex], buffer, MESSAGE_LENGTH);
printf("Node at %s successfully added\n", buffer);
}
void readMessagesFromQueue(mqd_t ownQueueDes, char *ownQueuePath, int *k, char *clusterQueuePaths[CLUSTER_CAPACITY])
{
char *buffer = malloc(MESSAGE_LENGTH * sizeof(char));
if (buffer == NULL)
ERR("malloc");
// Read messages from the queue
unsigned messagePriority;
while (1)
{
if (TEMP_FAILURE_RETRY(mq_receive(ownQueueDes, buffer, MESSAGE_LENGTH, &messagePriority)) < 0)
{
if (errno == EAGAIN)
return;
ERR("mq_receive");
}
switch (messagePriority)
{
case PRIORITY_ACTION:
changeK(k, buffer);
break;
case PRIORITY_JOIN:
handleNodeClusterJoining(&buffer, ownQueuePath, clusterQueuePaths);
break;
case PRIORITY_NEW_NODE:
handleNewNode(clusterQueuePaths, buffer);
break;
default:
printf("Received a message with unknown priority, ignoring\n");
break;
}
}
free(buffer);
}
void signalHandler(int signal)
{
}
void joinExistingCluster(char *otherQueueName, char *ownQueuePath)
{
// Join an existing cluster
char buffer[MESSAGE_LENGTH];
char *clusterQueuePath = getQueueName(otherQueueName);
printf("Trying to join cluster at %s\n", clusterQueuePath);
// Nothing is connected yet, so there's no need to handle signals
mqd_t clusterQueueDes = mq_open(clusterQueuePath, O_WRONLY);
if (clusterQueueDes < 0)
ERR("mq_open");
snprintf(buffer, MESSAGE_LENGTH, "%s", ownQueuePath);
if (mq_send(clusterQueueDes, buffer, MESSAGE_LENGTH, PRIORITY_JOIN) < 0)
ERR("mq_send");
if (mq_close(clusterQueueDes) < 0)
ERR("mq_close");
printf("Sent join message to %s\n", clusterQueuePath);
free(clusterQueuePath);
}
int main(int argc, char **argv)
{
if (argc < 3 || argc > 4)
usage(argv[0]);
char *ownQueuePath = getQueueName(argv[1]);
int k = atoi(argv[2]);
char *clusterQueuePaths[CLUSTER_CAPACITY];
for (int i=0; i < CLUSTER_CAPACITY; i++)
clusterQueuePaths[i] = NULL;
struct mq_attr queueAttributes;
memset(&queueAttributes, 0, sizeof(struct mq_attr));
queueAttributes.mq_maxmsg = QUEUE_CAPACITY;
queueAttributes.mq_msgsize = MESSAGE_LENGTH;
setHandler(SIGUSR1, signalHandler);
// Nothing is connected yet, so there's no need to handle signals
mqd_t ownQueueDes = mq_open(ownQueuePath, O_RDONLY | O_CREAT | O_NONBLOCK, 0600, &queueAttributes);
if (ownQueueDes < 0)
ERR("mq_open");
printf("Own queue created at %s\n", ownQueuePath);
setupQueueNotification(ownQueueDes);
if (argc == 4)
joinExistingCluster(argv[3], ownQueuePath);
// Powinienem to zrefaktorować i przerzucić do oddzielnych funkcji, ale nie zdążyłem, brakowało mi 10 minut
while (1)
{
char userAction;
int scanRes = scanf("%c", &userAction);
if (scanRes == EOF)
{
if (errno != EINTR)
ERR("scanf");
setupQueueNotification(ownQueueDes);
readMessagesFromQueue(ownQueueDes, ownQueuePath, &k, clusterQueuePaths);
}
else
{
// Act upon action
int shouldQuit = 0;
switch (userAction)
{
case 'k':
printf("k = %d\n", k);
break;
case 'q':
shouldQuit = 1;
break;
case 'm':
{
char buffer[MESSAGE_LENGTH];
for (int i=0; i < CLUSTER_CAPACITY; i++)
{
if (clusterQueuePaths[i] == NULL)
continue;
mqd_t queueDes = openQueueWithCheck(clusterQueuePaths + i);
if (queueDes == -1)
continue;
int c = k + i;
*(int*)buffer = c;
if (TEMP_FAILURE_RETRY(mq_send(queueDes, buffer, MESSAGE_LENGTH, PRIORITY_ACTION)) < 0)
ERR("mq_send");
if (TEMP_FAILURE_RETRY(mq_close(queueDes)) < 0)
ERR("mq_close");
}
printf("Change k message sent\n");
}
break;
case '\n':
break;
default:
printf("Unknown command: %c\n", userAction);
printf("Known commands: k, q, m\n");
break;
}
if (shouldQuit)
break;
}
}
printf("Unsubscribing from notifications, closing and removing own queue\n");
if (mq_notify(ownQueueDes, NULL) < 0)
ERR("mq_notify");
if (mq_close(ownQueueDes) < 0)
ERR("mq_close");
if (mq_unlink(ownQueuePath) < 0)
ERR("mq_unlink");
free(ownQueuePath);
for (int i=0; i < CLUSTER_CAPACITY; i++)
if (clusterQueuePaths[i] != NULL)
free(clusterQueuePaths[i]);
return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment