Skip to content

Instantly share code, notes, and snippets.

@jweinst1
Last active May 11, 2025 23:08
Show Gist options
  • Save jweinst1/d11b6360817d1b2fed24f9b7c4aa30b9 to your computer and use it in GitHub Desktop.
Save jweinst1/d11b6360817d1b2fed24f9b7c4aa30b9 to your computer and use it in GitHub Desktop.
File tailing implemented with kqueue
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <stddef.h>
#include <limits.h>
#include <assert.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
/* Growable, zero-filled byte buffer used to hold data read from a tailed file. */
struct char_buf {
    size_t size; /* capacity of buf in bytes (0 if allocation failed) */
    char* buf;   /* heap storage, or NULL if allocation failed */
};

/* Initialize chb with `size` zeroed bytes. If the allocation fails, chb->buf
 * is NULL and chb->size is 0, so a later char_buf_check_size() can still grow it. */
void char_buf_init(struct char_buf* chb, size_t size) {
    chb->buf = calloc(1, size);
    chb->size = chb->buf != NULL ? size : 0;
}

/* Ensure chb can hold `size` bytes plus a NUL terminator, then zero the whole
 * buffer. When growing, 400 bytes of headroom are added so slightly larger
 * reads do not reallocate every time.
 * Returns 1 on success, 0 if the buffer could not be grown (allocation failure
 * or size + 400 would overflow). On failure the previous buffer and size are
 * kept (and zeroed), so chb is always in a consistent state. */
int char_buf_check_size(struct char_buf* chb, size_t size) {
    int ok = 1;
    /* `size >= chb->size` is `size + 1 > chb->size` without the overflow. */
    if (size >= chb->size) {
        char* grown = NULL;
        if (size <= SIZE_MAX - 400) {
            /* realloc into a temporary so the old buffer survives a failure */
            grown = realloc(chb->buf, size + 400);
        }
        if (grown != NULL) {
            chb->buf = grown;
            chb->size = size + 400;
        } else {
            ok = 0;
        }
    }
    if (chb->buf != NULL) {
        memset(chb->buf, 0, chb->size);
    }
    return ok;
}

/* Release the storage and reset chb to an empty state so it is safe to reuse. */
void char_buf_deinit(struct char_buf* chb) {
    free(chb->buf);
    chb->buf = NULL;
    chb->size = 0;
}
/* Growable array of opaque object pointers. The array does not own the
 * pointed-to objects unless it is released with obj_buf_delete_all(). */
struct obj_buf {
    size_t len;  /* number of pointers stored */
    size_t cap;  /* number of slots allocated in buf */
    void** buf;
};

/* Create an empty buffer with room for `cap` pointers. A cap of 0 is raised
 * to 1 so the buffer can always grow by doubling. Returns NULL if allocation fails. */
struct obj_buf* obj_buf_new(size_t cap) {
    struct obj_buf* p = calloc(1, sizeof *p);
    if (p == NULL) {
        return NULL;
    }
    if (cap == 0) {
        cap = 1;
    }
    p->buf = calloc(cap, sizeof *p->buf); /* calloc checks cap * size for overflow */
    if (p->buf == NULL) {
        free(p);
        return NULL;
    }
    p->len = 0;
    p->cap = cap;
    return p;
}

/* Append `obj`, doubling the capacity when full. Returns 1 on success, 0 if
 * the buffer could not grow (it is then left unchanged and `obj` is not stored). */
int obj_buf_append(struct obj_buf* p, void* obj) {
    if (p->len == p->cap) {
        if (p->cap > SIZE_MAX / (2 * sizeof *p->buf)) {
            return 0; /* doubling would overflow the byte count */
        }
        size_t new_cap = p->cap != 0 ? p->cap * 2 : 1;
        /* realloc into a temporary so the old array survives a failure */
        void** grown = realloc(p->buf, new_cap * sizeof *grown);
        if (grown == NULL) {
            return 0;
        }
        p->buf = grown;
        p->cap = new_cap;
    }
    p->buf[p->len++] = obj;
    return 1;
}

/* Remove the first entry equal to `obj` (pointer comparison). Following
 * entries shift down so order is preserved. No-op if `obj` is absent. */
void obj_buf_remove(struct obj_buf* p, void* obj) {
    for (size_t i = 0; i < p->len; ++i) {
        if (p->buf[i] == obj) {
            memmove(&p->buf[i], &p->buf[i + 1], (p->len - i - 1) * sizeof *p->buf);
            p->buf[--p->len] = NULL; /* clear the vacated tail slot */
            return;
        }
    }
}

/* Free the buffer itself; the stored objects are left alone. NULL is ignored. */
void obj_buf_delete(struct obj_buf* p) {
    if (p == NULL) {
        return;
    }
    free(p->buf);
    free(p);
}

/* Free the buffer and call free() on every stored pointer. Only suitable for
 * objects that own no further allocations ("top level" frees). NULL is ignored. */
void obj_buf_delete_all(struct obj_buf* p) {
    if (p == NULL) {
        return;
    }
    for (size_t i = 0; i < p->len; ++i) {
        free(p->buf[i]);
    }
    free(p->buf);
    free(p);
}
/* Return a heap-allocated copy of `src` that the caller must free().
 * Returns NULL if `src` is NULL or the allocation fails. */
static char* str_dupl(const char* src) {
    if (src == NULL) {
        return NULL;
    }
    size_t src_size = strlen(src) + 1; /* include the terminator */
    char* newstr = malloc(src_size);
    if (newstr != NULL) {
        memcpy(newstr, src, src_size);
    }
    return newstr;
}
/* Return nonzero if `name` is exactly "." or "..". Names that merely start
 * with dots (".git", "..hidden") are real entries and are not matched. */
int is_dot_or_dot_dot(const char* name) {
    return name[0] == '.' &&
           (name[1] == '\0' || (name[1] == '.' && name[2] == '\0'));
}
/* Classification of a path as reported by is_file_or_dir(). */
enum fs_entry {
    FS_TYPE_INVALID = 0, /* stat() failed, or neither a regular file nor a directory */
    FS_TYPE_FILE = 1,    /* regular file */
    FS_TYPE_DIR = 2      /* directory */
};
/* Iterator state shared by diriter_begin() and diriter_next(). */
struct diriter {
/* Open directory stream; NULL if opendir() failed in diriter_begin(). */
DIR* dirp;
/* Directory path used as the prefix of every produced entry.
 * Borrowed from the caller (not copied, not freed). */
const char* dirpath;
/* Scratch storage that diriter_next() casts to `struct dirent*` for readdir_r().
 * Sized as offsetof(d_name) + NAME_MAX + 1, i.e. room for one entry's header
 * plus the longest file name and its terminator.
 * NOTE(review): a char array has no alignment guarantee for struct dirent; it
 * follows two pointer members here, which presumably gives pointer alignment — confirm. */
char path[offsetof(struct dirent, d_name) + NAME_MAX + 1];
};
/* Start iterating over `dirpath`.
 * Returns 1 on success. Returns 0 if the directory cannot be opened; it->dirp
 * is then NULL and diriter_next() yields nothing.
 * `dirpath` is borrowed and must outlive the iteration. */
int diriter_begin(struct diriter* it, const char* dirpath) {
    DIR* stream = opendir(dirpath);
    it->dirp = stream;
    if (stream != NULL) {
        it->dirpath = dirpath;
    }
    return stream != NULL;
}
/* Write the next directory entry as "<dirpath>/<name>" into `buf` (capacity
 * `bsize`), skipping "." and "..".
 * Returns 1 when `buf` holds a path, 0 when the iteration is over.
 * At the end (or on error) the directory is closed and it->dirp is reset to
 * NULL, so calling again after a 0 return is safe and keeps returning 0.
 * Entries whose full path does not fit in `buf` are reported on stderr and
 * skipped rather than returned truncated. */
int diriter_next(struct diriter* it, char* buf, size_t bsize) {
    if (it->dirp == NULL)
        return 0;
    struct dirent* entry = (struct dirent*)it->path;
    struct dirent* result = NULL;
    while (readdir_r(it->dirp, entry, &result) == 0 && result != NULL) {
        if (is_dot_or_dot_dot(entry->d_name)) {
            continue;
        }
        int written = snprintf(buf, bsize, "%s/%s", it->dirpath, entry->d_name);
        if (written < 0) {
            fprintf(stderr, "Failed to iter on %s\n", it->dirpath);
            break;
        }
        if ((size_t)written >= bsize) {
            /* a truncated path would name the wrong (or no) file */
            fprintf(stderr, "Path too long, skipping %s/%s\n", it->dirpath, entry->d_name);
            continue;
        }
        return 1;
    }
    /* end of directory or error: close and mark the iterator as finished */
    closedir(it->dirp);
    it->dirp = NULL;
    return 0;
}
void print_directory(const char* path) {
struct diriter it;
char pbuf[512] = {'\0'};
diriter_begin(&it, path);
while (diriter_next(&it, pbuf, 512)) {
printf("%s\n", pbuf);
}
return;
}
/* Return 1 if `path` names a directory (stat() follows symlinks), 0 otherwise,
 * including when stat() fails. */
int is_directory(const char *path) {
    struct stat info;
    return stat(path, &info) == 0 && S_ISDIR(info.st_mode);
}
/* Return nonzero if `path` names a regular file (stat() follows symlinks),
 * 0 otherwise, including when stat() fails. */
int is_regular_file(const char *path) {
    struct stat info;
    if (stat(path, &info) == 0) {
        return S_ISREG(info.st_mode);
    }
    return 0;
}
/* Classify `path` with a single stat() call: FS_TYPE_FILE for a regular file,
 * FS_TYPE_DIR for a directory, FS_TYPE_INVALID for anything else or when
 * stat() fails. */
enum fs_entry is_file_or_dir(const char* path) {
    struct stat info;
    if (stat(path, &info) != 0) {
        return FS_TYPE_INVALID;
    }
    return S_ISREG(info.st_mode) ? FS_TYPE_FILE
         : S_ISDIR(info.st_mode) ? FS_TYPE_DIR
         : FS_TYPE_INVALID;
}
/* One monitored filesystem entry (regular file or directory) registered with kqueue.
 * Nodes form a tree: directory nodes hold their entries in `children`. */
struct fs_node {
/* Heap copy of the path given to fs_node_create(); freed by fs_node_shutdown(). */
char* name;
/* Descriptor the kevent registrations are attached to. */
int fd;
/* Files: read position, i.e. the starting offset plus bytes consumed in fs_queue_wait(). */
size_t offset;
/* kevent used for the EVFILT_READ registration (files only in fs_node_create()). */
struct kevent change;
/* kevent used for the EVFILT_VNODE registration (rename/delete; directories also watch writes). */
struct kevent state;
/* Child nodes; non-NULL only for directories. fs_queue_wait() relies on this to tell directories from files. */
struct obj_buf* children;
/* Directory node that lists this node as a child, or NULL for the top-level node. */
struct fs_node* parent;
};
/* Return 1 if directory node `n` already tracks a child whose path equals
 * `fpath`, 0 otherwise. Linear scan over n->children (must be non-NULL). */
int fs_node_contains_child(const struct fs_node* n, const char* fpath) {
    const struct obj_buf* kids = n->children;
    size_t idx = kids->len;
    while (idx-- > 0) {
        const struct fs_node* kid = kids->buf[idx];
        if (!strcmp(kid->name, fpath))
            return 1;
    }
    return 0;
}
/* Stop monitoring `fnode` and release it.
 *
 * Steps:
 * - Deregister the node's kevents. Only files were registered for EVFILT_READ;
 *   directories have EVFILT_VNODE alone.
 * - Close the descriptor.
 * - Unlink the node from its parent's children and free it.
 *
 * Children still attached to a directory node are detached (parent set to
 * NULL), so their own later shutdown does not touch the freed directory. They
 * stay registered and are presumably released by their own NOTE_DELETE events.
 *
 * The node is released even when a deregistration fails: per kqueue(2),
 * close() drops any kevents left on the descriptor. Returns 1 if every
 * deregistration succeeded, 0 otherwise. `fnode` must not be used afterwards. */
int fs_node_shutdown(int kq_fd, struct fs_node* fnode) {
    int ok = 1;
    struct kevent ev;
    if (fnode->children == NULL) {
        EV_SET(&ev, fnode->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
        if (kevent(kq_fd, &ev, 1, NULL, 0, NULL) == -1) {
            perror("kevent");
            ok = 0;
        }
    }
    EV_SET(&ev, fnode->fd, EVFILT_VNODE, EV_DELETE, 0, 0, NULL);
    if (kevent(kq_fd, &ev, 1, NULL, 0, NULL) == -1) {
        perror("kevent");
        ok = 0;
    }
    close(fnode->fd);
    if (fnode->children != NULL) {
        for (size_t i = 0; i < fnode->children->len; ++i) {
            struct fs_node* child = fnode->children->buf[i];
            child->parent = NULL;
        }
        obj_buf_delete(fnode->children);
    }
    if (fnode->parent != NULL) {
        obj_buf_remove(fnode->parent->children, fnode);
    }
    free(fnode->name);
    free(fnode);
    return ok;
}
/* Release a node that failed part-way through fs_node_create(). Closing the
 * descriptor also drops any kevents already registered on it (kqueue(2)), so
 * no registration is left pointing at the freed node. Only valid before any
 * children have been attached. */
static void fs_node_discard(struct fs_node* fnode) {
    if (fnode->fd != -1) {
        close(fnode->fd);
    }
    obj_buf_delete(fnode->children);
    free(fnode->name);
    free(fnode);
}

/* Submit one kevent change to `kq_fd`. Returns 1 on success; prints the error
 * and returns 0 on failure. */
static int fs_node_register(int kq_fd, struct kevent* ev) {
    if (kevent(kq_fd, ev, 1, NULL, 0, NULL) == -1) {
        perror("kevent");
        return 0;
    }
    return 1;
}

/* Start monitoring `fname` on kqueue `kq_fd` and return its node. Returns NULL
 * if the path is missing, is neither a regular file nor a directory, or an
 * open/kevent/allocation step fails; nothing is leaked or left registered then.
 *
 * Files are opened, positioned at *offset (0 when offset is NULL), and
 * registered for EVFILT_READ plus EVFILT_VNODE (NOTE_RENAME | NOTE_DELETE).
 * Directories are registered for EVFILT_VNODE (NOTE_RENAME | NOTE_WRITE |
 * NOTE_DELETE), and every current entry is added recursively as a child node.
 *
 * NOTE(review): is_file_or_dir() uses stat(), which follows symlinks, so a
 * symlink to an ancestor directory would recurse without bound — confirm
 * whether monitored trees can contain such links. */
struct fs_node* fs_node_create(int kq_fd, const char* fname, size_t* offset) {
    printf("Adding %s to monitoring\n", fname);
    enum fs_entry type = is_file_or_dir(fname);
    if (type != FS_TYPE_FILE && type != FS_TYPE_DIR) {
        return NULL;
    }
    struct fs_node* fnode = calloc(1, sizeof *fnode);
    if (fnode == NULL) {
        perror("calloc");
        return NULL;
    }
    fnode->fd = -1;
    if (type == FS_TYPE_FILE) {
        fnode->offset = offset != NULL ? *offset : 0;
        fnode->fd = open(fname, O_EVTONLY | O_RDONLY);
    } else {
        fnode->offset = 0;
        fnode->fd = open(fname, O_EVTONLY);
    }
    if (fnode->fd == -1) {
        fprintf(stderr, "Cannot open %s\n", fname);
        errno = 0;
        fs_node_discard(fnode);
        return NULL;
    }
    fnode->name = str_dupl(fname);
    if (fnode->name == NULL) {
        fs_node_discard(fnode);
        return NULL;
    }
    if (type == FS_TYPE_FILE) {
        if (lseek(fnode->fd, (off_t)fnode->offset, SEEK_SET) == (off_t)-1) {
            perror("lseek");
            fs_node_discard(fnode);
            return NULL;
        }
        // todo add other fd check for vnode removed / renamed
        EV_SET(&fnode->change, fnode->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, fnode);
        EV_SET(&fnode->state, fnode->fd, EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_RENAME | NOTE_DELETE, 0, fnode);
        if (!fs_node_register(kq_fd, &fnode->change) || !fs_node_register(kq_fd, &fnode->state)) {
            fs_node_discard(fnode);
            return NULL;
        }
    } else {
        fnode->children = obj_buf_new(20);
        if (fnode->children == NULL) {
            fs_node_discard(fnode);
            return NULL;
        }
        EV_SET(&fnode->state, fnode->fd, EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_RENAME | NOTE_WRITE | NOTE_DELETE, 0, fnode);
        if (!fs_node_register(kq_fd, &fnode->state)) {
            fs_node_discard(fnode);
            return NULL;
        }
        struct diriter it;
        char pbuf[PATH_MAX + 10] = {'\0'};
        if (!diriter_begin(&it, fname)) {
            fprintf(stderr, "Cannot iterate on %s\n", fname);
            /* discard (not just free) so the registered kevent is dropped too */
            fs_node_discard(fnode);
            return NULL;
        }
        while (diriter_next(&it, pbuf, sizeof(pbuf))) {
            struct fs_node* child_node = fs_node_create(kq_fd, pbuf, NULL);
            if (child_node != NULL) {
                printf("Adding %s as child to %s\n", pbuf, fname);
                child_node->parent = fnode;
                obj_buf_append(fnode->children, child_node);
            }
        }
    }
    printf("DONE Adding %s to monitoring\n", fname);
    return fnode;
}
#define MAX_EVENTS 10

/* Shut down `node` after clearing its udata from the not-yet-processed events
 * events[next..count) of the current batch. fs_node_shutdown() may free the
 * node, so those stale events must be skipped. The pointer value is captured
 * as an integer first so the diagnostic never reads a freed pointer. */
static void fs_queue_drop_node(int kq_fd, struct fs_node* node,
                               struct kevent* events, int next, int count) {
    for (int j = next; j < count; ++j) {
        if (events[j].udata == node) {
            events[j].udata = NULL;
        }
    }
    uintptr_t node_id = (uintptr_t)(void*)node;
    if (!fs_node_shutdown(kq_fd, node)) {
        fprintf(stderr, "Unable to shut down %p node\n", (void*)node_id);
    }
}

/* Add a child node for every entry of directory node `dir` not yet tracked. */
static void fs_queue_scan_dir(int kq_fd, struct fs_node* dir) {
    char pbuf[PATH_MAX + 10] = {'\0'};
    struct diriter it;
    if (!diriter_begin(&it, dir->name)) {
        return; /* directory could not be opened (e.g. it was removed) */
    }
    while (diriter_next(&it, pbuf, sizeof(pbuf))) {
        if (fs_node_contains_child(dir, pbuf)) {
            continue;
        }
        // this is a new child, we need to add it.
        struct fs_node* newnode = fs_node_create(kq_fd, pbuf, NULL);
        if (newnode == NULL) {
            continue; /* entry vanished or could not be monitored */
        }
        newnode->parent = dir;
        obj_buf_append(dir->children, newnode);
    }
}

/* Read the `avail` bytes reported by EVFILT_READ for file node `node` into
 * `chb`, print them, and advance node->offset by the amount actually read. */
static void fs_queue_read_file(struct fs_node* node, intptr_t avail, struct char_buf* chb) {
    /* clamp instead of letting intptr_t -> int truncate */
    int bytes = avail > INT_MAX ? INT_MAX : avail < INT_MIN ? INT_MIN : (int)avail;
    printf("%s was written to for %d bytes\n", node->name, bytes);
    if (bytes < 1) {
        return;
    }
    char_buf_check_size(chb, (size_t)bytes);
    /* need room for `bytes` plus the NUL terminator that printing relies on */
    if (chb->buf == NULL || chb->size <= (size_t)bytes) {
        fprintf(stderr, "Cannot allocate %d bytes for %s\n", bytes, node->name);
        return;
    }
    ssize_t bytes_read = read(node->fd, chb->buf, (size_t)bytes);
    if (bytes_read < 0) {
        perror("read");
        return;
    }
    if (bytes_read != bytes) {
        fprintf(stderr, "Unable to read full bytes of %d\n", bytes);
    }
    printf("READ: '%s'\n", chb->buf);
    node->offset += (size_t)bytes_read;
}

/* Block until kqueue `kq_fd` reports events (at most MAX_EVENTS per call)
 * and handle each one:
 * - directory NOTE_WRITE: start monitoring newly appeared entries;
 * - NOTE_DELETE (file or directory): shut the node down;
 * - file NOTE_RENAME: only reported (not handled yet);
 * - file EVFILT_READ: read and print the newly available bytes using `chb`. */
void fs_queue_wait(int kq_fd, struct char_buf* chb) {
    struct kevent events[MAX_EVENTS];
    memset(events, 0, sizeof(events));
    int event_count = kevent(kq_fd, NULL, 0, events, MAX_EVENTS, NULL);
    if (event_count == -1) {
        perror("kevent");
        return;
    }
    for (int i = 0; i < event_count; ++i) {
        struct fs_node* triged = (struct fs_node*)events[i].udata;
        if (triged == NULL) {
            continue; /* node was shut down earlier in this batch */
        }
        if (triged->children != NULL) {
            // this is a directory
            if (events[i].fflags & NOTE_WRITE) {
                fs_queue_scan_dir(kq_fd, triged);
            }
            if (events[i].fflags & NOTE_DELETE) {
                // dir was deleted, but we should be getting separate events for children nodes
                fs_queue_drop_node(kq_fd, triged, events, i + 1, event_count);
            }
            continue;
        }
        // these are files
        if (events[i].filter == EVFILT_VNODE) {
            if (events[i].fflags & NOTE_DELETE) {
                fs_queue_drop_node(kq_fd, triged, events, i + 1, event_count);
            }
            if (events[i].fflags & NOTE_RENAME) {
                puts("Got renamed!!!");
                // todo here.......
            }
            continue;
        }
        // this has to be an EVFILT_READ
        fs_queue_read_file(triged, events[i].data, chb);
    }
}
// tests begin here.
/* Minimal self-check helper: when `cond` is false, report the condition text
 * and its line number on stderr. Execution continues either way. */
static void check_condition(int cond, const char* express, unsigned lineno) {
    if (cond) {
        return;
    }
    fprintf(stderr, "FAIL %s line %u\n", express, lineno);
}
/* Check `cond`, passing its source text and the calling line to check_condition(). */
#define CHECKIT(cond) check_condition((cond), #cond, __LINE__)
/* Self-test for obj_buf_remove(). Covers middle, absent, last, first/only and
 * empty-buffer removals, and checks that order is kept and len tracks every
 * removal. Failures are reported through CHECKIT. */
static void test_obj_buf_remove(void) {
    int a = 3;
    int b = 4;
    int c = 5;
    int missing = 6;
    struct obj_buf* arr = obj_buf_new(20);
    CHECKIT(arr != NULL);
    if (arr == NULL) {
        return;
    }
    obj_buf_append(arr, &a);
    obj_buf_append(arr, &b);
    obj_buf_append(arr, &c);
    CHECKIT(arr->len == 3);
    obj_buf_remove(arr, &b); // middle element
    CHECKIT(arr->len == 2);
    CHECKIT(arr->buf[0] == &a);
    CHECKIT(arr->buf[1] == &c);
    obj_buf_remove(arr, &missing); // absent element: no-op
    CHECKIT(arr->len == 2);
    obj_buf_remove(arr, &c); // last element
    CHECKIT(arr->len == 1);
    CHECKIT(arr->buf[0] == &a);
    obj_buf_remove(arr, &a); // first and only element
    CHECKIT(arr->len == 0);
    obj_buf_remove(arr, &a); // removing from an empty buffer: no-op
    CHECKIT(arr->len == 0);
    obj_buf_delete(arr);
}
/* Run the self-tests, then monitor the file or directory given as argv[1]
 * forever, printing data appended to monitored files.
 * Exits with EXIT_FAILURE if no path is given or monitoring cannot start. */
int main(int argc, char const *argv[])
{
    test_obj_buf_remove();
    if (argc < 2) {
        fprintf(stderr, "Usage: %s <file-or-directory>\n", argc > 0 ? argv[0] : "tail");
        return EXIT_FAILURE;
    }
    int base = kqueue();
    if (base == -1) {
        perror("kqueue");
        return EXIT_FAILURE;
    }
    struct char_buf chb;
    char_buf_init(&chb, 1000);
    struct fs_node* created = fs_node_create(base, argv[1], NULL);
    if (created == NULL) {
        /* with nothing registered, kevent() would block forever */
        fprintf(stderr, "Cannot monitor %s\n", argv[1]);
        char_buf_deinit(&chb);
        close(base);
        return EXIT_FAILURE;
    }
    for (;;) {
        fs_queue_wait(base, &chb);
    }
    /* not reached: the watch loop only ends when the process is terminated */
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment