Skip to content

Instantly share code, notes, and snippets.

@Prince781
Created March 17, 2021 00:35
Show Gist options
  • Save Prince781/71a271fb18ee457acf037fed6c8a4144 to your computer and use it in GitHub Desktop.
Save Prince781/71a271fb18ee457acf037fed6c8a4144 to your computer and use it in GitHub Desktop.
asynchronous event-driven I/O
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#include <assert.h>
#include <poll.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdatomic.h>
#include <threads.h>
// Forward declaration of the event type; struct _event is defined below.
typedef struct _event event;
// Forward declaration of the event loop type; struct _eventloop is defined below.
typedef struct _eventloop eventloop;
// Completion callback. The event loop invokes it with the event itself and
// the callback_data pointer that was stored in the event at creation time.
typedef void (*async_callback)(event *, void *data);
// Declared early because event_return() calls it before its definition.
void eventloop_signal(eventloop *loop);
/**
 * State of one asynchronous operation tracked by an event loop.
 *
 * An event is completed either by event_return() (sets is_ready and result)
 * or by event_cancel_with_errno() (sets is_cancelled and io_errno). The loop
 * hands completed events to their callback.
 */
struct _event {
// file descriptor that is poll()ed for I/O events; the non-fd constructors set it to -1
int fd;
struct { // flags
// for I/O events: wait for POLLIN when true, POLLOUT when false
bool is_read_operation;
// true when the event is backed by fd and must be included in the poll() set
bool is_io_operation;
bool is_bg_operation; // whether this is an operation running in a background thread
// set by event_cancel_with_errno(); read through event_is_cancelled()
atomic_bool is_cancelled;
atomic_bool is_ready; // can be set to avoid poll()ing
};
int io_errno; // relevant if the I/O operation failed
thrd_t thread; // reference to the background thread (if relevant)
// function the loop calls once the event is ready or cancelled
async_callback callback;
// opaque pointer handed to callback as its second argument
void *callback_data;
// loop this event was added to; assigned by eventloop_add()
eventloop *loop;
// value stored by event_return()
void *result;
// link used by the loop's pending list and by the temporary ready list
struct _event *next;
};
/**
 * Allocate a plain event that is neither fd-backed nor a background task.
 *
 * All fields start zeroed except fd, which is set to -1.
 *
 * @param callback       function the event loop calls once the event is
 *                       ready or cancelled
 * @param callback_data  opaque pointer passed through to callback
 * @return the new event, or NULL if allocation fails; the event is released
 *         with free()
 */
event *event_new(async_callback callback, void *callback_data)
{
    event *ev = calloc(1, sizeof *ev);
    if (!ev)
        return NULL; // report allocation failure instead of dereferencing NULL

    ev->fd = -1;
    ev->callback = callback;
    ev->callback_data = callback_data;
    return ev;
}
/**
 * Allocate an I/O event that becomes ready when fd is readable or writable.
 *
 * @param fd                 file descriptor the event loop will poll()
 * @param is_read_operation  true to wait for POLLIN, false to wait for POLLOUT
 * @param callback           function the event loop calls once the event is
 *                           ready or cancelled
 * @param callback_data      opaque pointer passed through to callback
 * @return the new event, or NULL if allocation fails; the event is released
 *         with free()
 */
event *event_new_from_fd(int fd, bool is_read_operation, async_callback callback, void *callback_data)
{
    event *ev = calloc(1, sizeof *ev);
    if (!ev)
        return NULL; // report allocation failure instead of dereferencing NULL

    ev->fd = fd;
    ev->is_read_operation = is_read_operation;
    ev->is_io_operation = true;
    ev->callback = callback;
    ev->callback_data = callback_data;
    return ev;
}
/**
 * Allocate an event that represents work done on a background thread.
 *
 * The event is flagged as a background operation, so event_return() will
 * also wake the owning event loop through eventloop_signal().
 *
 * @param callback       function the event loop calls once the event is
 *                       ready or cancelled
 * @param callback_data  opaque pointer passed through to callback
 * @return the new event, or NULL if allocation fails; the event is released
 *         with free()
 */
event *event_new_for_background(async_callback callback, void *callback_data)
{
    event *ev = calloc(1, sizeof *ev);
    if (!ev)
        return NULL; // report allocation failure instead of dereferencing NULL

    ev->fd = -1;
    ev->callback = callback;
    ev->callback_data = callback_data;
    ev->is_bg_operation = true;
    return ev;
}
/**
 * Complete an event with a result and mark it ready.
 *
 * For background events the owning loop is woken up afterwards so that it
 * notices the completion without busy-waiting.
 *
 * @param ev      event to complete; must not have been completed before
 * @param result  value stored in the event for the consumer to pick up
 */
void event_return(event *ev, void *result)
{
    assert(!ev->is_ready && "cannot complete an event twice!");

    // Capture what we need for the wake-up before publishing: once is_ready
    // is visible, another thread may consume the event.
    const bool wake_loop = ev->is_bg_operation;
    eventloop *const loop = ev->loop;

    // The result must be written BEFORE the release store, otherwise a
    // reader that acquires is_ready == true could still see a stale result.
    ev->result = result;
    atomic_store_explicit(&ev->is_ready, true, memory_order_release);

    if (wake_loop)
        eventloop_signal(loop);
}
/**
 * Fetch the error number recorded on an event.
 *
 * @param ev  event to inspect
 * @return the stored io_errno value (0 when none was recorded)
 */
int event_get_errno(event *ev)
{
    const int errnum = ev->io_errno;
    return errnum;
}
/**
 * Cancel an event and record why.
 *
 * @param ev      event to cancel
 * @param errnum  errno-style code stored in the event's io_errno
 */
void event_cancel_with_errno(event *ev, int errnum)
{
    // Write the error code BEFORE the release store so that any reader that
    // acquires is_cancelled == true is guaranteed to see it.
    ev->io_errno = errnum;
    atomic_store_explicit(&ev->is_cancelled, true, memory_order_release);
}
/**
 * Cancel an event using the generic ECANCELED error code.
 *
 * @param ev  event to cancel
 */
void event_cancel(event *ev)
{
    const int reason = ECANCELED;
    event_cancel_with_errno(ev, reason);
}
/**
 * Report whether an event has been cancelled.
 *
 * Uses an acquire load of the event's is_cancelled flag.
 *
 * @param ev  event to inspect
 * @return true if the cancelled flag is set
 */
bool event_is_cancelled(event *ev)
{
    const bool cancelled = atomic_load_explicit(&ev->is_cancelled, memory_order_acquire);
    return cancelled;
}
/**
 * Report whether an event has been marked ready.
 *
 * Uses an acquire load of the event's is_ready flag.
 *
 * @param ev  event to inspect
 * @return true if the ready flag is set
 */
bool event_is_ready(event *ev)
{
    const bool ready = atomic_load_explicit(&ev->is_ready, memory_order_acquire);
    return ready;
}
/**
 * Retrieve the result of an event that neither failed nor was cancelled.
 *
 * @param ev           event to read from
 * @param pointer_ref  receives the stored result on success; left untouched
 *                     on failure
 * @return true if the result was written to *pointer_ref, false if the event
 *         was cancelled or carries a non-zero io_errno
 */
bool event_get_result(event *ev, void **pointer_ref)
{
    const bool succeeded = !event_is_cancelled(ev) && ev->io_errno == 0;

    if (succeeded)
        *pointer_ref = ev->result;
    return succeeded;
}
/**
 * Push an event onto the front of a singly linked event list.
 *
 * @param list  address of the list head pointer; updated to point at ev
 * @param ev    event that becomes the new head
 */
static inline void event_list_prepend(event **list, event *ev)
{
    event *const old_head = *list;

    ev->next = old_head;
    *list = ev;
}
/**
 * An event loop: a singly linked queue of pending events plus a pipe that
 * background threads use to wake the loop.
 */
struct _eventloop {
// head of the pending list (NULL when the list is empty)
event *pending_events;
// last node of the pending list, so eventloop_add() can append without walking it
event *pending_events_tail;
// set to true by eventloop_new(); cleared once no pending events remain
bool is_running;
/**
* A file descriptor to poll for when background (thread) tasks are ready.
* This is needed so that we can wait for tasks that are not I/O-bound
* without being busy.
*/
int bg_eventfd;
/**
* A file descriptor to write to, to signal that a background task may be
* ready.
*/
int bg_signalfd;
};
/**
 * Create an event loop together with its wake-up pipe.
 *
 * Both ends of the pipe are switched to non-blocking mode. Any failure
 * (allocation, pipe creation, fcntl) is reported on stderr and aborts the
 * process.
 *
 * @return a new, running event loop; release it with eventloop_destroy()
 */
eventloop *eventloop_new(void)
{
    eventloop *loop = calloc(1, sizeof *loop);
    if (!loop) {
        fprintf(stderr, "%s: failed to allocate event loop: %s\n",
                __func__, strerror(errno));
        abort();
    }
    loop->is_running = true;

    int fds[2];
    if (pipe(fds) == -1) {
        fprintf(stderr, "%s: failed to create event FD with pipe: %s\n",
                __func__, strerror(errno));
        abort();
    }
    loop->bg_eventfd = fds[0];
    loop->bg_signalfd = fds[1];

    // Check F_GETFL and F_SETFL separately and compare both against -1, so a
    // failed query is never OR-ed into the flags passed to F_SETFL.
    for (int i = 0; i < 2; i++) {
        const int flags = fcntl(fds[i], F_GETFL);
        if (flags == -1 || fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
            fprintf(stderr, "%s: failed to set fds to non-blocking: %s\n",
                    __func__, strerror(errno));
            abort();
        }
    }
    return loop;
}
/**
 * (to be used from a background thread)
 *
 * Signal the event loop that a background task may be ready, by writing one
 * byte to the loop's wake-up pipe.
 *
 * The write end is non-blocking. A full pipe (EAGAIN/EWOULDBLOCK) is not an
 * error: it means unread wake-up bytes are already queued, so the loop will
 * wake up anyway. Interrupted writes are retried. Any other failure is
 * reported on stderr and aborts the process.
 *
 * @param loop  event loop to wake up
 */
void eventloop_signal(eventloop *loop)
{
    for (;;) {
        if (write(loop->bg_signalfd, "a", 1) != -1)
            return;
        if (errno == EINTR)
            continue; // interrupted before anything was written: retry
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return; // pipe is full, so a wake-up is already pending
        fprintf(stderr, "%s: failed to signal event loop: %s\n",
                __func__, strerror(errno));
        abort();
    }
}
/**
 * Append an event to the end of the loop's pending list and record the loop
 * as the event's owner.
 *
 * @param loop  event loop that will track the event
 * @param ev    event to enqueue
 */
void eventloop_add(eventloop *loop, event *ev)
{
    event *const head = loop->pending_events;

    if (head == NULL) {
        // empty list: the new event becomes the head (and, below, the tail)
        assert(!loop->pending_events_tail);
        loop->pending_events = ev;
    } else {
        // non-empty list: link the new event behind the current tail
        assert(loop->pending_events_tail);
        loop->pending_events_tail->next = ev;
    }
    loop->pending_events_tail = ev;
    ev->loop = loop;
}
/**
 * Unlink an event from the loop's pending list.
 *
 * @param loop  event loop that owns the list
 * @param ev    event to unlink
 * @param prev  node immediately before ev, or NULL; when NULL the current
 *              head of the list is the node that gets unlinked
 */
void eventloop_remove(eventloop *loop, event *ev, event *prev)
{
    if (prev) {
        // interior or tail node: bypass it
        prev->next = ev->next;
    } else {
        // head node: advance the head (and the tail too if it was the only node)
        event *const successor = loop->pending_events->next;

        if (loop->pending_events == loop->pending_events_tail)
            loop->pending_events_tail = successor;
        loop->pending_events = successor;
    }

    // removing the last node makes its predecessor the new tail
    if (loop->pending_events_tail == ev)
        loop->pending_events_tail = prev;
    ev->next = NULL;
}
/**
 * Process all ready events.
 *
 * One call performs, in order:
 *   1. moves every pending event that is already ready or cancelled onto a
 *      local ready list;
 *   2. poll()s the fds of the remaining I/O events (plus the wake-up pipe if
 *      background events are pending) -- without waiting when step 1 found
 *      work, indefinitely otherwise;
 *   3. moves I/O events whose fd is ready (or failed) and background events
 *      that completed onto the ready list;
 *   4. invokes the callback of every event on the ready list.
 *
 * After its callback is invoked the loop no longer references an event; the
 * callback is responsible for freeing it.
 *
 * @param loop  event loop to run one iteration of
 * @return true while pending events remain, false once the list is empty
 */
bool eventloop_process(eventloop *loop)
{
    event *ready_events = NULL;
    unsigned num_io_pending = 0;
    bool have_non_io_tasks = false;

    // Pass 1: collect events that are already complete and count the rest.
    // `prev` only advances over nodes that STAY in the list; resetting it
    // after a removal would make the next removal look like a head removal
    // and corrupt the list.
    for (event *pending = loop->pending_events, *prev = NULL; pending; ) {
        event *next = pending->next;

        if (event_is_ready(pending) || event_is_cancelled(pending)) {
            eventloop_remove(loop, pending, prev);
            event_list_prepend(&ready_events, pending);
        } else {
            if (pending->is_io_operation)
                num_io_pending++;
            else if (pending->is_bg_operation)
                have_non_io_tasks = true;
            prev = pending;
        }
        pending = next;
    }

    const nfds_t num_fds = num_io_pending + (have_non_io_tasks ? 1 : 0);
    if (num_fds > 0) {
        struct pollfd *pollfds = calloc(num_fds, sizeof *pollfds);
        if (!pollfds) {
            fprintf(stderr, "%s: failed to allocate poll set: %s\n",
                    __func__, strerror(errno));
            abort();
        }

        // I/O events occupy slots [0, num_io_pending) in list order; the
        // wake-up pipe (if needed) takes the final slot.
        unsigned p = 0;
        for (event *pending = loop->pending_events; pending; pending = pending->next) {
            if (pending->is_io_operation) {
                pollfds[p].fd = pending->fd;
                pollfds[p].events = pending->is_read_operation ? POLLIN : POLLOUT;
                ++p;
            }
        }
        if (have_non_io_tasks) {
            pollfds[p].fd = loop->bg_eventfd;
            pollfds[p].events = POLLIN;
        }

        int poll_status = -1;
        // poll without waiting (0) if there are other ready tasks
        // otherwise, poll with an indefinite wait (-1)
        int timeout_ms = ready_events ? 0 : -1;
        while ((poll_status = poll(pollfds, num_fds, timeout_ms)) == -1 &&
               (errno == EAGAIN || errno == EINTR))
            ;

        // revents are only meaningful when poll() reported at least one fd
        if (poll_status > 0) {
            // gather all I/O-ready tasks; the list is unchanged since the
            // poll set was built, so slot p matches the p-th I/O event
            p = 0;
            for (event *pending = loop->pending_events, *prev = NULL; pending; ) {
                event *next = pending->next;
                bool completed = false;

                if (pending->is_io_operation) {
                    const short wanted = pending->is_read_operation ? POLLIN : POLLOUT;
                    const short revents = pollfds[p++].revents;

                    if (revents & wanted) {
                        pending->is_ready = true;
                        completed = true;
                    } else if (revents & (POLLHUP | POLLERR | POLLNVAL)) {
                        // POLLNVAL is included so an invalid fd fails the
                        // event instead of making poll() return forever
                        int errnum = ECANCELED;
                        if (revents & POLLHUP)
                            errnum = EPIPE;
                        else if (revents & POLLNVAL)
                            errnum = EBADF;
                        event_cancel_with_errno(pending, errnum);
                        completed = true;
                    }
                }

                if (completed) {
                    eventloop_remove(loop, pending, prev);
                    event_list_prepend(&ready_events, pending);
                } else {
                    prev = pending;
                }
                pending = next;
            }

            if (have_non_io_tasks && (pollfds[num_io_pending].revents & POLLIN)) {
                // Drain the wake-up pipe BEFORE scanning: a signal that
                // arrives after the drain stays queued for the next poll().
                // Stop on EAGAIN (pipe empty) or EOF; retry only on EINTR.
                char buffer[BUFSIZ];
                for (;;) {
                    ssize_t got = read(loop->bg_eventfd, buffer, sizeof buffer);
                    if (got > 0)
                        continue;
                    if (got == -1 && errno == EINTR)
                        continue;
                    break;
                }

                // then search for all non-I/O background tasks that may be done
                for (event *pending = loop->pending_events, *prev = NULL; pending; ) {
                    event *next = pending->next;

                    if (pending->is_bg_operation && event_is_ready(pending)) {
                        eventloop_remove(loop, pending, prev);
                        event_list_prepend(&ready_events, pending);
                    } else {
                        prev = pending;
                    }
                    pending = next;
                }
            }
        }
        free(pollfds);
    }

    while (ready_events) {
        event *ev = ready_events;
        ready_events = ready_events->next;
        ev->next = NULL;
        // this is the last time the main loop knows about this task
        // it's the responsibility of the callback to free it
        ev->callback(ev, ev->callback_data);
    }

    // if there are no more events, then the event loop does not need to run
    if (!loop->pending_events)
        loop->is_running = false;
    return loop->is_running;
}
/**
 * Tear down an event loop.
 *
 * Every event still pending is unlinked and freed without its callback being
 * invoked; both ends of the wake-up pipe are closed; the loop is freed.
 *
 * @param loop  event loop to destroy; must not be used afterwards
 */
void eventloop_destroy(eventloop *loop)
{
    loop->is_running = false;

    // repeatedly pop and free the head until the list is empty
    for (event *head = loop->pending_events; head; head = loop->pending_events) {
        eventloop_remove(loop, head, NULL);
        free(head);
    }

    close(loop->bg_eventfd);
    close(loop->bg_signalfd);
    free(loop);
}
/**
 * Working state of one asynchronous "parse balanced parentheses" task,
 * shared between successive read callbacks.
 */
typedef struct {
// stream the characters are read from
FILE *file;
// heap buffer accumulating the parentheses read so far (NULL until first growth)
char *buffer;
// allocated capacity of buffer, in bytes
size_t buffer_size;
size_t content_length; // not including the trailing NUL byte
// number of '(' seen that have not yet been matched by a ')'
unsigned unbalanced_lparens;
// event representing the overall parse task; completed or cancelled by the read callback
event *parse_ev;
} parse_balanced_parens_state;
/**
 * Release a parse state object.
 *
 * @param state        state to free
 * @param free_buffer  true to free the accumulated buffer as well; false when
 *                     the buffer lives on elsewhere (e.g. as an event result)
 */
void parse_balanced_parens_state_free(parse_balanced_parens_state *state, bool free_buffer)
{
    // free(NULL) is a no-op, so a NULL here means "leave the buffer alone"
    char *const owned_buffer = free_buffer ? state->buffer : NULL;

    state->parse_ev = NULL;
    free(owned_buffer);
    free(state);
}
/**
 * Read callback of the balanced-parentheses parser.
 *
 * Consumes the read event, reads one character from the stream and updates
 * the parse state. Depending on the outcome it
 *   - cancels the parse event and frees the state (including the buffer) on
 *     a read error, unexpected end of file, unexpected ')' or allocation
 *     failure;
 *   - completes the parse event with the accumulated buffer once the
 *     parentheses are balanced (ownership of the buffer moves to the event's
 *     result);
 *   - otherwise queues a new read event for the next character.
 *
 * @param read_ev    the read event that fired; freed here
 * @param user_data  the parse_balanced_parens_state of this task
 */
static void parse_balanced_parens_async_cb(event *read_ev, void *user_data)
{
    parse_balanced_parens_state *state = user_data;

    if (!event_is_ready(read_ev)) {
        // an error occurred, so we return
        fprintf(stderr, "[DEBUG] read buffer error: %s\n", strerror(event_get_errno(read_ev)));
        event_cancel_with_errno(state->parse_ev, event_get_errno(read_ev));
        free(read_ev);
        // no further callback will run for this task, so release its state here
        parse_balanced_parens_state_free(state, true);
        return;
    }
    // we no longer care about this task
    free(read_ev);

    // fgetc() returns an int: storing it in a char makes EOF indistinguishable
    // from a 0xFF byte (or never equal to EOF where char is unsigned)
    int c = fgetc(state->file);
    int fail_errno = 0; // non-zero once this step has failed

    if (c == EOF) {
        if (state->unbalanced_lparens > 0) {
            fprintf(stderr, "[DEBUG] parse error: unexpected end of file\n");
            fail_errno = ECANCELED;
        }
    } else if (c == '(') {
        state->unbalanced_lparens++;
    } else if (c == ')') {
        if (state->unbalanced_lparens == 0) {
            fprintf(stderr, "[DEBUG] parse error: unexpected ')'\n");
            fail_errno = ECANCELED;
        } else {
            state->unbalanced_lparens--;
        }
    }

    if (fail_errno == 0 && (c == '(' || c == ')')) {
        // keep room for the new character plus the trailing NUL
        if (state->content_length + 1 >= state->buffer_size) {
            size_t buffer_size = state->buffer_size ? (state->buffer_size * 2) : 16;
            char *buffer = realloc(state->buffer, buffer_size);
            if (!buffer) {
                fprintf(stderr, "[DEBUG] parse error: failed to allocate buffer: %s\n",
                        strerror(errno));
                fail_errno = ENOMEM;
            } else {
                state->buffer = buffer;
                state->buffer_size = buffer_size;
            }
        }
        // only write when the buffer is known to be large enough
        if (fail_errno == 0) {
            state->buffer[state->content_length++] = (char) c;
            state->buffer[state->content_length] = '\0';
        }
    }

    if (fail_errno != 0)
        event_cancel_with_errno(state->parse_ev, fail_errno);

    // Cancellation is checked first: a cancelled task must never also be
    // completed, otherwise the buffer handed over as its result would leak.
    if (fail_errno != 0 || state->parse_ev->is_cancelled) {
        // destroy the state info as we've been cancelled
        parse_balanced_parens_state_free(state, true);
    } else if (state->unbalanced_lparens == 0) {
        // we're done; the buffer now belongs to the parse event's result
        event_return(state->parse_ev, state->buffer);
        parse_balanced_parens_state_free(state, false);
    } else {
        // we need to parse more
        // add a new event for the next read task
        eventloop_add(state->parse_ev->loop,
                      event_new_from_fd(fileno(state->file), true, parse_balanced_parens_async_cb, state));
    }
}
/**
 * Start parsing a balanced run of parentheses from a stream, asynchronously.
 *
 * Two events are queued on the loop: a parse event representing the whole
 * task (its callback receives the final outcome) and a read event on the
 * stream's file descriptor that drives the parsing one character at a time.
 * Allocation failures are reported with perror() and abort the process.
 *
 * @param file           stream to read from
 * @param loop           event loop that will drive the task
 * @param callback       invoked with the parse event when parsing finishes
 *                       or fails
 * @param callback_data  opaque pointer passed through to callback
 */
static void parse_balanced_parens_async(FILE *file,
                                        eventloop *loop,
                                        async_callback callback,
                                        void *callback_data)
{
    parse_balanced_parens_state *state = calloc(1, sizeof *state);
    if (!state) {
        perror("calloc");
        abort();
    }
    state->file = file;

    // this is the event representing the state of the parsing task
    state->parse_ev = event_new(callback, callback_data);
    if (!state->parse_ev) {
        perror("event_new");
        abort();
    }
    eventloop_add(loop, state->parse_ev);

    // this is the event representing the state of the read task
    event *read_ev = event_new_from_fd(fileno(file), true, parse_balanced_parens_async_cb, state);
    if (!read_ev) {
        perror("event_new_from_fd");
        abort();
    }
    eventloop_add(loop, read_ev);
}
/**
 * Collect the outcome of a parse task and free its event.
 *
 * @param ev     the parse event; freed before returning
 * @param error  if non-NULL, receives the event's io_errno when it is
 *               non-zero; left untouched on success
 * @return the event's result pointer on success, NULL when the event carries
 *         an error
 */
static char *parse_balanced_parens_finish(event *ev, int *error)
{
    const int errnum = ev->io_errno;
    char *parsed = NULL;

    if (errnum == 0)
        parsed = ev->result;
    else if (error)
        *error = errnum;

    free(ev); // free the task
    return parsed;
}
/**
 * Completion callback for the parse task: report the outcome and release the
 * parsed string.
 *
 * @param parse_ev   the finished parse event; consumed (freed) here via
 *                   parse_balanced_parens_finish()
 * @param user_data  unused
 */
static void on_parse_balanced_parens(event *parse_ev, void *user_data)
{
    (void) user_data;

    int errnum = 0;
    char *result = parse_balanced_parens_finish(parse_ev, &errnum);

    if (errnum) {
        fprintf(stderr, "[DEBUG] parse error: %s\n", strerror(errnum));
    } else {
        // result is NULL when no parenthesis was read at all; passing NULL
        // to %s is undefined behavior, so print an empty string instead
        printf("done parsing balanced parentheses: %s\n", result ? result : "");
    }
    free(result);
}
/**
 * Arguments of a background sort task.
 */
typedef struct {
// number of elements in values
unsigned n;
// array that is sorted in place
int *values;
// event completed by the sort thread when it finishes (not set by every user of this struct)
event *ev;
} sort_state;
/**
 * Thread entry point: sort the array in ascending order, in place, with a
 * quadratic exchange sort, counting every comparison performed.
 *
 * On completion the comparison count is delivered as the result of the
 * task's event and the state object is freed.
 *
 * @param user_data  heap-allocated sort_state describing the job; freed here
 * @return always 0
 */
static int sort_begin(void *user_data)
{
    sort_state *job = user_data;
    int *const v = job->values;
    const unsigned count = job->n;
    unsigned comparisons = 0;

    for (unsigned lo = 0; lo < count; lo++) {
        // every pair (lo, hi) counts as one operation, swapped or not
        for (unsigned hi = lo + 1; hi < count; hi++, comparisons++) {
            if (v[lo] <= v[hi])
                continue;
            const int held = v[lo];
            v[lo] = v[hi];
            v[hi] = held;
        }
    }

    event_return(job->ev, (void *) (intptr_t) comparisons);
    free(job);
    return 0;
}
/**
 * Collect the outcome of a background sort and free its event.
 *
 * Joins the sort thread first, so the thread has finished by the time the
 * event is released.
 *
 * @param ev     the sort event; freed before returning
 * @param error  if non-NULL, receives the event's io_errno when it is
 *               non-zero; left untouched on success
 * @return the operation count stored in the event's result, or 0 when the
 *         event carries an error
 */
static unsigned sort_finish(event *ev, int *error)
{
    thrd_join(ev->thread, NULL);

    const int errnum = ev->io_errno;
    unsigned operations = 0;

    if (errnum != 0) {
        if (error)
            *error = errnum;
    } else {
        operations = (unsigned) (intptr_t) ev->result;
    }

    free(ev);
    return operations;
}
/**
 * Perform an expensive background computation: sort an array on a separate
 * thread and report completion through the event loop.
 *
 * The event is queued on the loop before the thread starts, so its loop
 * pointer is set before the thread can complete it. Allocation or thread
 * creation failures are reported with perror() and abort the process.
 *
 * @param n              number of elements in values
 * @param values         array to sort in place; must stay valid until the
 *                       callback has run
 * @param loop           event loop that will deliver the completion
 * @param callback       invoked with the sort event once sorting is done
 * @param callback_data  opaque pointer passed through to callback
 */
static void sort_async(unsigned n,
                       int *values,
                       eventloop *loop,
                       async_callback callback,
                       void *callback_data)
{
    event *ev = event_new_for_background(callback, callback_data);
    if (!ev) {
        perror("event_new_for_background");
        abort();
    }
    eventloop_add(loop, ev);

    sort_state *state = calloc(1, sizeof *state);
    if (!state) {
        perror("calloc");
        abort();
    }
    state->n = n;
    state->values = values;
    state->ev = ev;

    // the thread takes ownership of state and frees it when done
    if (thrd_create(&ev->thread, sort_begin, state) != thrd_success) {
        perror("thrd_create");
        abort();
    }
}
/**
 * Completion callback for the background sort: verify the array is in
 * ascending order, report the outcome and free the array.
 *
 * @param sort_ev    the finished sort event; consumed (freed) here via
 *                   sort_finish()
 * @param user_data  sort_state giving the array and its length to verify
 */
static void on_sort(event *sort_ev, void *user_data)
{
    const sort_state *job = user_data;
    int errnum = 0;
    const unsigned operations = sort_finish(sort_ev, &errnum);

    // advance while neighbours are in order; reaching the end means sorted
    unsigned idx = 1;
    while (idx < job->n && job->values[idx - 1] <= job->values[idx])
        idx++;
    const bool is_sorted = idx >= job->n;

    if (errnum)
        fprintf(stderr, "[DEBUG] sort error: %s\n", strerror(errnum));
    else if (!is_sorted)
        fprintf(stderr, "[DEBUG] sort algorithm returned incorrect result\n");
    else
        printf("done sorting in %u operations\n", operations);

    free(job->values);
}
int main(int argc, char *argv[])
{
if (fcntl(STDIN_FILENO, F_SETFL, fcntl(STDIN_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
perror("fcntl() on stdin");
return 1;
}
if (setvbuf(stdin, NULL, _IONBF, 0) != 0) {
perror("setvbuf");
return 1;
}
eventloop *loop = eventloop_new();
// add some tasks
// 1. parsing
parse_balanced_parens_async(stdin, loop, on_parse_balanced_parens, NULL);
// 2. sorting
unsigned num_values = 1024 * 32;
int *values = malloc(num_values * sizeof(*values));
if (!values) {
perror("malloc");
abort();
}
for (unsigned i = 0; i < num_values; i++)
values[i] = rand() % num_values;
sort_async(num_values, values, loop, on_sort, &(sort_state) { .n = num_values, .values = values });
while (eventloop_process(loop))
/* process events until there are no more */;
eventloop_destroy(loop);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment