Created
March 17, 2021 00:35
-
-
Save Prince781/71a271fb18ee457acf037fed6c8a4144 to your computer and use it in GitHub Desktop.
asynchronous event-driven I/O
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <errno.h> | |
| #include <fcntl.h> | |
| #include <stddef.h> | |
| #include <stdio.h> | |
| #include <stdbool.h> | |
| #include <string.h> | |
| #include <stdlib.h> | |
| #include <ctype.h> | |
| #include <assert.h> | |
| #include <poll.h> | |
| #include <unistd.h> | |
| #include <sys/types.h> | |
| #include <sys/socket.h> | |
| #include <stdatomic.h> | |
| #include <threads.h> | |
| typedef struct _event event; | |
| typedef struct _eventloop eventloop; | |
| typedef void (*async_callback)(event *, void *data); | |
| void eventloop_signal(eventloop *loop); | |
| struct _event { | |
| int fd; | |
| struct { // flags | |
| bool is_read_operation; | |
| bool is_io_operation; | |
| bool is_bg_operation; // whether this is an operation running in a background thread | |
| atomic_bool is_cancelled; | |
| atomic_bool is_ready; // can be set to avoid poll()ing | |
| }; | |
| int io_errno; // relevant if the I/O operation failed | |
| thrd_t thread; // reference to the background thread (if relevant) | |
| async_callback callback; | |
| void *callback_data; | |
| eventloop *loop; | |
| void *result; | |
| struct _event *next; | |
| }; | |
| event *event_new(async_callback callback, void *callback_data) | |
| { | |
| event *ev = calloc(1, sizeof *ev); | |
| ev->fd = -1; | |
| ev->callback = callback; | |
| ev->callback_data = callback_data; | |
| return ev; | |
| } | |
| event *event_new_from_fd(int fd, bool is_read_operation, async_callback callback, void *callback_data) | |
| { | |
| event *ev = calloc(1, sizeof *ev); | |
| ev->fd = fd; | |
| ev->is_read_operation = is_read_operation; | |
| ev->is_io_operation = true; | |
| ev->callback = callback; | |
| ev->callback_data = callback_data; | |
| return ev; | |
| } | |
| event *event_new_for_background(async_callback callback, void *callback_data) | |
| { | |
| event *ev = calloc(1, sizeof *ev); | |
| ev->fd = -1; | |
| ev->callback = callback; | |
| ev->callback_data = callback_data; | |
| ev->is_bg_operation = true; | |
| return ev; | |
| } | |
| void event_return(event *ev, void *result) | |
| { | |
| assert(!ev->is_ready && "cannot complete an event twice!"); | |
| atomic_store_explicit(&ev->is_ready, true, memory_order_release); | |
| ev->result = result; | |
| if (ev->is_bg_operation) | |
| eventloop_signal(ev->loop); | |
| } | |
| int event_get_errno(event *ev) | |
| { | |
| return ev->io_errno; | |
| } | |
| void event_cancel_with_errno(event *ev, int errnum) | |
| { | |
| atomic_store_explicit(&ev->is_cancelled, true, memory_order_release); | |
| ev->io_errno = errnum; | |
| } | |
| void event_cancel(event *ev) | |
| { | |
| event_cancel_with_errno(ev, ECANCELED); | |
| } | |
| bool event_is_cancelled(event *ev) | |
| { | |
| return atomic_load_explicit(&ev->is_cancelled, memory_order_acquire); | |
| } | |
| bool event_is_ready(event *ev) | |
| { | |
| return atomic_load_explicit(&ev->is_ready, memory_order_acquire); | |
| } | |
| bool event_get_result(event *ev, void **pointer_ref) | |
| { | |
| if (event_is_cancelled(ev) || ev->io_errno != 0) | |
| return false; | |
| *pointer_ref = ev->result; | |
| return true; | |
| } | |
| static inline void event_list_prepend(event **list, event *ev) | |
| { | |
| ev->next = *list; | |
| *list = ev; | |
| } | |
| struct _eventloop { | |
| event *pending_events; | |
| event *pending_events_tail; | |
| bool is_running; | |
| /** | |
| * A file descriptor to poll for when background (thread) tasks are ready. | |
| * This is needed so that we can wait for tasks that are not I/O-bound | |
| * without being busy. | |
| */ | |
| int bg_eventfd; | |
| /** | |
| * A file descriptor to write to, to signal that a background task may be | |
| * ready. | |
| */ | |
| int bg_signalfd; | |
| }; | |
| eventloop *eventloop_new(void) | |
| { | |
| eventloop *loop = calloc(1, sizeof *loop); | |
| loop->is_running = true; | |
| int fds[2]; | |
| if (pipe(fds) == -1) { | |
| fprintf(stderr, "%s: failed to create event FD with pipe: %s\n", | |
| __func__, strerror(errno)); | |
| abort(); | |
| } | |
| loop->bg_eventfd = fds[0]; | |
| loop->bg_signalfd = fds[1]; | |
| if (fcntl(loop->bg_eventfd, F_SETFL, fcntl(loop->bg_eventfd, F_GETFL) | O_NONBLOCK) == -1 || | |
| fcntl(loop->bg_signalfd, F_SETFL, fcntl(loop->bg_signalfd, F_GETFL) | O_NONBLOCK)) { | |
| fprintf(stderr, "%s: failed to set fds to non-blocking: %s\n", | |
| __func__, strerror(errno)); | |
| abort(); | |
| } | |
| return loop; | |
| } | |
| /** | |
| * (to be used from a background thread) | |
| * | |
| * Signal the event loop that a background task may be ready. | |
| */ | |
| void eventloop_signal(eventloop *loop) | |
| { | |
| if (write(loop->bg_signalfd, "a", 1) == -1) { | |
| fprintf(stderr, "%s: failed to signal event loop: %s\n", | |
| __func__, strerror(errno)); | |
| abort(); | |
| } | |
| } | |
| void eventloop_add(eventloop *loop, event *ev) | |
| { | |
| if (!loop->pending_events) { | |
| assert(!loop->pending_events_tail); | |
| loop->pending_events = ev; | |
| loop->pending_events_tail = ev; | |
| } else { | |
| assert(loop->pending_events_tail); | |
| if (loop->pending_events == loop->pending_events_tail) { | |
| loop->pending_events_tail = ev; | |
| loop->pending_events->next = loop->pending_events_tail; | |
| } else { | |
| loop->pending_events_tail->next = ev; | |
| loop->pending_events_tail = ev; | |
| } | |
| } | |
| ev->loop = loop; | |
| // fprintf(stderr, "[DEBUG] queued a new event\n"); | |
| } | |
| void eventloop_remove(eventloop *loop, event *ev, event *prev) | |
| { | |
| if (!prev) { | |
| if (loop->pending_events == loop->pending_events_tail) | |
| loop->pending_events_tail = loop->pending_events->next; | |
| loop->pending_events = loop->pending_events->next; | |
| } else { | |
| prev->next = ev->next; | |
| } | |
| if (ev == loop->pending_events_tail) | |
| loop->pending_events_tail = prev; | |
| ev->next = NULL; | |
| } | |
| /** | |
| * Process all ready events | |
| */ | |
| bool eventloop_process(eventloop *loop) | |
| { | |
| event *ready_events = NULL; | |
| unsigned num_io_pending = 0; | |
| bool have_non_io_tasks = false; | |
| for (event *pending = loop->pending_events, | |
| *prev = NULL; | |
| pending; ) { | |
| event *next = pending->next; | |
| if (event_is_ready(pending) || event_is_cancelled(pending)) { | |
| // remove [pending] off the list of pending tasks and add it to | |
| // the list of [ready_tasks] | |
| eventloop_remove(loop, pending, prev); | |
| event_list_prepend(&ready_events, pending); | |
| pending = NULL; | |
| } else if (pending->is_io_operation) { | |
| num_io_pending++; | |
| } else if (pending->is_bg_operation) { | |
| have_non_io_tasks = true; | |
| } | |
| prev = pending; | |
| pending = next; | |
| } | |
| if (num_io_pending + have_non_io_tasks > 0) { | |
| struct pollfd *pollfds = calloc(num_io_pending + have_non_io_tasks, sizeof *pollfds); | |
| unsigned p = 0; | |
| for (event *pending = loop->pending_events; pending; pending = pending->next) { | |
| if (pending->is_io_operation) { | |
| pollfds[p].fd = pending->fd; | |
| pollfds[p].events = pending->is_read_operation ? POLLIN : POLLOUT; | |
| ++p; | |
| } | |
| } | |
| if (have_non_io_tasks) { | |
| pollfds[p].fd = loop->bg_eventfd; | |
| pollfds[p].events = POLLIN; | |
| ++p; | |
| } | |
| int poll_status = -1; | |
| // poll without waiting (0) if there are other ready tasks | |
| // otherwise, poll with an indefinite wait (-1) | |
| int timeout_ms = ready_events ? 0 : -1; | |
| while ((poll_status = poll(pollfds, num_io_pending + have_non_io_tasks, timeout_ms)) == -1 && | |
| (errno == EAGAIN || errno == EINTR)) | |
| ; | |
| // gather all I/O-ready tasks | |
| p = 0; | |
| for (event *pending = loop->pending_events, | |
| *prev = NULL; | |
| pending; ) { | |
| event *next = pending->next; | |
| if (pending->is_io_operation) { | |
| if (pollfds[p].revents & (pending->is_read_operation ? POLLIN : POLLOUT)) { | |
| // remove [pending] off the list of pending tasks and add | |
| // it to the list of ready tasks | |
| pending->is_ready = true; | |
| eventloop_remove(loop, pending, prev); | |
| event_list_prepend(&ready_events, pending); | |
| pending = NULL; | |
| } else if (pollfds[p].revents & (POLLHUP | POLLERR)) { | |
| event_cancel_with_errno(pending, pollfds[p].revents & POLLHUP ? EPIPE : ECANCELED); | |
| eventloop_remove(loop, pending, prev); | |
| event_list_prepend(&ready_events, pending); | |
| pending = NULL; | |
| } | |
| ++p; | |
| } | |
| prev = pending; | |
| pending = next; | |
| } | |
| if (have_non_io_tasks) { | |
| if (pollfds[p].revents & POLLIN) { | |
| // then search for all non-I/O background tasks that may be done | |
| for (event *pending = loop->pending_events, | |
| *prev = NULL; | |
| pending; ) { | |
| event *next = pending->next; | |
| if (pending->is_bg_operation && event_is_ready(pending)) { | |
| eventloop_remove(loop, pending, prev); | |
| event_list_prepend(&ready_events, pending); | |
| pending = NULL; | |
| } | |
| prev = pending; | |
| pending = next; | |
| } | |
| // clear the buffer | |
| char buffer[BUFSIZ]; | |
| while (read(pollfds[p].fd, &buffer, sizeof buffer) == -1 && | |
| (errno == EAGAIN || errno == EWOULDBLOCK)) | |
| ; | |
| } | |
| } | |
| free(pollfds); | |
| } | |
| while (ready_events) { | |
| event *ev = ready_events; | |
| ready_events = ready_events->next; | |
| ev->next = NULL; | |
| // this is the last time the main loop knows about this task | |
| // it's the responsibility of the callback to free it | |
| ev->callback(ev, ev->callback_data); | |
| // fprintf(stderr, "[DEBUG] processed and removed an event\n"); | |
| } | |
| // if there are no more events, then the event loop does not need to run | |
| if (!loop->pending_events) | |
| loop->is_running = false; | |
| return loop->is_running; | |
| } | |
| void eventloop_destroy(eventloop *loop) | |
| { | |
| loop->is_running = false; | |
| while (loop->pending_events) { | |
| event *ev = loop->pending_events; | |
| eventloop_remove(loop, loop->pending_events, NULL); | |
| free(ev); | |
| } | |
| close(loop->bg_eventfd); | |
| close(loop->bg_signalfd); | |
| free(loop); | |
| } | |
| typedef struct { | |
| FILE *file; | |
| char *buffer; | |
| size_t buffer_size; | |
| size_t content_length; // not including the trailing NUL byte | |
| unsigned unbalanced_lparens; | |
| event *parse_ev; | |
| } parse_balanced_parens_state; | |
| void parse_balanced_parens_state_free(parse_balanced_parens_state *state, bool free_buffer) | |
| { | |
| state->parse_ev = NULL; | |
| if (free_buffer) | |
| free(state->buffer); | |
| free(state); | |
| } | |
| static void parse_balanced_parens_async_cb(event *read_ev, void *user_data) | |
| { | |
| parse_balanced_parens_state *state = user_data; | |
| if (!event_is_ready(read_ev)) { | |
| // an error occurred, so we return | |
| fprintf(stderr, "[DEBUG] read buffer error: %s\n", strerror(event_get_errno(read_ev))); | |
| event_cancel_with_errno(state->parse_ev, event_get_errno(read_ev)); | |
| free(read_ev); | |
| return; | |
| } | |
| // we no longer care about this task | |
| free(read_ev); | |
| char c = fgetc(state->file); | |
| // fprintf(stderr, "[DEBUG] read character %#hhx ( '%c' )\n", c, isprint(c) ? c : '\0'); | |
| if (c == EOF) { | |
| if (state->unbalanced_lparens > 0) { | |
| // error | |
| fprintf(stderr, "[DEBUG] parse error: unexpected end of file\n"); | |
| event_cancel(state->parse_ev); | |
| } | |
| } else if (c == '(') { | |
| state->unbalanced_lparens++; | |
| } else if (c == ')') { | |
| if (state->unbalanced_lparens == 0) { | |
| // error | |
| fprintf(stderr, "[DEBUG] parse error: unexpected ')'\n"); | |
| event_cancel(state->parse_ev); | |
| } else { | |
| state->unbalanced_lparens--; | |
| } | |
| } | |
| if (c == '(' || c == ')') { | |
| if (state->content_length + 1 >= state->buffer_size) { | |
| size_t buffer_size = state->buffer_size ? (state->buffer_size * 2) : 16; | |
| char *buffer = realloc(state->buffer, buffer_size); | |
| if (!buffer) { | |
| fprintf(stderr, "[DEBUG] parse error: failed to allocate buffer: %s\n", | |
| strerror(errno)); | |
| event_cancel(state->parse_ev); | |
| } else { | |
| state->buffer = buffer; | |
| state->buffer_size = buffer_size; | |
| } | |
| } | |
| state->buffer[state->content_length++] = c; | |
| state->buffer[state->content_length] = '\0'; | |
| } | |
| if (state->unbalanced_lparens == 0) { | |
| // we're done | |
| event_return(state->parse_ev, state->buffer); | |
| parse_balanced_parens_state_free(state, false); | |
| } else if (state->parse_ev->is_cancelled) { | |
| // destroy the state info as we've been cancelled | |
| parse_balanced_parens_state_free(state, true); | |
| } else { | |
| // we need to parse more | |
| // add a new event for the next read task | |
| eventloop_add(state->parse_ev->loop, | |
| event_new_from_fd(fileno(state->file), true, parse_balanced_parens_async_cb, state)); | |
| } | |
| } | |
| static void parse_balanced_parens_async(FILE *file, | |
| eventloop *loop, | |
| async_callback callback, | |
| void *callback_data) | |
| { | |
| parse_balanced_parens_state *state = calloc(1, sizeof *state); | |
| state->file = file; | |
| // this is the event representing the state of the parsing task | |
| state->parse_ev = event_new(callback, callback_data); | |
| eventloop_add(loop, state->parse_ev); | |
| // this is the event representing the state of the read task | |
| event *read_ev = event_new_from_fd(fileno(file), true, parse_balanced_parens_async_cb, state); | |
| eventloop_add(loop, read_ev); | |
| } | |
| static char *parse_balanced_parens_finish(event *ev, int *error) | |
| { | |
| if (ev->io_errno != 0) { | |
| if (error) | |
| *error = ev->io_errno; | |
| free(ev); | |
| return NULL; | |
| } | |
| char *result = ev->result; | |
| free(ev); // free the task | |
| return result; | |
| } | |
| static void on_parse_balanced_parens(event *parse_ev, void *user_data) | |
| { | |
| int errnum = 0; | |
| char *result = parse_balanced_parens_finish(parse_ev, &errnum); | |
| if (errnum) { | |
| fprintf(stderr, "[DEBUG] parse error: %s\n", strerror(errnum)); | |
| } else { | |
| printf("done parsing balanced parentheses: %s\n", result); | |
| } | |
| free(result); | |
| } | |
| typedef struct { | |
| unsigned n; | |
| int *values; | |
| event *ev; | |
| } sort_state; | |
| static int sort_begin(void *user_data) | |
| { | |
| sort_state *state = user_data; | |
| unsigned operations = 0; | |
| for (unsigned i = 0; i < state->n; i++) { | |
| for (unsigned j = i + 1; j < state->n; j++) { | |
| if (state->values[i] > state->values[j]) { | |
| int temp = state->values[i]; | |
| state->values[i] = state->values[j]; | |
| state->values[j] = temp; | |
| } | |
| operations++; | |
| } | |
| } | |
| event_return(state->ev, (void *) (intptr_t) operations); | |
| free(state); | |
| return 0; | |
| } | |
| static unsigned sort_finish(event *ev, int *error) | |
| { | |
| thrd_join(ev->thread, NULL); | |
| if (ev->io_errno != 0) { | |
| if (error) | |
| *error = ev->io_errno; | |
| free(ev); | |
| return 0; | |
| } | |
| unsigned operations = (unsigned) (intptr_t) ev->result; | |
| free(ev); | |
| return operations; | |
| } | |
| // perform expensive background computation | |
| static void sort_async(unsigned n, | |
| int *values, | |
| eventloop *loop, | |
| async_callback callback, | |
| void *callback_data) | |
| { | |
| event *ev = event_new_for_background(callback, callback_data); | |
| eventloop_add(loop, ev); | |
| sort_state *state = calloc(1, sizeof *state); | |
| state->n = n; | |
| state->values = values; | |
| state->ev = ev; | |
| if (thrd_create(&ev->thread, sort_begin, state) != thrd_success) { | |
| perror("thrd_create"); | |
| abort(); | |
| } | |
| } | |
| static void on_sort(event *sort_ev, void *user_data) | |
| { | |
| int errnum = 0; | |
| unsigned operations = sort_finish(sort_ev, &errnum); | |
| const sort_state *state = user_data; | |
| bool is_sorted = true; | |
| for (unsigned i = 1; i < state->n; i++) { | |
| if (state->values[i - 1] > state->values[i]) { | |
| is_sorted = false; | |
| break; | |
| } | |
| } | |
| if (errnum) { | |
| fprintf(stderr, "[DEBUG] sort error: %s\n", strerror(errnum)); | |
| } else if (!is_sorted) { | |
| fprintf(stderr, "[DEBUG] sort algorithm returned incorrect result\n"); | |
| } else { | |
| printf("done sorting in %u operations\n", operations); | |
| } | |
| free(state->values); | |
| } | |
| int main(int argc, char *argv[]) | |
| { | |
| if (fcntl(STDIN_FILENO, F_SETFL, fcntl(STDIN_FILENO, F_GETFL) | O_NONBLOCK) == -1) { | |
| perror("fcntl() on stdin"); | |
| return 1; | |
| } | |
| if (setvbuf(stdin, NULL, _IONBF, 0) != 0) { | |
| perror("setvbuf"); | |
| return 1; | |
| } | |
| eventloop *loop = eventloop_new(); | |
| // add some tasks | |
| // 1. parsing | |
| parse_balanced_parens_async(stdin, loop, on_parse_balanced_parens, NULL); | |
| // 2. sorting | |
| unsigned num_values = 1024 * 32; | |
| int *values = malloc(num_values * sizeof(*values)); | |
| if (!values) { | |
| perror("malloc"); | |
| abort(); | |
| } | |
| for (unsigned i = 0; i < num_values; i++) | |
| values[i] = rand() % num_values; | |
| sort_async(num_values, values, loop, on_sort, &(sort_state) { .n = num_values, .values = values }); | |
| while (eventloop_process(loop)) | |
| /* process events until there are no more */; | |
| eventloop_destroy(loop); | |
| return 0; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment