Created
May 12, 2025 07:27
-
-
Save jweinst1/5dc80b1dd0831becfff206e8b8fc7c08 to your computer and use it in GitHub Desktop.
a disk backed message queue in C with kqueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <stdint.h> | |
#include <stddef.h> | |
#include <unistd.h> | |
#include <fcntl.h> | |
#include <string.h> | |
#include <sys/mman.h> | |
#include <errno.h> | |
#include <inttypes.h> | |
#include <pthread.h> | |
#include <sys/event.h> | |
#include <sys/time.h> | |
#include <sys/stat.h> | |
#include <unistd.h> | |
#include <fcntl.h> | |
#include <dirent.h> | |
#include <limits.h> | |
static const char* MQUEUE_EXT = ".mqueue"; | |
static const char* MQUEUE_DEFAULT_FILE = "0.mqueue"; | |
struct obj_buf { | |
size_t len; | |
size_t cap; | |
void** buf; | |
}; | |
struct obj_buf* obj_buf_new(size_t cap) { | |
struct obj_buf* p = calloc(1, sizeof(struct obj_buf)); | |
p->len = 0; | |
p->cap = cap; | |
p->buf = calloc(1, sizeof(void*) * cap); | |
return p; | |
} | |
void obj_buf_append(struct obj_buf* p, void* obj) { | |
if (p->len == p->cap) { | |
p->cap *= 2; | |
p->buf = realloc(p->buf, sizeof(void*) * p->cap); | |
} | |
p->buf[p->len++] = obj; | |
} | |
void obj_buf_remove(struct obj_buf* p, void* obj) { | |
size_t i = 0; | |
int found = 0; | |
for (; i < p->len; ++i) | |
{ | |
if (p->buf[i] == obj) { | |
found = 1; | |
break; | |
} | |
} | |
if (!found) { | |
return; | |
} | |
p->buf[i] = NULL; | |
while (i < (p->len - 1)) { | |
p->buf[i] = p->buf[i + 1]; | |
++i; | |
} | |
p->len--; | |
} | |
void obj_buf_delete(struct obj_buf* p) { | |
free(p->buf); | |
free(p); | |
} | |
void obj_buf_delete_all(struct obj_buf* p) { | |
// only for top level frees | |
for (size_t i = 0; i < p->len; ++i) | |
{ | |
free(p->buf[i]); | |
} | |
free(p->buf); | |
free(p); | |
} | |
int is_dot_or_dot_dot(const char* name) { | |
return name[0] == '.' && (name[1] == '.' || name[1] == '\0'); | |
} | |
struct diriter { | |
DIR* dirp; | |
const char* dirpath; | |
char path[offsetof(struct dirent, d_name) + NAME_MAX + 1]; | |
}; | |
int diriter_begin(struct diriter* it, const char* dirpath) { | |
it->dirp = opendir(dirpath); | |
if (it->dirp == NULL) { | |
return 0; | |
} | |
it->dirpath = dirpath; // Not owning | |
return 1; | |
} | |
int diriter_next(struct diriter* it, char* buf, size_t bsize) { | |
if (it->dirp == NULL) | |
return 0; | |
struct dirent* result; | |
struct dirent* entry = (struct dirent*)it->path; | |
if(readdir_r(it->dirp, entry, &result) == 0 && result != NULL) { | |
if (is_dot_or_dot_dot(entry->d_name)) { | |
// this only happens twice per dir, so should be fine | |
return diriter_next(it, buf, bsize); | |
} | |
// todo, use strcpy to avoid mem alloc | |
int result = snprintf(buf, bsize, "%s/%s", it->dirpath, entry->d_name); | |
if (result < 0) { | |
fprintf(stderr, "Failed to iter on %s\n", it->dirpath); | |
closedir(it->dirp); | |
return 0; | |
} | |
return 1; | |
} | |
// now automatically close the dir | |
closedir(it->dirp); | |
return 0; | |
} | |
static char* str_dupl(const char* src) { | |
size_t src_size = strlen(src) + 1; | |
char* newstr = (char*)malloc(src_size); | |
memcpy(newstr, src, src_size); | |
return newstr; | |
} | |
struct mqueue { | |
char* dirname; | |
int dirfd; | |
int kqfd; | |
struct obj_buf* qfiles; | |
size_t max_qsize; | |
void* cur_file; | |
int writerfd; | |
size_t writeroff; | |
}; | |
struct mq_file { | |
struct mqueue* queue; | |
struct kevent change; | |
char* fname; | |
int fd; | |
long file_no; | |
size_t offset; | |
}; | |
struct mq_file* mq_file_add(struct mqueue* mq, const char* fname, size_t offset) { | |
char* end = NULL; | |
int is_actually_zero = fname[0] == '0' && fname[1] == '.'; | |
long file_no = strtol(fname, &end, 10); | |
if (!is_actually_zero && file_no == 0) { | |
// File is not right format. | |
return NULL; | |
} | |
if(!(strcmp(end, MQUEUE_EXT) == 0)) { | |
return NULL; | |
} | |
struct mq_file* mqf = calloc(1, sizeof(struct mq_file)); | |
mqf->queue = mq; | |
mqf->fname = str_dupl(fname); | |
mqf->fd = open(mqf->fname, O_EVTONLY | O_RDONLY); | |
if (mqf->fd == -1) { | |
free(mqf->fname); | |
free(mqf); | |
return NULL; | |
} | |
mqf->file_no = file_no; | |
mqf->offset = offset; // todo, real offset store | |
lseek(mqf->fd, mqf->offset, SEEK_SET); | |
EV_SET(&mqf->change, mqf->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, mqf); | |
if (kevent(mq->kqfd, &mqf->change, 1, NULL, 0, NULL) == -1) { | |
perror("kevent"); | |
close(mqf->fd); | |
free(mqf->fname); | |
free(mqf); | |
return NULL; | |
} | |
obj_buf_append(mq->qfiles, mqf); | |
return mqf; | |
} | |
int mqueue_init(struct mqueue* mq, const char* dirname, size_t max_qsize) { | |
memset(mq, 0, sizeof(struct mqueue)); | |
mq->cur_file = NULL; | |
mq->dirname = str_dupl(dirname); | |
mq->dirfd = open(mq->dirname, O_EVTONLY); | |
mq->writeroff = 0; | |
if (mq->dirfd == -1) { | |
free(mq->dirname); | |
return 0; | |
} | |
mq->qfiles = obj_buf_new(50); | |
struct diriter it; | |
char pbuf[PATH_MAX + 30] = {'\0'}; | |
diriter_begin(&it, dirname); | |
while (diriter_next(&it, pbuf, sizeof(pbuf))) { | |
printf("Found %s in queue dir %s\n", pbuf, dirname); | |
struct mq_file* mqf = mq_file_add(mq, pbuf, 0); | |
if (mqf != NULL && mqf->file_no > ((struct mq_file*)mq->cur_file)->file_no) { | |
mq->cur_file = mqf; | |
} | |
} | |
// create a new file if one doesn't exist | |
if (mq->cur_file == NULL) { | |
char pbuf[PATH_MAX + 50] = {'\0'}; | |
int newfd = open(MQUEUE_DEFAULT_FILE, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); | |
if (newfd == -1) { | |
return 0; | |
} | |
mq->writerfd = newfd; | |
struct mq_file* mqf = mq_file_add(mq, MQUEUE_DEFAULT_FILE, 0); | |
} else { | |
int newfd = open(((struct mq_file*)mq->cur_file)->fname, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); | |
if (newfd == -1) { | |
return 0; | |
} | |
mq->writerfd = newfd; | |
} | |
return 1; | |
} | |
// tests begin here. | |
static void check_condition(int cond, const char* express, unsigned lineno) { | |
if (!cond) { | |
fprintf(stderr, "FAIL %s line %u\n", express, lineno); | |
} | |
} | |
#define CHECKIT(cond) check_condition(cond, #cond, __LINE__) | |
int main(int argc, char const *argv[]) | |
{ | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment