Skip to content

Instantly share code, notes, and snippets.

@jweinst1
Created May 12, 2025 07:27
Show Gist options
  • Save jweinst1/5dc80b1dd0831becfff206e8b8fc7c08 to your computer and use it in GitHub Desktop.
Save jweinst1/5dc80b1dd0831becfff206e8b8fc7c08 to your computer and use it in GitHub Desktop.
A disk-backed message queue in C using kqueue
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <stddef.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/mman.h>
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <dirent.h>
#include <limits.h>
// Suffix that mq_file_add requires after the numeric part of a queue file name.
static const char* MQUEUE_EXT = ".mqueue";
// Name of the file mqueue_init creates when the directory scan yields no queue file.
static const char* MQUEUE_DEFAULT_FILE = "0.mqueue";
// Growable array of untyped object pointers.
struct obj_buf {
// Number of slots currently in use.
size_t len;
// Number of slots allocated in buf.
size_t cap;
// Heap-allocated array holding the stored pointers.
void** buf;
};
// Allocates an empty pointer buffer with room for `cap` entries.
//
// A `cap` of 0 is raised to 1 so that capacity doubling on append always
// makes progress.
//
// Returns the new buffer, or NULL when either allocation fails. The caller
// releases it with obj_buf_delete or obj_buf_delete_all.
struct obj_buf* obj_buf_new(size_t cap) {
    if (cap == 0) {
        cap = 1;
    }

    struct obj_buf* p = calloc(1, sizeof(*p));
    if (p == NULL) {
        return NULL;
    }

    // calloc(count, size) rejects a count * size overflow, which a manual
    // multiplication would silently wrap.
    p->buf = calloc(cap, sizeof(*p->buf));
    if (p->buf == NULL) {
        free(p);
        return NULL;
    }

    p->len = 0;
    p->cap = cap;
    return p;
}
// Appends `obj` to the end of `p`, doubling the capacity when the buffer is full.
//
// If the buffer cannot grow (size overflow or allocation failure) the existing
// contents are left untouched, a message is written to stderr and `obj` is
// NOT stored; the function has no return value to report this through.
void obj_buf_append(struct obj_buf* p, void* obj) {
    if (p->len == p->cap) {
        // A zero capacity would stay zero when doubled, so start from 1.
        size_t new_cap = (p->cap == 0) ? 1 : p->cap * 2;
        if (new_cap <= p->cap || new_cap > SIZE_MAX / sizeof(*p->buf)) {
            fprintf(stderr, "obj_buf_append: capacity overflow\n");
            return;
        }
        // Keep the old pointer until realloc succeeds so a failure neither
        // leaks the array nor leaves p->buf NULL.
        void** grown = realloc(p->buf, new_cap * sizeof(*grown));
        if (grown == NULL) {
            fprintf(stderr, "obj_buf_append: out of memory\n");
            return;
        }
        p->buf = grown;
        p->cap = new_cap;
    }
    p->buf[p->len++] = obj;
}
// Removes the first slot whose pointer equals `obj`, closing the gap by
// shifting the following entries down by one. Does nothing when `obj` is not
// stored. The removed object itself is not freed.
void obj_buf_remove(struct obj_buf* p, void* obj) {
    size_t pos = 0;
    while (pos < p->len && p->buf[pos] != obj) {
        ++pos;
    }
    if (pos == p->len) {
        // Ran off the end without a match.
        return;
    }

    p->buf[pos] = NULL;
    for (size_t next = pos + 1; next < p->len; ++next) {
        p->buf[next - 1] = p->buf[next];
    }
    p->len -= 1;
}
// Frees the buffer and its slot array. The stored objects are not freed.
// Passing NULL is a no-op, mirroring free().
void obj_buf_delete(struct obj_buf* p) {
    if (p == NULL) {
        return;
    }
    free(p->buf);
    free(p);
}
// Frees every stored object, then the buffer itself.
//
// Each object is released with a plain free(), so this only suits objects
// that own no further allocations. Passing NULL is a no-op, mirroring free().
void obj_buf_delete_all(struct obj_buf* p) {
    if (p == NULL) {
        return;
    }
    for (size_t i = 0; i < p->len; ++i) {
        free(p->buf[i]);
    }
    free(p->buf);
    free(p);
}
// Returns 1 when `name` is exactly "." or "..", and 0 for every other name.
//
// The terminator is checked after the second dot so that ordinary entries
// that merely start with ".." (for example "..cache") are not skipped.
int is_dot_or_dot_dot(const char* name) {
    if (name[0] != '.') {
        return 0;
    }
    if (name[1] == '\0') {
        return 1;
    }
    return name[1] == '.' && name[2] == '\0';
}
// Iteration state for walking the entries of a single directory.
struct diriter {
// Stream returned by opendir(); NULL when diriter_begin could not open the directory.
DIR* dirp;
// Borrowed (not owned) directory path, used as the prefix of every produced path.
const char* dirpath;
// Raw storage that diriter_next casts to struct dirent and hands to readdir_r.
// NOTE(review): a char array carries no alignment guarantee for struct dirent;
// confirm this is acceptable on the target platform.
char path[offsetof(struct dirent, d_name) + NAME_MAX + 1];
};
// Opens `dirpath` for iteration.
//
// The opendir() result is always stored in it->dirp. On success the path
// pointer is also remembered (borrowed, so it must outlive the iteration)
// and 1 is returned; on failure 0 is returned.
int diriter_begin(struct diriter* it, const char* dirpath) {
    DIR* handle = opendir(dirpath);
    it->dirp = handle;
    if (handle != NULL) {
        it->dirpath = dirpath; // Not owning
    }
    return handle != NULL;
}
// Writes the path of the next directory entry ("<dirpath>/<name>") into `buf`.
//
// "." and ".." are skipped. Returns 1 when `buf` holds a complete path.
// Returns 0 when the directory is exhausted, when reading fails, or when the
// path cannot be formatted into `bsize` bytes. On every 0 return the stream
// is closed and it->dirp is reset to NULL, so further calls simply return 0
// instead of touching a closed stream.
int diriter_next(struct diriter* it, char* buf, size_t bsize) {
    if (it->dirp == NULL)
        return 0;

    struct dirent* entry = (struct dirent*)it->path;
    struct dirent* found = NULL;

    // Loop rather than recurse to step over the entries that are skipped.
    while (readdir_r(it->dirp, entry, &found) == 0 && found != NULL) {
        if (is_dot_or_dot_dot(entry->d_name)) {
            continue;
        }
        int written = snprintf(buf, bsize, "%s/%s", it->dirpath, entry->d_name);
        // snprintf returns the length it wanted to write; a value >= bsize
        // means the path was truncated and must not be handed to the caller.
        if (written < 0 || (size_t)written >= bsize) {
            fprintf(stderr, "Failed to iter on %s\n", it->dirpath);
            break;
        }
        return 1;
    }

    // now automatically close the dir
    closedir(it->dirp);
    it->dirp = NULL;
    return 0;
}
// Returns a heap-allocated copy of the NUL-terminated string `src`.
//
// Returns NULL when `src` is NULL or the allocation fails. The caller owns
// the result and releases it with free().
static char* str_dupl(const char* src) {
    if (src == NULL) {
        return NULL;
    }
    size_t src_size = strlen(src) + 1; // include the terminator
    char* newstr = malloc(src_size);
    if (newstr == NULL) {
        return NULL;
    }
    memcpy(newstr, src, src_size);
    return newstr;
}
// State of one message queue backed by a directory of queue files.
struct mqueue {
// Heap copy of the queue directory path made by mqueue_init.
char* dirname;
// Descriptor for the queue directory, opened with O_EVTONLY.
int dirfd;
// Descriptor that mq_file_add passes to kevent() as the kqueue.
int kqfd;
// Buffer of struct mq_file* entries appended by mq_file_add.
struct obj_buf* qfiles;
// Presumably the size limit for the queue; not read by the code visible here.
size_t max_qsize;
// The struct mq_file* with the highest file_no seen so far, stored as void*.
void* cur_file;
// Read-write descriptor that mqueue_init opens on the file being written.
int writerfd;
// Set to 0 by mqueue_init; presumably the current write offset (TODO confirm).
size_t writeroff;
};
// One queue file registered with the queue's kqueue.
struct mq_file {
// Back-pointer to the queue this file was added to.
struct mqueue* queue;
// kevent change record filled by EV_SET; its udata points back at this struct.
struct kevent change;
// Heap copy of the path the file was opened with.
char* fname;
// Descriptor opened with O_EVTONLY | O_RDONLY.
int fd;
// Number parsed from the file name.
long file_no;
// Offset passed to mq_file_add; the descriptor is seeked to it.
size_t offset;
};
// Opens the queue file at `fname`, registers it for EVFILT_READ events on the
// queue's kqueue and appends it to mq->qfiles.
//
// `fname` may carry a directory prefix; only its last path component has to
// have the form "<N>.mqueue", where N is a non-negative decimal number that
// becomes file_no. `offset` is stored and the descriptor is seeked to it.
//
// Returns the new entry, or NULL when the name has the wrong format or any
// allocation, open() or kevent() call fails. Nothing is leaked on failure.
struct mq_file* mq_file_add(struct mqueue* mq, const char* fname, size_t offset) {
    if (mq == NULL || fname == NULL) {
        return NULL;
    }

    // The number lives in the last path component, so step over any directory
    // prefix; parsing the whole path would read the directory name instead.
    const char* slash = strrchr(fname, '/');
    const char* base = (slash != NULL) ? slash + 1 : fname;

    // Insist on a leading digit: strtol alone would also accept leading
    // whitespace and a sign, and reports "no digits" as a plain 0.
    if (base[0] < '0' || base[0] > '9') {
        return NULL;
    }
    char* end = NULL;
    errno = 0;
    long file_no = strtol(base, &end, 10);
    if (errno == ERANGE || strcmp(end, MQUEUE_EXT) != 0) {
        // File is not right format.
        return NULL;
    }

    struct mq_file* mqf = calloc(1, sizeof(*mqf));
    if (mqf == NULL) {
        return NULL;
    }
    mqf->queue = mq;
    mqf->fname = str_dupl(fname);
    if (mqf->fname == NULL) {
        free(mqf);
        return NULL;
    }
    mqf->fd = open(mqf->fname, O_EVTONLY | O_RDONLY);
    if (mqf->fd == -1) {
        free(mqf->fname);
        free(mqf);
        return NULL;
    }
    mqf->file_no = file_no;
    mqf->offset = offset; // todo, real offset store
    // Positioning stays best-effort: a failure is reported but not fatal.
    if (lseek(mqf->fd, (off_t)mqf->offset, SEEK_SET) == (off_t)-1) {
        perror("lseek");
    }

    EV_SET(&mqf->change, mqf->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, mqf);
    if (kevent(mq->kqfd, &mqf->change, 1, NULL, 0, NULL) == -1) {
        perror("kevent");
        close(mqf->fd);
        free(mqf->fname);
        free(mqf);
        return NULL;
    }
    obj_buf_append(mq->qfiles, mqf);
    return mqf;
}

// Releases everything a partially initialised queue holds and resets `mq` to
// an empty state with all descriptors set to -1.
static void mqueue_init_unwind(struct mqueue* mq) {
    if (mq->qfiles != NULL) {
        for (size_t i = 0; i < mq->qfiles->len; ++i) {
            struct mq_file* mqf = mq->qfiles->buf[i];
            if (mqf == NULL) {
                continue;
            }
            close(mqf->fd);
            free(mqf->fname);
            free(mqf);
        }
        obj_buf_delete(mq->qfiles);
    }
    if (mq->writerfd != -1) {
        close(mq->writerfd);
    }
    if (mq->kqfd != -1) {
        close(mq->kqfd);
    }
    if (mq->dirfd != -1) {
        close(mq->dirfd);
    }
    free(mq->dirname);
    memset(mq, 0, sizeof(*mq));
    mq->dirfd = -1;
    mq->kqfd = -1;
    mq->writerfd = -1;
}

// Initialises `mq` over the directory `dirname`.
//
// Creates the kqueue, registers every "<N>.mqueue" file found in the
// directory and selects the one with the highest number as the current file.
// When the directory holds no queue file, "<dirname>/0.mqueue" is created and
// becomes the current file. A read-write descriptor on the current file is
// stored in mq->writerfd, and `max_qsize` is recorded in mq->max_qsize.
//
// Returns 1 on success. Returns 0 on any failure, in which case every
// descriptor and allocation made so far has been released.
int mqueue_init(struct mqueue* mq, const char* dirname, size_t max_qsize) {
    if (mq == NULL || dirname == NULL) {
        return 0;
    }

    struct diriter it;
    char pbuf[PATH_MAX + 30] = {'\0'};

    memset(mq, 0, sizeof(*mq));
    // -1 marks "not open" so the unwind path never closes descriptor 0.
    mq->dirfd = -1;
    mq->kqfd = -1;
    mq->writerfd = -1;
    mq->max_qsize = max_qsize;

    mq->dirname = str_dupl(dirname);
    if (mq->dirname == NULL) {
        goto fail;
    }
    mq->dirfd = open(mq->dirname, O_EVTONLY);
    if (mq->dirfd == -1) {
        goto fail;
    }
    // mq_file_add registers events on this descriptor, so it has to exist
    // before the first file is added.
    mq->kqfd = kqueue();
    if (mq->kqfd == -1) {
        perror("kqueue");
        goto fail;
    }
    mq->qfiles = obj_buf_new(50);
    if (mq->qfiles == NULL) {
        goto fail;
    }

    if (!diriter_begin(&it, dirname)) {
        goto fail;
    }
    while (diriter_next(&it, pbuf, sizeof(pbuf))) {
        printf("Found %s in queue dir %s\n", pbuf, dirname);
        struct mq_file* mqf = mq_file_add(mq, pbuf, 0);
        if (mqf == NULL) {
            continue; // not a queue file, or it could not be registered
        }
        struct mq_file* cur = mq->cur_file;
        // The first queue file found becomes current unconditionally.
        if (cur == NULL || mqf->file_no > cur->file_no) {
            mq->cur_file = mqf;
        }
    }

    if (mq->cur_file == NULL) {
        // create a new file if one doesn't exist, inside the queue directory
        // so that the next scan of that directory finds it again
        int n = snprintf(pbuf, sizeof(pbuf), "%s/%s", dirname, MQUEUE_DEFAULT_FILE);
        if (n < 0 || (size_t)n >= sizeof(pbuf)) {
            goto fail;
        }
        mq->writerfd = open(pbuf, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
        if (mq->writerfd == -1) {
            goto fail;
        }
        mq->cur_file = mq_file_add(mq, pbuf, 0);
        if (mq->cur_file == NULL) {
            goto fail;
        }
    } else {
        struct mq_file* cur = mq->cur_file;
        mq->writerfd = open(cur->fname, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
        if (mq->writerfd == -1) {
            goto fail;
        }
    }
    return 1;

fail:
    mqueue_init_unwind(mq);
    return 0;
}
// tests begin here.
// Reports a failed test expression on stderr; a true condition prints nothing.
// `express` is the source text of the condition and `lineno` the line it
// appeared on.
static void check_condition(int cond, const char* express, unsigned lineno) {
    if (cond) {
        return;
    }
    fprintf(stderr, "FAIL %s line %u\n", express, lineno);
}
// Checks `cond`, passing along its stringified text and the current line.
#define CHECKIT(cond) check_condition(cond, #cond, __LINE__)
// Entry point of the test driver. No tests are wired up yet, so it only
// reports success.
int main(int argc, char const *argv[])
{
    (void)argc;
    (void)argv;
    return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment