Skip to content

Instantly share code, notes, and snippets.

@Gelio
Created March 25, 2017 20:13
Show Gist options
  • Save Gelio/d858fd6fa8e0d6de23ffdb7a30924acf to your computer and use it in GitHub Desktop.
Save Gelio/d858fd6fa8e0d6de23ffdb7a30924acf to your computer and use it in GitHub Desktop.
Divisibility checker using a waterfall of POSIX message queues created in C for the Operating Systems class
#include <stdlib.h>
#include <stdio.h>
#include <mqueue.h>
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#define MAX_QUEUE_NAME_LENGTH 10
#define MESSAGE_LENGTH sizeof(int)
#define QUEUE_CAPACITY 10
#define ERR(source) (fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), \
perror(source), kill(0, SIGKILL), \
exit(EXIT_FAILURE) )
struct number_info {
int currentQueueID;
int n;
mqd_t *queueDes;
};
typedef struct number_info number_info_t;
void usage(char *fileName)
{
fprintf(stderr, "Usage: %s n\n", fileName);
fprintf(stderr, "n - number of queues (divisors), 1 <= n <= 100\n");
exit(EXIT_FAILURE);
}
char *getQueueName(int i)
{
char *queueName = malloc(MAX_QUEUE_NAME_LENGTH * sizeof(char));
if (queueName == NULL)
ERR("malloc");
snprintf(queueName, MAX_QUEUE_NAME_LENGTH, "/q%d", i);
return queueName;
}
void setupQueueNotification(mqd_t queueDes, number_info_t *numberInfo);
void processNumber(sigval_t numberInfoSigval)
{
number_info_t *numberInfo = (number_info_t*)numberInfoSigval.sival_ptr;
// check if number is divisible by currentQueueID + 1
// if yes, then discard it
// if not, then:
// if currentQueueID + 1 == n: print the number
// else: send the number (with the whole structure) to the next queue (currentQueueID + 1)
mqd_t currentQueueDes = numberInfo->queueDes[numberInfo->currentQueueID];
setupQueueNotification(currentQueueDes, numberInfo);
//~ printf("processing number for queue %d (divider %d)\n", numberInfo->currentQueueID, numberInfo->currentQueueID + 1);
struct mq_attr queueAttributes;
do
{
if (mq_getattr(currentQueueDes, &queueAttributes) < 0)
ERR("mq_getattr");
for (int i = 0; i < queueAttributes.mq_curmsgs; i++)
{
int number;
if (mq_receive(currentQueueDes, (char*)&number, MESSAGE_LENGTH, NULL) < 0)
ERR("mq_receive");
//~ printf("Got number %d\n", number);
if (number % (numberInfo->currentQueueID + 2) != 0)
{
//~ printf("not divisable\n");
if (numberInfo->currentQueueID + 1 == numberInfo->n)
printf("number %d surfaced (not divisable by 2, 3, ..., %d)\n", number, numberInfo->currentQueueID + 2);
else
{
//~ printf("queue %d sends number %d forward\n", numberInfo->currentQueueID, number);
if (mq_send(numberInfo->queueDes[numberInfo->currentQueueID + 1], (char*)&number, MESSAGE_LENGTH, 0) < 0)
ERR("mq_send");
}
}
}
} while (queueAttributes.mq_curmsgs > 0);
}
void setupQueueNotification(mqd_t queueDes, number_info_t *numberInfo)
{
struct sigevent notification;
memset(&notification, 0, sizeof(struct sigevent));
notification.sigev_notify = SIGEV_THREAD;
notification.sigev_notify_function = processNumber;
notification.sigev_value.sival_ptr = numberInfo;
if (mq_notify(queueDes, &notification) < 0)
ERR("mq_notify");
}
int main(int argc, char **argv)
{
if (argc != 2)
usage(argv[0]);
int n = atoi(argv[1]);
if (n < 1 || n > 100)
usage(argv[0]);
char **queueNames = malloc(n * sizeof(char*));
if (queueNames == NULL)
ERR("malloc");
mqd_t *queueDes = malloc(n * sizeof(mqd_t));
if (queueDes == NULL)
ERR("malloc");
number_info_t **numberInfoStructures = malloc(n * sizeof(number_info_t*));
if (numberInfoStructures == NULL)
ERR("malloc");
struct mq_attr queueAttributes;
queueAttributes.mq_maxmsg = QUEUE_CAPACITY;
queueAttributes.mq_msgsize = MESSAGE_LENGTH;
for (int i=0; i < n; i++)
{
queueNames[i] = getQueueName(i);
queueDes[i] = mq_open(queueNames[i], O_RDWR | O_CREAT, 0700, &queueAttributes);
if (queueDes[i] < 0)
ERR("mq_open");
numberInfoStructures[i] = malloc(sizeof(number_info_t));
if (numberInfoStructures[i] == NULL)
ERR("malloc");
numberInfoStructures[i]->n = n;
numberInfoStructures[i]->queueDes = queueDes;
numberInfoStructures[i]->currentQueueID = i;
setupQueueNotification(queueDes[i], numberInfoStructures[i]);
printf("queue %s for divider %d created\n", queueNames[i], i + 1);
}
char *buffer = NULL;
size_t bufferSize = 0;
ssize_t bytesRead;
while ((bytesRead = getline(&buffer, &bufferSize, stdin)) != -1)
{
int number = atoi(buffer);
// send number to first queue
//~ printf("Read %d, sending to first queue\n", number);
if (mq_send(queueDes[0], (char*)&number, MESSAGE_LENGTH, 0) < 0)
ERR("mq_send");
}
free(buffer);
printf("no more input, waiting 2 seconds and cleaning up\n");
sleep(2);
printf("closing and removing queues\n");
for (int i=0; i < n; i++)
{
if (mq_notify(queueDes[i], NULL) < 0) // unsubscribe from notifications
ERR("mq_notify");
if (mq_close(queueDes[i]) < 0)
ERR("mq_close");
if (mq_unlink(queueNames[i]) < 0)
ERR("mq_unlink");
free(queueNames[i]);
free(numberInfoStructures[i]);
}
free(queueDes);
free(queueNames);
free(numberInfoStructures);
return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment