Skip to content

Instantly share code, notes, and snippets.

@rlapz
Last active December 24, 2023 07:32
Show Gist options
  • Save rlapz/118e8048096bb11d6b8d0be7aa74bd9c to your computer and use it in GitHub Desktop.
Save rlapz/118e8048096bb11d6b8d0be7aa74bd9c to your computer and use it in GitHub Desktop.
Thread queue sync
#include <threads.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
typedef struct _Node {
struct _Node *prev;
struct _Node *next;
} Node;
typedef struct {
mtx_t mutex;
cnd_t condv;
Node *head;
Node *tail;
} ThrdQueue;
static int tq_init(ThrdQueue *self);
static void tq_deinit(ThrdQueue *self);
static void tq_push(ThrdQueue *self, Node *new_node);
static Node *tq_pop(ThrdQueue *self);
static int
tq_init(ThrdQueue *self)
{
if (mtx_init(&self->mutex, mtx_plain) != 0)
return -1;
if (cnd_init(&self->condv) != 0)
return -1;
self->head = NULL;
self->tail = NULL;
return 0;
}
static void
tq_deinit(ThrdQueue *self)
{
cnd_destroy(&self->condv);
mtx_destroy(&self->mutex);
}
static void
tq_push(ThrdQueue *self, Node *new_node)
{
mtx_lock(&self->mutex); // LOCK
Node *const head = self->head;
if (head != NULL) {
new_node->next = head;
head->prev = new_node;
} else {
new_node->next = NULL;
self->tail = new_node;
}
new_node->prev = NULL;
self->head = new_node;
cnd_signal(&self->condv);
mtx_unlock(&self->mutex); // UNLOCK
}
static Node *
tq_pop(ThrdQueue *self)
{
mtx_lock(&self->mutex); // LOCK
Node *tail;
while (self->tail == NULL) {
if (cnd_wait(&self->condv, &self->mutex) != 0) {
tail = NULL;
goto out0;
}
}
tail = self->tail;
if (self->head != tail) {
self->tail = tail->prev;
self->tail->next = NULL;
} else {
self->head = NULL;
self->tail = NULL;
}
out0:
mtx_unlock(&self->mutex); // UNLOCK
return tail;
}
// test
struct test_data {
Node chan;
int data;
};
struct context {
ThrdQueue queue;
ThrdQueue signal;
};
static struct test_data anu[100];
static int
producer(void *udata)
{
for (int i = 0; i < 100; i++) {
anu[i].data = i;
tq_push(&((struct context *)udata)->queue, (Node *)&anu[i]);
printf("prod: %d\n", i);
}
puts("wait");
// wait
Node *s = tq_pop(&((struct context *)udata)->signal);
if (s != NULL)
free(s);
puts("end");
return 0;
}
static int
consumer(void *udata)
{
for (int i = 0; i < 100; i++) {
Node *t = tq_pop(&((struct context *)udata)->queue);
if (t != NULL)
printf("consm: %d\n", ((struct test_data *)t)->data);
usleep(10000);
}
Node *d = malloc(sizeof(Node));
if (d == NULL)
return 1;
tq_push(&((struct context *)udata)->signal, d);
return 0;
}
int
main(void)
{
struct context ctx;
thrd_t thrd1;
if (tq_init(&ctx.queue) < 0)
return 1;
if (tq_init(&ctx.signal) < 0)
return 1;
if (thrd_create(&thrd1, consumer, &ctx) != 0)
goto out;
(void)producer(&ctx);
thrd_join(thrd1, NULL);
out:
tq_deinit(&ctx.queue);
tq_deinit(&ctx.signal);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment