Created
February 14, 2023 16:18
-
-
Save cpq/41bbc4b50122be93709f203607e3f8d1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Single producer, single consumer non-blocking queue
//
// Producer:
//    char *buf;
//    while (mg_queue_space(q, &buf, len) == 0) WAIT();  // Wait for free space
//    memcpy(buf, data, len);                            // Copy data to the queue
//    mg_queue_add(q, len);                              // Advance q->head
//
// Consumer:
//    char *buf;
//    while ((len = mg_queue_next(q, &buf)) == 0) WAIT();
//    mg_hexdump(buf, len);                              // Handle message
//    mg_queue_del(q);                                   // Delete message (advance tail)
//
// Queue state. `buf` and `len` describe the caller-supplied storage; `head`
// and `tail` are indices into the offsets table kept at the end of it.
struct mg_queue {
  char *buf;          // Storage: message data at the start, offsets at the end
  size_t len;         // Usable storage size in bytes, multiple of 4
  volatile int tail;  // Offsets table index of the next message to read
  volatile int head;  // Offsets table index where the next offset is written
};
// Public API. Producer side: mg_queue_space() + mg_queue_add().
// Consumer side: mg_queue_next() + mg_queue_del().
void mg_queue_init(struct mg_queue *q, char *buf, size_t len);  // Init queue
void mg_queue_add(struct mg_queue *q, size_t len);              // Advance head
void mg_queue_del(struct mg_queue *q);                          // Advance tail
size_t mg_queue_next(struct mg_queue *q, char **buf);  // Get next message size
size_t mg_queue_space(struct mg_queue *q, char **buf, size_t min);  // Get free space
// Data starts from the beginning of the buffer, and offsets to the individual
// messages are stored in the "offsets table" at the end of the buffer.
// The "offsets table" behaves like a stack: it grows down. Every stored offset
// is 32-bit, i.e. sizeof(uint32_t).
//
// When a new message is added, a new offset is written and q->head is
// incremented. The end of the offsets table is marked by a zero offset.
//
// If the data buffer is filled so that q->head cannot increase anymore, but
// there is free space at the beginning of the buffer, q->head can wrap to the
// beginning. In order to wrap, q->tail should be > 2: we must preserve a
// previous tail offset to know the current message length, and we must keep
// space for the zero marker, which ends the offsets table.
// So when a tail is larger than a head, then q->tail > q->head + 2.
//
// |------------- data --------------------->|<- offsets table--|
//
// |----free----|message1|message2|--free----| 0 |off3|off2|off1|
// ^            ^        ^        ^            ^        ^       ^
// buf          off1     off2     off3         head     tail    len
// Return a pointer to the offsets table entry at index `i`. Entry 0 occupies
// the last sizeof(uint32_t) bytes of the buffer; higher indices sit at lower
// addresses. Both arguments are parenthesized so that expressions such as
// `&queue` or `i + 1` expand correctly; `q` is evaluated twice.
#define MG_QPOS(q, i) (&((uint32_t *) &(q)->buf[(q)->len])[-1 - (i)])
void mg_queue_init(struct mg_queue *q, char *buf, size_t len) { | |
q->len = len - (len % sizeof(uint32_t)); // Align | |
q->buf = buf; | |
q->head = q->tail = 0; | |
if (q->len > 0) MG_QPOS(q, 0)[0] = 0; // Mark empty: *head = 0 | |
} | |
// Producer side: find room for a message of at least `min` bytes.
//
// Returns the number of bytes that may be written at `*buf`, or 0 when fewer
// than `min` bytes are available (or the queue is not inited). When `buf` is
// not NULL, `*buf` is set to the position where the next message starts.
// May wrap q->head to the start of the buffer as a side effect.
//
// NOTE(review): wrapping requires q->tail > 2, so a message that does not fit
// ahead of the head while q->tail <= 2 is never given space - confirm this
// limitation is acceptable.
size_t mg_queue_space(struct mg_queue *q, char **buf, size_t min) {
  uint32_t *h, *t, hprev, avail, space = 0;

  if (q->len == 0 || q->buf == NULL) {  // Queue is not inited, do nothing
    if (buf != NULL) *buf = q->buf;
    return 0;
  }

  h = MG_QPOS(q, q->head);
  t = MG_QPOS(q, q->tail);
  hprev = q->head == 0 ? 0 : h[1];  // End of the last stored message

  if (h <= t) {  // Tail is behind, or equal to head
    // Bytes taken by the table once this message and the zero marker are added
    uint32_t ts = sizeof(uint32_t) * ((uint32_t) q->head + 2);
    size_t used = (size_t) hprev + ts;  // size_t sum: no wrap-around with min
    if (used <= q->len && min <= q->len - used) {  // Enough space ahead?
      space = (uint32_t) (q->len - used);          // Yes
    } else if (q->tail > 2) {  // No. Is there room to wrap around?
      h = MG_QPOS(q, 0), *h = 0;  // Terminate the table at entry 0 first
      avail = t[1];               // Free bytes before the oldest unread message
      // TODO(cpq): memory barrier here? q->head must go AFTER *h = 0
      q->head = 0;  // Wrap!
      if (avail >= min) space = avail;
    }
  } else if (&t[2] < h) {  // Head is behind the tail by more than 2 entries
    avail = t[1] - hprev;  // From the current head to where the tail begins
    if (avail >= min) space = avail;
  }

  if (buf != NULL) *buf = q->buf + (q->head == 0 ? 0 : h[1]);
  return space;
}
size_t mg_queue_next(struct mg_queue *q, char **buf) { | |
uint32_t len = 0, prev, *t = MG_QPOS(q, q->tail); | |
if (q->len == 0 || q->buf == NULL) { // If queue is not inited, do nothing | |
} else { | |
if (q->tail > q->head && *t == 0) { // Are we ahead of head and read all? | |
q->tail = 0, t = MG_QPOS(q, 0); // Yes. Wrap around | |
} | |
prev = q->tail == 0 ? 0 : t[1]; | |
if (*t > 0) len = *t - prev; | |
if (buf != NULL) *buf = q->buf + prev; | |
} | |
// printf("-->nxt: %3d %3d %lu\n", q->tail, q->head, len); | |
return len; | |
} | |
void mg_queue_add(struct mg_queue *q, size_t len) { | |
if (len > 0) { | |
uint32_t *h = MG_QPOS(q, q->head), prev = q->head == 0 ? 0 : h[1]; | |
h[0] = prev + (uint32_t) len; // Store next offset | |
h[-1] = 0; // Mark the end | |
q->head++; // Advance head | |
} | |
// printf("-->add: %3d %3d %lu %lu\n", q->tail, q->head, len, | |
} | |
// Consumer side: drop the message returned by the last mg_queue_next() by
// advancing q->tail. Does nothing when there is no message at the tail or the
// queue is not inited.
void mg_queue_del(struct mg_queue *q) {
  if (q->len > 0 && q->buf != NULL) {
    uint32_t *t = MG_QPOS(q, q->tail);  // Current tail entry
    if (*t > 0) q->tail++;              // If there is a message, go to the next
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment