Skip to content

Instantly share code, notes, and snippets.

@gustavorv86
Last active September 8, 2023 04:42
Show Gist options
  • Save gustavorv86/9e98621b44222114524399a3b4302ddb to your computer and use it in GitHub Desktop.
Save gustavorv86/9e98621b44222114524399a3b4302ddb to your computer and use it in GitHub Desktop.
POSIX message priority queue example written in C/C++
/**
* Compile:
* gcc -std=gnu11 -Wall -Wextra c_priority_queue_threads.c -o priority_queue_threads -lpthread -lrt
*/
#include <errno.h>
#include <fcntl.h> /* For O_* constants. */
#include <mqueue.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h> /* For mode constants. */
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
/*
 * Queue configuration shared by the consumer and publisher threads.
 */
#define QUEUE_NAME "/test_queue" /* Queue name. */
#define QUEUE_PERMS ((mode_t)0644) /* Access mode passed to mq_open() on creation. */
#define QUEUE_MAXMSG 16 /* Maximum number of messages. */
#define QUEUE_MSGSIZE 1024 /* Maximum length of one message, in bytes. */
/*
 * Attributes requested when the queue is created. Designated initializers
 * are used because POSIX does not fix the member order (or any padding
 * members) of struct mq_attr; members not named here are zero-initialized.
 */
#define QUEUE_ATTR_INITIALIZER \
    ((struct mq_attr){ .mq_flags = 0, \
                       .mq_maxmsg = QUEUE_MAXMSG, \
                       .mq_msgsize = QUEUE_MSGSIZE, \
                       .mq_curmsgs = 0 })
/* Poll intervals: the consumer (2.5 s) polls faster than the publisher (5 s). */
#define QUEUE_POLL_CONSUMER ((struct timespec){ .tv_sec = 2, .tv_nsec = 500000000 })
#define QUEUE_POLL_PUBLISHER ((struct timespec){ .tv_sec = 5, .tv_nsec = 0 })
/* Modulus applied to rand() by the publisher: priorities are 0..QUEUE_MAX_PRIO-1. */
#define QUEUE_MAX_PRIO ((int)(9))
/*
 * Run flags polled by the worker threads. They are cleared from the signal
 * handler, so they are atomics rather than plain bools: a plain object that
 * is written in a handler and read by other threads is a data race.
 */
static atomic_bool th_consumer_running = true;
static atomic_bool th_publisher_running = true;

/**
 * Signal handler: announces the signal on stderr and asks both worker
 * threads to stop.
 *
 * Only async-signal-safe operations are used: the message is assembled by
 * hand and emitted with write(), because stdio functions such as fprintf()
 * must not be called from a signal handler.
 *
 * @param signum Number of the signal that was delivered.
 */
void signal_handler(int signum) {
    static const char head[] = "\n\nReceived signal: ";
    static const char tail[] = ".\nStopping threads...\n";
    const int saved_errno = errno; /* write() may clobber the interrupted code's errno. */

    /* Render signum in decimal, least significant digit first. */
    char digits[3 * sizeof(int) + 2];
    size_t ndigits = 0;
    unsigned int magnitude = (signum < 0) ? 0u - (unsigned int)signum
                                          : (unsigned int)signum;
    do {
        digits[ndigits++] = (char)('0' + (int)(magnitude % 10u));
        magnitude /= 10u;
    } while (magnitude != 0u);
    if (signum < 0) {
        digits[ndigits++] = '-';
    }

    /* Build the whole message first so it goes out in a single write(). */
    char msg[sizeof(head) + sizeof(tail) + sizeof(digits)];
    size_t len = 0;
    for (size_t i = 0; i + 1 < sizeof(head); i++) {
        msg[len++] = head[i];
    }
    while (ndigits > 0) {
        msg[len++] = digits[--ndigits];
    }
    for (size_t i = 0; i + 1 < sizeof(tail); i++) {
        msg[len++] = tail[i];
    }
    ssize_t written = write(STDERR_FILENO, msg, len);
    (void) written; /* Nothing useful can be done about a failed diagnostic here. */

    th_consumer_running = false;
    th_publisher_running = false;

    errno = saved_errno;
}
/**
 * Consumer thread entry point.
 *
 * Creates (or opens) QUEUE_NAME read-only in non-blocking mode, then polls it
 * until th_consumer_running is cleared, printing every message received.
 * When the loop ends the queue is closed and unlinked.
 *
 * @param args Unused.
 * @return Always NULL. The process is terminated with exit(1) if the queue
 *         cannot be opened, or if a receive fails for any reason other than
 *         the queue being empty.
 */
void * queue_consumer(void * args) {
    (void) args; /* Suppress -Wunused-parameter warning. */

    /* Requested attributes; they only apply if this call creates the queue. */
    struct mq_attr attr = QUEUE_ATTR_INITIALIZER;

    /* Create the message queue. The queue reader is NONBLOCK. */
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY | O_NONBLOCK, QUEUE_PERMS, &attr);
    if (mq == (mqd_t)-1) {
        fprintf(stderr, "[CONSUMER]: Error, cannot open the queue: %s.\n", strerror(errno));
        exit(1);
    }
    printf("[CONSUMER]: Queue opened, queue descriptor: %d.\n", (int)mq);

    unsigned int prio;
    /* One spare byte keeps a NUL terminator after a maximum-size message. */
    char buffer[QUEUE_MSGSIZE + 1];

    while (th_consumer_running) {
        memset(buffer, 0x00, sizeof(buffer));
        ssize_t bytes_read = mq_receive(mq, buffer, QUEUE_MSGSIZE, &prio);
        if (bytes_read >= 0) {
            printf("[CONSUMER]: Received message: \"%s\"\n", buffer);
        } else if (errno == EAGAIN) {
            /* Empty non-blocking queue: wait one poll interval and retry. */
            printf("[CONSUMER]: No messages yet.\n");
            struct timespec poll_sleep = QUEUE_POLL_CONSUMER;
            nanosleep(&poll_sleep, NULL);
        } else if (errno != EINTR) {
            /*
             * A real failure, e.g. EMSGSIZE when a queue left over from an
             * earlier run has a larger mq_msgsize than QUEUE_MSGSIZE.
             * Retrying cannot help, so report it, remove the queue so the
             * next run recreates it with the requested attributes, and stop.
             */
            fprintf(stderr, "[CONSUMER]: Error, cannot receive from the queue: %s.\n",
                    strerror(errno));
            mq_close(mq);
            mq_unlink(QUEUE_NAME);
            exit(1);
        }
        fflush(stdout);
    }

    /* Cleanup */
    printf("[CONSUMER]: Cleanup...\n");
    mq_close(mq);
    mq_unlink(QUEUE_NAME);
    return NULL;
}
/**
 * Publisher thread entry point.
 *
 * Waits for the consumer to create QUEUE_NAME, opens it write-only, then
 * sends bursts of three randomly prioritised text messages, one burst per
 * poll interval, until th_publisher_running is cleared.
 *
 * @param args Unused.
 * @return Always NULL. Returns early if the thread is asked to stop before
 *         the queue appears, or if the queue cannot be opened for a reason
 *         other than it not existing yet.
 */
void * queue_publisher(void * args) {
    (void) args; /* Suppress -Wunused-parameter warning. */

    /* Open the queue created by the consumer, retrying until it exists. */
    mqd_t mq = (mqd_t)-1;
    while (th_publisher_running) {
        mq = mq_open(QUEUE_NAME, O_WRONLY);
        if (mq != (mqd_t)-1) {
            break;
        }
        if (errno != ENOENT) {
            /* Anything but "does not exist" will not fix itself by waiting. */
            fprintf(stderr, "[PUBLISHER]: Error, cannot open the queue: %s.\n", strerror(errno));
            return NULL;
        }
        printf("[PUBLISHER]: The queue is not created yet. Waiting...\n");
        struct timespec poll_sleep = QUEUE_POLL_PUBLISHER;
        nanosleep(&poll_sleep, NULL);
    }
    if (mq == (mqd_t)-1) {
        /* Asked to stop before the queue ever appeared. */
        return NULL;
    }
    printf("[PUBLISHER]: Queue opened, queue descriptor: %d.\n", (int)mq);

    /* Initializes the random number generator. */
    srand((unsigned)time(NULL));

    int count = 1;
    char buffer[QUEUE_MSGSIZE];
    while (th_publisher_running) {
        /* Send a burst of three messages */
        for (int i = 0; i < 3; i++) {
            unsigned int prio = (unsigned int)(rand() % QUEUE_MAX_PRIO);
            snprintf(buffer, sizeof(buffer), "MESSAGE NUMBER %d, PRIORITY %u", count, prio);
            printf("[PUBLISHER]: Sending message %d with priority %u...\n", count, prio);
            /*
             * Send only the text plus its terminator. Sending QUEUE_MSGSIZE
             * bytes would transmit the uninitialized tail of the buffer.
             */
            if (mq_send(mq, buffer, strlen(buffer) + 1, prio) != 0) {
                fprintf(stderr, "[PUBLISHER]: Error, cannot send message %d: %s.\n",
                        count, strerror(errno));
            }
            count++;
        }
        /* Flush before sleeping so the burst is visible right away. */
        fflush(stdout);
        struct timespec poll_sleep = QUEUE_POLL_PUBLISHER;
        nanosleep(&poll_sleep, NULL);
    }

    /* Cleanup */
    printf("[PUBLISHER]: Cleanup...\n");
    mq_close(mq);
    return NULL;
}
/**
 * Program entry point.
 *
 * Installs the SIGINT handler, refuses to run unless the real user ID is 0,
 * then starts the publisher and consumer threads and waits for both to end.
 *
 * @return EXIT_SUCCESS after both threads have been joined. The process
 *         exits with EXIT_FAILURE if it is not run as root or if a thread
 *         cannot be created.
 */
int main(void) {
    signal(SIGINT, signal_handler);

    uid_t user_id = getuid();
    if (user_id > 0) {
        printf("Run as root.\n");
        exit(EXIT_FAILURE);
    }

    printf("Start...\n");

    pthread_t th_publisher, th_consumer;

    /* pthread_create() returns an error number; it does not set errno. */
    int rc = pthread_create(&th_publisher, NULL, &queue_publisher, NULL);
    if (rc != 0) {
        fprintf(stderr, "Error, cannot create the publisher thread: %s.\n", strerror(rc));
        exit(EXIT_FAILURE);
    }
    rc = pthread_create(&th_consumer, NULL, &queue_consumer, NULL);
    if (rc != 0) {
        /* exit() also ends the publisher thread that is already running. */
        fprintf(stderr, "Error, cannot create the consumer thread: %s.\n", strerror(rc));
        exit(EXIT_FAILURE);
    }

    pthread_join(th_publisher, NULL);
    pthread_join(th_consumer, NULL);

    printf("Done...\n");
    return (EXIT_SUCCESS);
}
@dgvolpato
Copy link

@LucioSantinelli what do you mean by activate posix messages in the kernel?

@martinezcp
Copy link

What is the relevance of MAXSIZE 1024? When I changed it to 512, it stopped working.
Also, when mq_maxmsg is set to a smaller value (say 5), I still get 8/9/10 when I read the current number of messages using mq_curmsgs.

I also have seen this issue. If I reduce/increase the size, it is not working. I have tried with 512, 10, 20, 20248, 10000...

@gustavorv86
Copy link
Author

gustavorv86 commented Aug 13, 2019

@jasjmj and @martinezcp

Please try the updated example (you need root permissions to open a message queue).
The mq_attr.mq_flags and mq_attr.mq_curmsgs fields are ignored by mq_open; see "man mq_open".
QUEUE_MSGSIZE defines the maximum length of a message. I have tried other values (64, 128, 256...).
While the program is running, is a new device created in /dev/mqueue?
If you can reproduce any errors, please paste the standard output/error in a comment.

Regards.

@gustavorv86
Copy link
Author

gustavorv86 commented Aug 13, 2019

@ixnisarg, the usleep call is just an example, but the consumer has to be faster than the publisher; otherwise the queue will fill up.
The call has since been replaced with nanosleep because of unexpected behavior with the SIGALRM signal; see the notes section of "man usleep".

Regards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment