Last active
December 24, 2023 07:32
-
-
Save rlapz/118e8048096bb11d6b8d0be7aa74bd9c to your computer and use it in GitHub Desktop.
Thread queue sync
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h>
#include <stdlib.h>
#include <threads.h>
#include <time.h>

#include <unistd.h>
// Intrusive doubly linked list link.
// Embed it in the payload struct that should travel through a ThrdQueue.
// NOTE: the tag `_Node` sits in the namespace C reserves for the
// implementation; it is kept unchanged so existing references still compile.
typedef struct _Node {
	struct _Node *prev; // neighbour towards the head; NULL at the head
	struct _Node *next; // neighbour towards the tail; NULL at the tail
} Node;
typedef struct { | |
mtx_t mutex; | |
cnd_t condv; | |
Node *head; | |
Node *tail; | |
} ThrdQueue; | |
static int tq_init(ThrdQueue *self); | |
static void tq_deinit(ThrdQueue *self); | |
static void tq_push(ThrdQueue *self, Node *new_node); | |
static Node *tq_pop(ThrdQueue *self); | |
static int | |
tq_init(ThrdQueue *self) | |
{ | |
if (mtx_init(&self->mutex, mtx_plain) != 0) | |
return -1; | |
if (cnd_init(&self->condv) != 0) | |
return -1; | |
self->head = NULL; | |
self->tail = NULL; | |
return 0; | |
} | |
static void | |
tq_deinit(ThrdQueue *self) | |
{ | |
cnd_destroy(&self->condv); | |
mtx_destroy(&self->mutex); | |
} | |
static void | |
tq_push(ThrdQueue *self, Node *new_node) | |
{ | |
mtx_lock(&self->mutex); // LOCK | |
Node *const head = self->head; | |
if (head != NULL) { | |
new_node->next = head; | |
head->prev = new_node; | |
} else { | |
new_node->next = NULL; | |
self->tail = new_node; | |
} | |
new_node->prev = NULL; | |
self->head = new_node; | |
cnd_signal(&self->condv); | |
mtx_unlock(&self->mutex); // UNLOCK | |
} | |
static Node * | |
tq_pop(ThrdQueue *self) | |
{ | |
mtx_lock(&self->mutex); // LOCK | |
Node *tail; | |
while (self->tail == NULL) { | |
if (cnd_wait(&self->condv, &self->mutex) != 0) { | |
tail = NULL; | |
goto out0; | |
} | |
} | |
tail = self->tail; | |
if (self->head != tail) { | |
self->tail = tail->prev; | |
self->tail->next = NULL; | |
} else { | |
self->head = NULL; | |
self->tail = NULL; | |
} | |
out0: | |
mtx_unlock(&self->mutex); // UNLOCK | |
return tail; | |
} | |
// test | |
struct test_data { | |
Node chan; | |
int data; | |
}; | |
struct context { | |
ThrdQueue queue; | |
ThrdQueue signal; | |
}; | |
// Backing storage for the items the producer pushes onto the work queue.
static struct test_data anu[100];
static int | |
producer(void *udata) | |
{ | |
for (int i = 0; i < 100; i++) { | |
anu[i].data = i; | |
tq_push(&((struct context *)udata)->queue, (Node *)&anu[i]); | |
printf("prod: %d\n", i); | |
} | |
puts("wait"); | |
// wait | |
Node *s = tq_pop(&((struct context *)udata)->signal); | |
if (s != NULL) | |
free(s); | |
puts("end"); | |
return 0; | |
} | |
static int | |
consumer(void *udata) | |
{ | |
for (int i = 0; i < 100; i++) { | |
Node *t = tq_pop(&((struct context *)udata)->queue); | |
if (t != NULL) | |
printf("consm: %d\n", ((struct test_data *)t)->data); | |
usleep(10000); | |
} | |
Node *d = malloc(sizeof(Node)); | |
if (d == NULL) | |
return 1; | |
tq_push(&((struct context *)udata)->signal, d); | |
return 0; | |
} | |
int | |
main(void) | |
{ | |
struct context ctx; | |
thrd_t thrd1; | |
if (tq_init(&ctx.queue) < 0) | |
return 1; | |
if (tq_init(&ctx.signal) < 0) | |
return 1; | |
if (thrd_create(&thrd1, consumer, &ctx) != 0) | |
goto out; | |
(void)producer(&ctx); | |
thrd_join(thrd1, NULL); | |
out: | |
tq_deinit(&ctx.queue); | |
tq_deinit(&ctx.signal); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment