Created
June 1, 2016 11:55
-
-
Save astarasikov/7fcb03254c1adb3f3d126330479d506e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdint.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <pthread.h> | |
/**
 * Supported features:
 * [ ] msgQCreate priority/FIFO options (accepted but ignored)
 * [x] msgQSend priority
 * [x] msgQReceive
 * [x] msgQDelete
 * [ ] msgQDelete safe -> needs a VxWorks-like wrapper struct
 * [ ] msgQSend/msgQReceive timeout (accepted but ignored)
 */
/*
 * CHECK(cond): assertion-style guard for functions that end in a `failed:`
 * label. When cond is false, the stringified condition, the enclosing
 * function name and the line number are written to stderr and control jumps
 * to `failed`. When cond is true, nothing happens.
 */
#define CHECK(cond)                                                          \
  do {                                                                       \
    if (cond) {                                                              \
      break; /* leaves only this do/while wrapper */                         \
    }                                                                        \
    fprintf(stderr, "Failed to check '%s' in '%s':%d\n", #cond, __func__,    \
            __LINE__);                                                       \
    goto failed;                                                             \
  } while (0)
/* Status code returned by the msgQ* routines: ERROR on failure. */
typedef int STATUS; /* a real type alias, like UINT below, not a macro */
#define ERROR (-1)

typedef uint32_t UINT;

/* Queue creation options (msgQCreate `options` argument). */
#define MSG_Q_FIFO 0x00
#define MSG_Q_PRIORITY 0x01
#define MSG_Q_EVENTSEND_ERR_NOTIF 0x02

/* Message priorities (msgQSend `priority` argument). */
#define MSG_PRI_NORMAL 0
#define MSG_PRI_URGENT 1
/*
 * Fixed-capacity ring buffer of equally sized message slots, shared between
 * threads. The msgQ* routines access the fields while holding q_mutex.
 */
struct msg_q {
  size_t capacity;         /* number of message slots in data */
  size_t used;             /* slots currently holding a message */
  size_t maxMsgLength;     /* size of one slot, in bytes */
  uint8_t *data;           /* capacity * maxMsgLength bytes of slot storage */
  size_t head;             /* index of the oldest message (taken modulo capacity) */
  uint8_t isDestroyed;     /* set by msgQDelete; makes send/receive fail */
  pthread_mutex_t q_mutex; /* guards the fields above */
  pthread_cond_t q_cond;   /* signalled whenever the queue state changes */
};

typedef struct msg_q msg_q;
typedef struct msg_q *MSG_Q_ID;
MSG_Q_ID msgQCreate(int maxMsgs, int maxMsgLength, int options) { | |
msg_q *q = NULL; | |
CHECK(maxMsgs > 0); | |
CHECK(maxMsgLength > 0); | |
(void)options; | |
q = malloc(sizeof(msg_q)); | |
CHECK(NULL != q); | |
memset(q, 0, sizeof(msg_q)); | |
q->capacity = maxMsgs; | |
q->used = 0; | |
q->maxMsgLength = maxMsgLength; | |
q->data = (uint8_t *)malloc(maxMsgLength * maxMsgs); | |
CHECK(NULL != q->data); | |
q->head = 0; | |
q->isDestroyed = 0; | |
pthread_mutex_init(&q->q_mutex, NULL); | |
pthread_cond_init(&q->q_cond, NULL); | |
return q; | |
failed: | |
if (!q) { | |
return NULL; | |
} | |
if (q->data) { | |
free(q->data); | |
q->data = NULL; | |
} | |
memset(q, 0, sizeof(msg_q)); | |
free(q); | |
return NULL; | |
} | |
/**
 * Mark a queue as destroyed and wake every thread blocked on it.
 *
 * Blocked and subsequent msgQSend/msgQReceive calls return ERROR once the
 * flag is set. The queue's storage is NOT released here, because woken
 * threads still touch the structure on their way out.
 *
 * @param msgQId queue to shut down
 * @return 0 on success, ERROR if msgQId is NULL
 */
STATUS msgQDelete(MSG_Q_ID msgQId) {
  STATUS rc = ERROR;

  CHECK(NULL != msgQId);

  pthread_mutex_lock(&msgQId->q_mutex);
  msgQId->isDestroyed = 1;
  pthread_mutex_unlock(&msgQId->q_mutex);

  /* wake up everyone; they observe isDestroyed and return ERROR */
  pthread_cond_broadcast(&msgQId->q_cond);
  rc = 0;

failed:
  return rc;
}
/**
 * Copy a message into the queue, blocking while the queue is full.
 *
 * @param msgQId   target queue
 * @param buffer   message bytes to copy
 * @param nBytes   message length; silently truncated to the queue's slot size
 * @param timeout  ignored (timeouts are not implemented; the call blocks)
 * @param priority MSG_PRI_URGENT inserts at the head of the queue, any other
 *                 value appends at the tail
 * @return number of bytes stored, or ERROR if an argument is NULL or the
 *         queue has been deleted
 */
STATUS msgQSend(MSG_Q_ID msgQId, char *buffer, UINT nBytes, int timeout,
                int priority) {
  STATUS rc = ERROR;
  int locked = 0; /* the exit path must only unlock what was locked */

  (void)timeout;
  CHECK(NULL != msgQId);
  CHECK(NULL != buffer);

  pthread_mutex_lock(&msgQId->q_mutex);
  locked = 1;

  /* Sleep until a slot frees up or the queue is torn down. */
  while ((!msgQId->isDestroyed) && (msgQId->used >= msgQId->capacity)) {
    pthread_cond_wait(&msgQId->q_cond, &msgQId->q_mutex);
  }
  CHECK(!msgQId->isDestroyed);
  CHECK(msgQId->used < msgQId->capacity);

  size_t idx;
  if (MSG_PRI_URGENT == priority) {
    /* Step head back one slot (with wrap-around) so this message is read next. */
    msgQId->head = (msgQId->capacity + msgQId->head - 1) % msgQId->capacity;
    idx = msgQId->head;
  } else {
    idx = (msgQId->head + msgQId->used) % msgQId->capacity;
  }
  msgQId->used++;

  size_t len = nBytes;
  if (len > msgQId->maxMsgLength) {
    len = msgQId->maxMsgLength;
  }
  memcpy(&msgQId->data[idx * msgQId->maxMsgLength], buffer, len);
  rc = (STATUS)len;

failed:
  if (locked) {
    pthread_mutex_unlock(&msgQId->q_mutex);
    pthread_cond_broadcast(&msgQId->q_cond);
  }
  return rc;
}
/**
 * Remove the message at the head of the queue, blocking while it is empty.
 *
 * The queue does not record each message's original length, so the number
 * of bytes copied is min(maxNBytes, slot size).
 *
 * @param msgQId    source queue
 * @param buffer    destination for the message bytes
 * @param maxNBytes capacity of buffer
 * @param timeout   ignored (timeouts are not implemented; the call blocks)
 * @return number of bytes copied into buffer, or ERROR if an argument is
 *         NULL or the queue has been deleted
 */
STATUS msgQReceive(MSG_Q_ID msgQId, char *buffer, UINT maxNBytes, int timeout) {
  STATUS rc = ERROR;
  int locked = 0; /* the exit path must only unlock what was locked */

  (void)timeout;
  CHECK(NULL != msgQId);
  CHECK(NULL != buffer);

  pthread_mutex_lock(&msgQId->q_mutex);
  locked = 1;

  /* Sleep until a message arrives or the queue is torn down. */
  while ((!msgQId->isDestroyed) && (msgQId->used <= 0)) {
    pthread_cond_wait(&msgQId->q_cond, &msgQId->q_mutex);
  }
  CHECK(!msgQId->isDestroyed);
  CHECK(msgQId->used > 0);

  size_t idx = msgQId->head % msgQId->capacity;
  /* Keep head inside [0, capacity) rather than letting it grow unbounded. */
  msgQId->head = (idx + 1) % msgQId->capacity;
  msgQId->used--;

  size_t slot_len = msgQId->maxMsgLength;
  size_t len = maxNBytes;
  if (len > slot_len) {
    len = slot_len;
  }
  uint8_t *slot = &msgQId->data[idx * slot_len];
  memcpy(buffer, slot, len);
  /*
   * Clear the whole slot, not just the bytes handed out, so a later shorter
   * message stored here is not followed by leftovers of this one.
   */
  memset(slot, 0, slot_len);
  rc = (STATUS)len;

failed:
  if (locked) {
    pthread_mutex_unlock(&msgQId->q_mutex);
    pthread_cond_broadcast(&msgQId->q_cond);
  }
  return rc;
}
/**
 * Report how many messages are currently queued.
 *
 * The count is a snapshot: other threads may change it as soon as the
 * queue mutex is released.
 *
 * @param msgQId queue to inspect
 * @return number of queued messages, or ERROR if msgQId is NULL
 */
int msgQNumMsgs(MSG_Q_ID msgQId) {
  int count = ERROR;

  CHECK(NULL != msgQId);

  /* `used` is written by send/receive under q_mutex; read it the same way. */
  pthread_mutex_lock(&msgQId->q_mutex);
  count = (int)msgQId->used;
  pthread_mutex_unlock(&msgQId->q_mutex);

failed:
  return count;
}
/******************************************************************************
 * Message queue library: test routines
 *****************************************************************************/
/* Dimensions of the queue used by the self-test. */
enum {
  TEST_Q_N_MSGS = 10, /* number of message slots */
  TEST_Q_MSG_LEN = 64 /* bytes per message */
};
/*
 * Additive checksum: the sum of `length` bytes starting at `data`, in
 * uint32_t arithmetic. Returns 0 (after logging via CHECK) when data is NULL.
 */
static uint32_t calc_sum(uint8_t *data, size_t length) {
  uint32_t total = 0;

  CHECK(NULL != data);

  for (const uint8_t *cur = data, *end = data + length; cur != end; ++cur) {
    total += *cur;
  }

failed:
  return total;
}
/*
 * Fill `length` bytes at `data` with the low 8 bits of successive rand()
 * values. A NULL buffer is ignored, mirroring the NULL tolerance of
 * calc_sum.
 */
static void fill_rand(uint8_t *data, size_t length) {
  if (NULL == data) {
    return;
  }
  for (size_t i = 0; i < length; i++) {
    data[i] = (uint8_t)rand(); /* explicit narrowing: keep the low byte */
  }
}
static void *consumer(void *arg) { | |
MSG_Q_ID q = (MSG_Q_ID)arg; | |
CHECK(NULL != q); | |
while (1) { | |
uint8_t data[TEST_Q_MSG_LEN] = {}; | |
fprintf(stderr, "CONSUMER: waiting\n"); | |
STATUS stat = msgQReceive(q, (char *)data, TEST_Q_MSG_LEN, 0); | |
CHECK(ERROR != stat); | |
uint32_t csum = calc_sum(data, TEST_Q_MSG_LEN); | |
fprintf(stderr, "CONSUMER: checksum %x\n", csum); | |
} | |
failed: | |
return NULL; | |
} | |
static void *producer(void *arg) { | |
MSG_Q_ID q = (MSG_Q_ID)arg; | |
CHECK(NULL != q); | |
while (1) { | |
uint8_t data[TEST_Q_MSG_LEN] = {}; | |
fill_rand(data, TEST_Q_MSG_LEN); | |
uint32_t csum = calc_sum(data, TEST_Q_MSG_LEN); | |
fprintf(stderr, "PRODUCER: checksum %x waiting\n", csum); | |
STATUS stat = msgQSend(q, (char *)data, TEST_Q_MSG_LEN, 0, MSG_PRI_NORMAL); | |
CHECK(ERROR != stat); | |
fprintf(stderr, "PRODUCER: checksum %x gone\n", csum); | |
} | |
failed: | |
return NULL; | |
} | |
int main() { | |
pthread_t t_consumer, t_producer; | |
memset(&t_consumer, 0, sizeof(t_consumer)); | |
memset(&t_producer, 0, sizeof(t_producer)); | |
MSG_Q_ID testQ = msgQCreate(TEST_Q_N_MSGS, TEST_Q_MSG_LEN, 0); | |
CHECK(NULL != testQ); | |
CHECK(0 == pthread_create(&t_consumer, NULL, consumer, testQ)); | |
CHECK(0 == pthread_create(&t_producer, NULL, producer, testQ)); | |
sleep(5); | |
msgQDelete(testQ); | |
pthread_join(t_consumer, NULL); | |
pthread_join(t_producer, NULL); | |
return 0; | |
failed: | |
return -1; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment