Skip to content

Instantly share code, notes, and snippets.

@astarasikov
Created June 1, 2016 11:55
Show Gist options
  • Save astarasikov/7fcb03254c1adb3f3d126330479d506e to your computer and use it in GitHub Desktop.
Save astarasikov/7fcb03254c1adb3f3d126330479d506e to your computer and use it in GitHub Desktop.
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
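/**
* Minimal VxWorks-like message queue built on a pthread mutex and condition
* variable.
*
* Supported features:
* [ ] msgQCreate priority/fifo options (the options argument is ignored)
* [x] msgQSend priority (MSG_PRI_URGENT messages go to the head of the queue)
* [x] msgQReceive
* [x] msgQDelete (marks the queue as destroyed and wakes up all waiters)
* [ ] msgQDelete safe -> releasing the queue memory needs a VxWorks-like
*     wrapper struct
* [ ] msgQSend/msgQReceive timeout (the timeout argument is ignored)
*/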
/*
 * CHECK(cond): if cond evaluates to false, print the failed expression,
 * the enclosing function name and the line number to stderr, then jump to
 * the label `failed`. Every function that uses CHECK must therefore define
 * a `failed:` label.
 */
#define CHECK(cond) \
do { \
if (!(cond)) { \
fprintf(stderr, "Failed to check '%s' in '%s':%d\n", #cond, __func__, \
__LINE__); \
goto failed; \
} \
} while (0)
/* Return type of msgQDelete, msgQSend and msgQReceive. */
#define STATUS int
/* Value returned by the msgQ* functions on failure. */
#define ERROR (-1)
/* Unsigned 32-bit type used for the byte-count arguments of send/receive. */
typedef uint32_t UINT;
/* Queue creation option flags; msgQCreate ignores its options argument. */
#define MSG_Q_FIFO 0x00
#define MSG_Q_PRIORITY 0x01
#define MSG_Q_EVENTSEND_ERR_NOTIF 0x02
/*
 * Message priorities for msgQSend: MSG_PRI_URGENT places the message at the
 * head of the queue, any other value appends it at the tail.
 */
#define MSG_PRI_NORMAL 0
#define MSG_PRI_URGENT 1
/*
 * State of one message queue: a ring of `capacity` fixed-size slots stored
 * in `data`, plus the mutex/condition variable used by the msgQ* functions.
 */
typedef struct msg_q {
/* Maximum number of messages the queue can hold. */
size_t capacity;
/* Number of messages currently stored. */
size_t used;
/* Size in bytes of each message slot in `data`. */
size_t maxMsgLength;
/* Storage for `capacity` slots of `maxMsgLength` bytes each. */
uint8_t *data;
/* Position of the oldest message; reduced modulo `capacity` to get a slot. */
size_t head;
/* Set to 1 by msgQDelete; send/receive fail once it is set. */
uint8_t isDestroyed;
/* Held by msgQSend, msgQReceive and msgQDelete while they touch the state. */
pthread_mutex_t q_mutex;
/* Broadcast by msgQSend, msgQReceive and msgQDelete to wake up waiters. */
pthread_cond_t q_cond;
} msg_q;
/* Handle type passed to the msgQ* functions (a pointer to msg_q). */
typedef struct msg_q *MSG_Q_ID;
/**
 * Allocate and initialise a message queue.
 *
 * @param maxMsgs       maximum number of messages the queue can hold (> 0)
 * @param maxMsgLength  size in bytes of every message slot (> 0)
 * @param options       MSG_Q_* flags; currently ignored
 * @return the new queue, or NULL on invalid arguments, size overflow,
 *         allocation failure or pthread object initialisation failure
 */
MSG_Q_ID msgQCreate(int maxMsgs, int maxMsgLength, int options) {
    msg_q *q = NULL;
    int mutex_ready = 0;

    (void)options;
    CHECK(maxMsgs > 0);
    CHECK(maxMsgLength > 0);
    /* The slot storage size is computed in size_t; reject products that do
     * not fit instead of overflowing an int multiplication. */
    CHECK((size_t)maxMsgs <= SIZE_MAX / (size_t)maxMsgLength);

    /* calloc zeroes the header, so head/used/isDestroyed start at 0. */
    q = calloc(1, sizeof *q);
    CHECK(NULL != q);
    q->capacity = (size_t)maxMsgs;
    q->maxMsgLength = (size_t)maxMsgLength;

    /* Zeroed slots: a receiver never sees uninitialised heap bytes when a
     * sender wrote fewer bytes than the slot size. */
    q->data = calloc(q->capacity, q->maxMsgLength);
    CHECK(NULL != q->data);

    CHECK(0 == pthread_mutex_init(&q->q_mutex, NULL));
    mutex_ready = 1;
    CHECK(0 == pthread_cond_init(&q->q_cond, NULL));
    return q;

failed:
    if (NULL != q) {
        if (mutex_ready) {
            pthread_mutex_destroy(&q->q_mutex);
        }
        free(q->data);
        free(q);
    }
    return NULL;
}
/**
 * Mark a queue as destroyed and wake up every thread blocked on it.
 *
 * Blocked and subsequent msgQSend/msgQReceive calls return ERROR once the
 * flag is set. The queue memory and its pthread objects are NOT released
 * here, because woken threads still access the structure.
 *
 * @param msgQId  queue to delete
 * @return 0 on success, ERROR if msgQId is NULL
 */
STATUS msgQDelete(MSG_Q_ID msgQId) {
    CHECK(NULL != msgQId);

    pthread_mutex_lock(&msgQId->q_mutex);
    msgQId->isDestroyed = 1;
    pthread_mutex_unlock(&msgQId->q_mutex);

    /* wake up everyone so that waiters observe isDestroyed and return ERROR */
    pthread_cond_broadcast(&msgQId->q_cond);
    return 0;

failed:
    return ERROR;
}
/**
 * Copy a message into the queue, blocking while the queue is full.
 *
 * @param msgQId    queue to send to
 * @param buffer    message bytes; may be NULL only when nBytes is 0
 * @param nBytes    message length; silently truncated to the slot size
 * @param timeout   currently ignored, the call blocks indefinitely
 * @param priority  MSG_PRI_URGENT inserts at the head of the queue, any
 *                  other value appends at the tail
 * @return the number of bytes stored, or ERROR if an argument is invalid
 *         or the queue has been deleted
 */
STATUS msgQSend(MSG_Q_ID msgQId, char *buffer, UINT nBytes, int timeout,
                int priority) {
    STATUS rc = ERROR;
    int locked = 0;
    size_t head;
    size_t slot;
    size_t len;

    (void)timeout;
    /* Argument checks run before the lock is taken; `locked` keeps the
     * failure path from unlocking (and dereferencing) a NULL queue. */
    CHECK(NULL != msgQId);
    CHECK(NULL != buffer || 0 == nBytes);

    pthread_mutex_lock(&msgQId->q_mutex);
    locked = 1;
    while (!msgQId->isDestroyed && msgQId->used >= msgQId->capacity) {
        pthread_cond_wait(&msgQId->q_cond, &msgQId->q_mutex);
    }
    CHECK(!msgQId->isDestroyed);
    CHECK(msgQId->used < msgQId->capacity);

    /* Normalise head first so the index arithmetic below cannot overflow. */
    head = msgQId->head % msgQId->capacity;
    if (MSG_PRI_URGENT == priority) {
        /* Step head back by one slot and store the message there. */
        head = (head + msgQId->capacity - 1) % msgQId->capacity;
        msgQId->head = head;
        slot = head;
    } else {
        slot = (head + msgQId->used) % msgQId->capacity;
    }
    msgQId->used++;

    len = nBytes;
    if (len > msgQId->maxMsgLength) {
        len = msgQId->maxMsgLength;
    }
    if (len > 0) {
        memcpy(&msgQId->data[slot * msgQId->maxMsgLength], buffer, len);
    }
    rc = (STATUS)len;

failed:
    if (locked) {
        pthread_mutex_unlock(&msgQId->q_mutex);
        pthread_cond_broadcast(&msgQId->q_cond);
    }
    return rc;
}
/**
 * Remove the oldest message from the queue, blocking while it is empty.
 *
 * The queue does not record per-message lengths, so the number of bytes
 * copied is min(maxNBytes, slot size) regardless of how many bytes the
 * sender provided.
 *
 * @param msgQId     queue to receive from
 * @param buffer     destination; may be NULL only when maxNBytes is 0
 * @param maxNBytes  capacity of buffer in bytes
 * @param timeout    currently ignored, the call blocks indefinitely
 * @return the number of bytes copied, or ERROR if an argument is invalid
 *         or the queue has been deleted
 */
STATUS msgQReceive(MSG_Q_ID msgQId, char *buffer, UINT maxNBytes, int timeout) {
    STATUS rc = ERROR;
    int locked = 0;
    size_t slot;
    size_t slotSize;
    size_t len;
    uint8_t *src;

    (void)timeout;
    /* Argument checks run before the lock is taken; `locked` keeps the
     * failure path from unlocking (and dereferencing) a NULL queue. */
    CHECK(NULL != msgQId);
    CHECK(NULL != buffer || 0 == maxNBytes);

    pthread_mutex_lock(&msgQId->q_mutex);
    locked = 1;
    while (!msgQId->isDestroyed && 0 == msgQId->used) {
        pthread_cond_wait(&msgQId->q_cond, &msgQId->q_mutex);
    }
    CHECK(!msgQId->isDestroyed);
    CHECK(msgQId->used > 0);

    /* Keep head inside [0, capacity) instead of letting it grow forever. */
    slot = msgQId->head % msgQId->capacity;
    msgQId->head = (slot + 1) % msgQId->capacity;
    msgQId->used--;

    slotSize = msgQId->maxMsgLength;
    len = maxNBytes < slotSize ? (size_t)maxNBytes : slotSize;
    src = &msgQId->data[slot * slotSize];
    if (len > 0) {
        memcpy(buffer, src, len);
    }
    /* Scrub the whole slot so stale bytes cannot leak into a later,
     * shorter message stored in the same slot. */
    memset(src, 0, slotSize);
    rc = (STATUS)len;

failed:
    if (locked) {
        pthread_mutex_unlock(&msgQId->q_mutex);
        pthread_cond_broadcast(&msgQId->q_cond);
    }
    return rc;
}
/**
 * Report how many messages are currently stored in the queue.
 *
 * The counter is read under the queue mutex so the read does not race with
 * concurrent msgQSend/msgQReceive calls; the value may be outdated as soon
 * as the function returns.
 *
 * @param msgQId  queue to inspect
 * @return the number of queued messages, or ERROR if msgQId is NULL
 */
int msgQNumMsgs(MSG_Q_ID msgQId) {
    int count;

    CHECK(NULL != msgQId);

    pthread_mutex_lock(&msgQId->q_mutex);
    count = (int)msgQId->used;
    pthread_mutex_unlock(&msgQId->q_mutex);
    return count;

failed:
    return ERROR;
}
/******************************************************************************
* Message Queue Library testing routines
*****************************************************************************/
/* Parameters of the queue that main creates for the test threads. */
enum {
/* Number of message slots in the test queue. */
TEST_Q_N_MSGS = 10,
/* Size in bytes of each test message and of each queue slot. */
TEST_Q_MSG_LEN = 64,
};
/*
 * Add up all bytes of a buffer into a 32-bit checksum.
 * Returns 0 (after logging through CHECK) when data is NULL.
 */
static uint32_t calc_sum(uint8_t *data, size_t length) {
    uint32_t total = 0;
    const uint8_t *cursor;
    const uint8_t *end;

    CHECK(NULL != data);

    /* Walk the buffer with a pointer instead of an index. */
    end = data + length;
    for (cursor = data; cursor != end; ++cursor) {
        total += *cursor;
    }

failed:
    return total;
}
/*
 * Overwrite `length` bytes starting at `data` with values drawn from rand();
 * each rand() result is narrowed to a byte on assignment.
 */
static void fill_rand(uint8_t *data, size_t length) {
    uint8_t *out = data;
    size_t remaining = length;

    while (remaining > 0) {
        *out = rand();
        ++out;
        --remaining;
    }
}
/*
 * Consumer thread entry point: keeps receiving messages from the queue
 * passed in `arg` and logs a checksum of each one. The loop ends, and the
 * thread returns NULL, as soon as msgQReceive reports ERROR.
 */
static void *consumer(void *arg) {
    MSG_Q_ID q = (MSG_Q_ID)arg;

    CHECK(NULL != q);
    for (;;) {
        /* `{0}` rather than `{}`: the empty initializer is not valid C
         * before C23. */
        uint8_t data[TEST_Q_MSG_LEN] = {0};

        fprintf(stderr, "CONSUMER: waiting\n");
        STATUS stat = msgQReceive(q, (char *)data, TEST_Q_MSG_LEN, 0);
        CHECK(ERROR != stat);

        uint32_t csum = calc_sum(data, TEST_Q_MSG_LEN);
        fprintf(stderr, "CONSUMER: checksum %x\n", csum);
    }

failed:
    return NULL;
}
/*
 * Producer thread entry point: keeps filling a buffer with random bytes,
 * logging its checksum and sending it with normal priority to the queue
 * passed in `arg`. The loop ends, and the thread returns NULL, as soon as
 * msgQSend reports ERROR.
 */
static void *producer(void *arg) {
    MSG_Q_ID q = (MSG_Q_ID)arg;

    CHECK(NULL != q);
    for (;;) {
        /* `{0}` rather than `{}`: the empty initializer is not valid C
         * before C23. */
        uint8_t data[TEST_Q_MSG_LEN] = {0};

        fill_rand(data, TEST_Q_MSG_LEN);
        uint32_t csum = calc_sum(data, TEST_Q_MSG_LEN);
        fprintf(stderr, "PRODUCER: checksum %x waiting\n", csum);

        STATUS stat = msgQSend(q, (char *)data, TEST_Q_MSG_LEN, 0, MSG_PRI_NORMAL);
        CHECK(ERROR != stat);
        fprintf(stderr, "PRODUCER: checksum %x gone\n", csum);
    }

failed:
    return NULL;
}
/*
 * Test driver: creates the test queue, starts one consumer and one producer
 * thread, lets them run for five seconds, then deletes the queue and waits
 * for both threads. Returns 0 on success and -1 if any setup step fails.
 */
int main() {
    int exit_code = -1;
    pthread_t t_consumer;
    pthread_t t_producer;
    MSG_Q_ID testQ;

    memset(&t_consumer, 0, sizeof(t_consumer));
    memset(&t_producer, 0, sizeof(t_producer));

    testQ = msgQCreate(TEST_Q_N_MSGS, TEST_Q_MSG_LEN, 0);
    CHECK(NULL != testQ);
    CHECK(0 == pthread_create(&t_consumer, NULL, consumer, testQ));
    CHECK(0 == pthread_create(&t_producer, NULL, producer, testQ));

    sleep(5);

    /* Deleting the queue makes both worker loops fail and exit. */
    msgQDelete(testQ);
    pthread_join(t_consumer, NULL);
    pthread_join(t_producer, NULL);
    exit_code = 0;

failed:
    return exit_code;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment