Skip to content

Instantly share code, notes, and snippets.

@Knio
Created January 25, 2011 01:10
Show Gist options
  • Save Knio/794336 to your computer and use it in GitHub Desktop.
Save Knio/794336 to your computer and use it in GitHub Desktop.
/* devnull/ioset.h - Asynchronous file descriptor event system.
* Copyright (c) 2011, Keith Plant. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of its contributors may
* be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#if !defined(IOSET_H)
#define IOSET_H
#include <sys/socket.h>
#include <sys/types.h>
#include <stdint.h>
#include "list.h"
struct epoll_event;
/*
 * Bit flags accepted by ioset_shutdown() / ioset_close_all().
 *
 *  SHUTDOWN_FREE   - only release the io_fd bookkeeping; the descriptor is
 *                    left open and its number is returned to the caller.
 *  SHUTDOWN_FLUSH  - try to write out queued send data before releasing.
 *  SHUTDOWN_CLOSE  - close() the descriptor after releasing.
 *  SHUTDOWN_FCLOSE - flush, then close.
 */
enum shutdown_mode {
    SHUTDOWN_FREE   = 0x0,
    SHUTDOWN_FLUSH  = 0x1,
    SHUTDOWN_CLOSE  = 0x2,
    SHUTDOWN_FCLOSE = SHUTDOWN_FLUSH | SHUTDOWN_CLOSE
};
/*
 * Lifecycle state of an io_fd.  The value decides how an epoll event on the
 * descriptor is handled and which system call is used to move data.
 */
enum iofd_state {
    IO_CLOSED     = 0,  /* dead; released on its next event */
    IO_LISTENDG   = 1,  /* datagram socket created by ioset_udp_listen() */
    IO_CONNECTDG  = 2,  /* datagram socket created by ioset_udp_connect() */
    IO_LISTENING  = 3,  /* stream socket accepting connections */
    IO_CONNECTING = 4,  /* stream socket with a connect() in progress */
    IO_CONNECTED  = 5   /* established stream socket */
};
/*
 * Payload slot embedded in every struct io_fd (member `data`).
 * Nothing in ioset.c reads or writes it; presumably it is reserved for the
 * application to attach its own context -- confirm against callers.
 */
union ioset_data {
void *ptr;
uint32_t u32;
uint64_t u64;
};
/*
 * Byte queue kept in one heap buffer.  `put` and `get` are offsets into
 * `buf` that wrap back to 0 when they reach `size`; the queue is empty when
 * they are equal, so at most size - 1 bytes are ever stored.
 */
struct ioq {
void *buf; /* heap storage, `size` bytes long */
size_t size; /* capacity of buf in bytes */
size_t put, get; /* offset of the next byte to store / to consume */
};
/*
 * One file descriptor managed by an ioset, together with its buffered I/O
 * queues and the callbacks invoked when something happens on it.
 */
struct io_fd {
int fildes; /* the underlying descriptor */
struct ioq send; /* bytes queued by ioset_write(), drained when writable */
struct ioq recv; /* bytes received and not yet taken by ioset_read() */
struct ioset *ioset; /* the set this descriptor is registered with */
/* Heap copy of a socket address (freed by ioset_shutdown()): the sendto()
 * destination for datagram descriptors, the peer address for accepted
 * connections.  NULL when never set. */
struct sockaddr *dest;
socklen_t dest_addrlen; /* length of *dest in bytes */
enum iofd_state state;
union ioset_data data; /* not touched by ioset.c */
struct list_head fdlist; /* linkage into ioset->iofd_list */
unsigned int wants_reads : 1; /* when set, EPOLLIN is requested */
/* Called by ioset_shutdown() before the descriptor is released. */
void (*closecb)(struct io_fd *);
/* Called when a connect attempt finishes; the int is 0 on success or an
 * errno value describing the failure. */
void (*connectcb)(struct io_fd *, int);
/* Called on a listening descriptor for each accepted connection; the
 * second argument is the new connection's io_fd. */
void (*acceptcb)(struct io_fd *, struct io_fd *);
/* Called after data was received (or the descriptor failed / hit EOF, in
 * which case state is IO_CLOSED); the address arguments describe the
 * source as filled in by the receive call. */
void (*readablecb)(struct io_fd *, const struct sockaddr *, socklen_t);
};
/*
 * A group of descriptors multiplexed through a single epoll instance.
 */
struct ioset {
const char *name; /* heap copy set by ioset_set_name(), or NULL; used in log messages */
int epoll_fd; /* descriptor of the backing epoll instance */
int maxevents; /* number of entries allocated in `events` */
sigset_t sigmask; /* signal mask handed to epoll_pwait() */
size_t fd_count; /* number of io_fd entries on iofd_list */
/* Descriptor whose event or callback is currently being processed.
 * ioset_shutdown() resets it to NULL when that descriptor is released,
 * which is how callers detect that a callback closed it. */
struct io_fd *active_fd;
struct list_head iofd_list; /* all registered io_fd entries */
struct epoll_event *events; /* result buffer for epoll_pwait() */
};
/*
 * Create an empty set backed by a new epoll instance.
 * Returns NULL when the epoll instance cannot be created.
 */
extern struct ioset *ioset_create(void);
/*
 * Close the epoll descriptor and free the set, its event buffer and its
 * name.  Returns 0 on success, -1 (nothing freed) when closing the epoll
 * descriptor fails.  io_fd entries still registered are not released here.
 */
extern int ioset_destroy(struct ioset *ioset);
/* Call ioset_shutdown(fd, mode) on every descriptor registered in the set. */
extern void ioset_close_all(struct ioset *ioset, enum shutdown_mode mode);
/* Number of descriptors currently registered in the set. */
extern size_t ioset_fd_count(struct ioset *ioset);
/*
 * When new_name is non-NULL, replace the set's name with a heap copy of it.
 * Returns the set's name after the call; NULL when no name is set or the
 * copy could not be made.
 */
extern const char *ioset_set_name(struct ioset *ioset, const char *new_name);
/* Remove signo from the signal mask used while waiting in ioset_dispatch().
 * Returns the result of sigdelset(). */
extern int ioset_allow_signal(struct ioset *ioset, int signo);
/* Add signo to the signal mask used while waiting in ioset_dispatch().
 * Returns the result of sigaddset(). */
extern int ioset_block_signal(struct ioset *ioset, int signo);
/* Return the file descriptor wrapped by fd. */
extern int ioset_fileno(const struct io_fd *fd);
/*
 * Wrap an already-open descriptor in a new io_fd and register it with the
 * set.  The descriptor is switched to O_NONBLOCK and FD_CLOEXEC.
 * Returns NULL when it cannot be added to the epoll instance.
 */
extern struct io_fd *ioset_add(struct ioset *ioset, int fildes);
/*
 * Resolve address/port with getaddrinfo() (AI_PASSIVE, given socktype) and
 * copy a resulting socket address into res.  On entry *res_addrlen is the
 * capacity of res; on success it holds the number of bytes copied.
 * Returns 0 on success or a getaddrinfo() error code.
 */
extern int ioset_make_local(int socktype, const char *address,
unsigned int port, struct sockaddr *res,
socklen_t *res_addrlen);
/*
 * Create a TCP socket, optionally bind it to `local`, and start a
 * non-blocking connect to peer:port.  connectcb (may be NULL) is called with
 * 0 on success or an errno value on failure, possibly before this function
 * returns.  Returns the new io_fd, or NULL on failure.
 */
extern struct io_fd *ioset_tcp_connect(struct ioset *ioset,
const struct sockaddr *local,
socklen_t local_addrlen,
const char *peer, unsigned int port,
void (*connectcb)(struct io_fd *, int));
/*
 * Create a TCP socket bound to `local` (required) and listen on it.
 * acceptcb receives the listener and the io_fd of each new connection.
 * Returns the listening io_fd, or NULL on failure.
 */
extern struct io_fd *ioset_tcp_listen(struct ioset *ioset,
const struct sockaddr *local,
socklen_t local_addrlen,
void (*acceptcb)(struct io_fd *,
struct io_fd *));
/*
 * Create a UDP socket, optionally bind it to `local`, and remember the
 * resolved peer:port as the destination for queued writes.  connectcb (may
 * be NULL) is called with 0 before this function returns.
 * Returns the new io_fd, or NULL on failure.
 */
extern struct io_fd *ioset_udp_connect(struct ioset *ioset,
const struct sockaddr *local,
socklen_t local_addrlen,
const char *peer, unsigned int port,
void (*connectcb)(struct io_fd *, int));
/*
 * Create a UDP socket bound to `local` (required).  readablecb is stored as
 * the descriptor's readable callback.  Returns the new io_fd, or NULL on
 * failure.
 */
extern struct io_fd *ioset_udp_listen(struct ioset *ioset,
const struct sockaddr *local,
socklen_t local_addrlen,
void (*readablecb)(struct io_fd *,
const struct sockaddr*,
socklen_t));
/* Shorthand for ioset_shutdown(fd, SHUTDOWN_FCLOSE). */
extern int ioset_close(struct io_fd *fd);
/*
 * Unregister and free fd according to `mode` (see enum shutdown_mode); the
 * close callback, if any, runs first.  fd must not be used afterwards.
 * Returns the result of close() when SHUTDOWN_CLOSE is set, otherwise the
 * still-open descriptor number.
 */
extern int ioset_shutdown(struct io_fd *fd, enum shutdown_mode mode);
/* Move up to `max` bytes from fd's receive queue into dest.
 * Returns the number of bytes copied (0 when the queue is empty). */
extern size_t ioset_read(struct io_fd *fd, void *dest, size_t max);
/*
 * When the receive queue contains a '\n', copy the bytes up to and including
 * it into dest, limited to `max` bytes.  No terminating NUL is written.
 * Returns the number of bytes copied, or 0 when no newline is queued.
 */
extern size_t ioset_readline(struct io_fd *fd, void *dest, size_t max);
/* Append nbyte bytes to fd's send queue (growing it as needed) and request
 * writability notification.  Returns nbyte. */
extern size_t ioset_write(struct io_fd *fd, const void *buf, size_t nbyte);
/* Send a datagram to dest immediately with sendto(), bypassing the send
 * queue.  Returns the sendto() result. */
extern ssize_t ioset_writeto(struct io_fd *fd, const void *buf, size_t nbyte,
struct sockaddr *dest, socklen_t dest_addrlen);
/*
 * Wait for events on the set and run the matching handlers.  tv limits the
 * wait (capped internally); NULL passes a timeout of -1 to epoll_pwait().
 * Returns 0 on success or when interrupted by a signal, -1 on other errors.
 */
extern int ioset_dispatch(struct ioset *ioset, struct timeval *tv);
/*
 * Iteration helpers over the descriptors registered in an ioset.  Each one
 * forwards to the corresponding list_foreach_entry* macro from list.h,
 * walking ioset->iofd_list through the io_fd member `fdlist`.
 * `iter` is the loop cursor (presumably a struct io_fd * -- see list.h).
 * The _safe variants take one extra cursor; ioset_close_all() uses
 * ioset_foreach_safe while releasing the entries it visits.
 */
#define ioset_foreach(ioset, iter) \
list_foreach_entry(&(ioset)->iofd_list, iter, fdlist)
#define ioset_foreach_safe(ioset, iter, next) \
list_foreach_entry_safe(&(ioset)->iofd_list, iter, fdlist, next)
#define ioset_foreach_prev(ioset, iter) \
list_foreach_entry_prev(&(ioset)->iofd_list, iter, fdlist)
#define ioset_foreach_prev_safe(ioset, iter, prev) \
list_foreach_entry_prev_safe(&(ioset)->iofd_list, iter, fdlist, prev)
#endif /* !defined(IOSET_H) */
/* devnull/ioset.c - Asynchronous file descriptor event system.
* Copyright (c) 2011, Keith Plant. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of its contributors may
* be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/epoll.h>
#include <netdb.h>
#include "common.h"
#include "ioset.h"
#include "list.h"
/*
* On Linux kernels at least up to 2.6.24.4, epoll can't handle
* timeout values bigger than (LONG_MAX - 999ULL)/HZ. HZ in the
* wild can be as big as 1000, and LONG_MAX can be as small as
* (1 << 31) - 1, so the largest number of milliseconds we can
* support here is 2147482. Let's round that down to 35 minutes.
*/
/* Upper bound, in milliseconds, for the timeout passed to epoll_pwait(). */
#define MAX_TIMEOUT (35*60*1000)
/* Initial size of an ioset's epoll result buffer. */
#define MIN_MAXEVENTS 8
/* The result buffer is doubled while it fills up, until it reaches this. */
#define MAX_MAXEVENTS 4096
/* Defined further down; needed earlier by ioset_shutdown()/ioset_accept(). */
static void ioset_readable(struct io_fd *fd);
static void ioset_writeable(struct io_fd *fd);
/*
 * Prepare an empty queue with `size` bytes of storage.
 * The allocation result is stored unchecked: ioq->buf is NULL on failure.
 */
static void ioq_init(struct ioq *ioq, size_t size)
{
    ioq->get = 0;
    ioq->put = 0;
    ioq->size = size;
    ioq->buf = malloc(size);
}
/*
 * Number of bytes that can be stored contiguously starting at ioq->put.
 * One slot is always left free so that put == get keeps meaning "empty".
 */
static size_t ioq_put_avail(const struct ioq *ioq)
{
    size_t limit;

    if (ioq->put < ioq->get) {
        /* Writer has wrapped: stop one byte short of the reader. */
        limit = ioq->get - 1;
    } else if (ioq->get == 0) {
        /* Reader sits at offset 0: the last slot must stay unused. */
        limit = ioq->size - 1;
    } else {
        /* Free run extends to the end of the buffer. */
        limit = ioq->size;
    }
    return limit - ioq->put;
}
/*
 * Number of bytes that can be consumed contiguously starting at ioq->get:
 * up to ioq->put, or up to the end of the buffer when the data wraps.
 */
static size_t ioq_get_avail(const struct ioq *ioq)
{
    size_t run_end = ioq->put;

    if (ioq->put < ioq->get)
        run_end = ioq->size;
    return run_end - ioq->get;
}
/* Total number of bytes stored in the queue, including any wrapped part. */
static size_t ioq_used(const struct ioq *ioq)
{
    size_t stored = ioq->put - ioq->get;

    if (ioq->put < ioq->get)
        stored += ioq->size;    /* undo the unsigned wrap-around */
    return stored;
}
/*
 * Length of the first line stored in the queue, counting the terminating
 * '\n'.  Returns 0 when no newline is queued.
 */
static size_t ioq_find_line_length(const struct ioq *ioq)
{
    const unsigned char *bytes = ioq->buf;
    int wrapped = ioq->put < ioq->get;
    size_t first_end = wrapped ? ioq->size : ioq->put;
    size_t scanned = 0;
    size_t idx;

    /* First run: from the read offset to the end of the contiguous data. */
    for (idx = ioq->get; idx < first_end; idx++) {
        scanned++;
        if (bytes[idx] == '\n')
            return scanned;
    }
    /* Second run: the part that wrapped to the start of the buffer. */
    if (wrapped) {
        for (idx = 0; idx < ioq->put; idx++) {
            scanned++;
            if (bytes[idx] == '\n')
                return scanned;
        }
    }
    return 0;
}
/*
 * Double the queue's capacity.  The stored bytes are moved to the start of
 * the new buffer so that get == 0 afterwards.
 * Returns the number of bytes that can now be stored.
 * NOTE(review): the allocation result is not checked.
 */
static size_t ioq_grow(struct ioq *ioq)
{
    const unsigned char *old_bytes = ioq->buf;
    size_t doubled = ioq->size << 1;
    size_t head_len = ioq_get_avail(ioq);
    size_t stored = ioq_used(ioq);
    unsigned char *fresh = malloc(doubled);

    /* Contiguous run starting at the read offset ... */
    memcpy(fresh, old_bytes + ioq->get, head_len);
    /* ... followed by whatever wrapped to the front of the old buffer. */
    if (ioq->put < ioq->get)
        memcpy(fresh + head_len, old_bytes, ioq->put);
    free(ioq->buf);

    ioq->buf = fresh;
    ioq->size = doubled;
    ioq->get = 0;
    ioq->put = stored;
    return doubled - stored - 1;
}
/*
 * Create a new epoll instance and mark its descriptor close-on-exec.
 * Returns the epoll descriptor, or -1 when epoll_create() fails.
 * Failing to set FD_CLOEXEC is not treated as an error.
 */
static int epoll_init(void)
{
    int epfd;
    int flags;

    epfd = epoll_create(1024);
    if (epfd < 0)
        return -1;
    flags = fcntl(epfd, F_GETFD);
    /* Only touch the flags when they could be read; -1 | FD_CLOEXEC would
     * otherwise be written back as the flag word. */
    if (flags >= 0)
        (void)fcntl(epfd, F_SETFD, flags | FD_CLOEXEC);
    return epfd;
}
/* Close the set's epoll descriptor; returns the result of close(). */
static int epoll_shutdown(struct ioset *ioset)
{
    int rc = close(ioset->epoll_fd);

    return rc;
}
/*
 * Compute the epoll event mask for fd: hang-up and error are always
 * requested, input when fd->wants_reads is set, output while the send queue
 * holds data or a connect is still in progress.
 */
static uint32_t epoll_events(struct io_fd *fd)
{
    uint32_t mask = EPOLLHUP | EPOLLERR;
    int output_pending = (fd->send.put != fd->send.get);

    if (fd->wants_reads)
        mask |= EPOLLIN;
    if (output_pending || fd->state == IO_CONNECTING)
        mask |= EPOLLOUT;
    return mask;
}
/*
 * Register fd with its set's epoll instance, using the io_fd itself as the
 * event's user data.  Returns the result of epoll_ctl().
 */
static int epoll_add(struct io_fd *fd)
{
    struct epoll_event ev;

    ev.data.ptr = fd;
    ev.events = epoll_events(fd);
    return epoll_ctl(fd->ioset->epoll_fd, EPOLL_CTL_ADD, fd->fildes, &ev);
}
/*
 * Re-register fd with the event mask that matches its current state and
 * queues.  Returns the result of epoll_ctl().
 */
static int epoll_update(struct io_fd *fd)
{
    struct epoll_event ev;

    ev.data.ptr = fd;
    ev.events = epoll_events(fd);
    return epoll_ctl(fd->ioset->epoll_fd, EPOLL_CTL_MOD, fd->fildes, &ev);
}
/*
 * Remove fd from its set's epoll instance.  Returns the result of
 * epoll_ctl().
 */
static int epoll_remove(struct io_fd *fd)
{
    /* Contents are never filled in; only a non-NULL event pointer is
     * handed to EPOLL_CTL_DEL (presumably for old kernels that insist on
     * one -- see epoll_ctl(2)). */
    struct epoll_event unused;
    int epfd = fd->ioset->epoll_fd;

    return epoll_ctl(epfd, EPOLL_CTL_DEL, fd->fildes, &unused);
}
/* Link fd into the set's descriptor list and count it. */
static void ioset_list_add(struct ioset *ioset, struct io_fd *fd)
{
    list_insert(&ioset->iofd_list, &fd->fdlist);
    ioset->fd_count += 1;
}
/* Unlink fd from the set's descriptor list and uncount it. */
static void ioset_list_del(struct ioset *ioset, struct io_fd *fd)
{
    list_remove(&fd->fdlist);
    ioset->fd_count -= 1;
}
/*
 * Create an empty ioset: a new epoll instance, an event buffer of
 * MIN_MAXEVENTS entries, an empty descriptor list and an empty signal mask.
 *
 * Returns the new set, or NULL when the epoll instance or the memory for
 * the set cannot be obtained (a message is written to stderr and nothing
 * is leaked).
 */
struct ioset *ioset_create(void)
{
    int epoll_fd;
    struct ioset *ioset;

    epoll_fd = epoll_init();
    if (epoll_fd < 0) {
        fprintf(stderr, "Unable to create new epoll backing: %s\n",
                strerror(errno));
        return NULL;
    }
    ioset = calloc(1, sizeof(*ioset));
    if (!ioset)
        goto fail_close;
    ioset->epoll_fd = epoll_fd;
    ioset->maxevents = MIN_MAXEVENTS;
    ioset->events = malloc(ioset->maxevents * sizeof(*ioset->events));
    if (!ioset->events)
        goto fail_free;
    list_head_init(&ioset->iofd_list);
    sigemptyset(&ioset->sigmask);
    return ioset;

fail_free:
    free(ioset);
fail_close:
    /* The epoll descriptor has no owner yet; release it here. */
    close(epoll_fd);
    fprintf(stderr, "Unable to allocate memory for new ioset\n");
    return NULL;
}
/*
 * Tear down an ioset: close its epoll descriptor, then free its name, its
 * event buffer and the set itself.
 *
 * Returns 0 on success.  When closing the epoll descriptor fails, a message
 * is written to stderr, nothing is freed and -1 is returned.
 * io_fd entries still registered with the set are not released here.
 */
int ioset_destroy(struct ioset *ioset)
{
    if (epoll_shutdown(ioset) < 0) {
        fprintf(stderr, "Unable to shutdown epoll backing for %s: %s\n",
                ioset->name, strerror(errno));
        return -1;
    }

    free(ioset->events);
    free((void *)ioset->name);
    free(ioset);
    return 0;
}
/*
 * Shut down every descriptor registered in the set using `mode`.
 * The "safe" iterator is used because ioset_shutdown() unlinks and frees
 * the entry being visited.
 */
void ioset_close_all(struct ioset *ioset, enum shutdown_mode mode)
{
    struct io_fd *current;
    struct io_fd *lookahead;

    ioset_foreach_safe(ioset, current, lookahead) {
        ioset_shutdown(current, mode);
    }
}
/* Report how many descriptors are currently registered in the set. */
size_t ioset_fd_count(struct ioset *ioset)
{
    size_t registered = ioset->fd_count;

    return registered;
}
/*
 * Query or change the name used for the set in log messages.
 *
 * new_name == NULL: nothing changes; the current name (possibly NULL) is
 * returned.
 * new_name != NULL: the set takes a private copy of the string and returns
 * it.  If the copy cannot be allocated the previous name is kept and NULL
 * is returned.
 *
 * The copy is made before the old name is freed, so passing the set's own
 * current name (e.g. a pointer returned earlier by this function) is safe.
 */
const char *ioset_set_name(struct ioset *ioset, const char *new_name)
{
    char *copy;

    if (!new_name)
        return ioset->name;

    copy = strdup(new_name);
    if (!copy)
        return NULL;
    free((void *)ioset->name);
    ioset->name = copy;
    return ioset->name;
}
int ioset_allow_signal(struct ioset *ioset, int signo)
{
return sigdelset(&ioset->sigmask, signo);
}
int ioset_block_signal(struct ioset *ioset, int signo)
{
return sigaddset(&ioset->sigmask, signo);
}
/* Return the file descriptor wrapped by fd. */
int ioset_fileno(const struct io_fd *fd)
{
    int descriptor = fd->fildes;

    return descriptor;
}
/*
 * Wrap an already-open descriptor in a new io_fd and register it with the
 * set.  The io_fd starts with empty 1024-byte send and receive queues and
 * with read notification enabled; the descriptor is switched to
 * non-blocking, close-on-exec mode (best effort).
 *
 * Returns the new io_fd, or NULL when memory cannot be allocated or the
 * descriptor cannot be added to the epoll instance.  On failure the
 * descriptor itself is left open and untouched by the set.
 */
struct io_fd *ioset_add(struct ioset *ioset, int fildes)
{
    int res;
    struct io_fd *io_fd;

    io_fd = calloc(1, sizeof(*io_fd));
    if (!io_fd) {
        fprintf(stderr, "Unable to allocate memory for fd %d\n", fildes);
        return NULL;
    }
    io_fd->fildes = fildes;
    io_fd->ioset = ioset;
    io_fd->wants_reads = 1;

    /* Set up the queues before anything is registered so that a failed
     * allocation can be undone by simply freeing. */
    ioq_init(&io_fd->send, 1024);
    ioq_init(&io_fd->recv, 1024);
    if (!io_fd->send.buf || !io_fd->recv.buf) {
        fprintf(stderr, "Unable to allocate I/O queues for fd %d\n",
                fildes);
        goto fail;
    }

    res = epoll_add(io_fd);
    if (res < 0) {
        fprintf(stderr, "Unable to add fd %d to ioset %s: %s\n",
                fildes, ioset->name, strerror(errno));
        goto fail;
    }

    /* A failed F_GETFL/F_GETFD returns -1; never write that back. */
    res = fcntl(fildes, F_GETFL);
    if (res >= 0)
        (void)fcntl(fildes, F_SETFL, res | O_NONBLOCK);
    res = fcntl(fildes, F_GETFD);
    if (res >= 0)
        (void)fcntl(fildes, F_SETFD, res | FD_CLOEXEC);

    ioset_list_add(ioset, io_fd);
    return io_fd;

fail:
    free(io_fd->send.buf);
    free(io_fd->recv.buf);
    free(io_fd);
    return NULL;
}
/*
 * Resolve a local address suitable for binding.
 *
 * socktype    - socket type hint passed to getaddrinfo() (e.g. SOCK_STREAM).
 * address     - host name or numeric address; NULL yields the wildcard
 *               address because AI_PASSIVE is set.
 * port        - port number, passed to getaddrinfo() in decimal.
 * res         - buffer receiving the socket address.
 * res_addrlen - in: capacity of res in bytes; out: length of the address
 *               stored (unchanged on failure).
 *
 * The first resolved address that fits into res is used, so an address is
 * never stored truncated.
 *
 * Returns 0 on success, the getaddrinfo() error code on resolution failure,
 * or EAI_OVERFLOW when no resolved address fits into res.
 */
int ioset_make_local(int socktype, const char *address, unsigned int port,
                     struct sockaddr *res, socklen_t *res_addrlen)
{
    int gai_error;
    char portstr[16];   /* large enough for any unsigned int in decimal */
    struct addrinfo hints;
    struct addrinfo *ai;
    struct addrinfo *cur;

    memset(&hints, 0, sizeof(hints));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_socktype = socktype;
    snprintf(portstr, sizeof(portstr), "%u", port);

    gai_error = getaddrinfo(address, portstr, &hints, &ai);
    if (gai_error)
        return gai_error;

    for (cur = ai; cur; cur = cur->ai_next) {
        if (cur->ai_addrlen <= *res_addrlen)
            break;
    }
    if (!cur) {
        freeaddrinfo(ai);
        return EAI_OVERFLOW;
    }

    *res_addrlen = cur->ai_addrlen;
    memcpy(res, cur->ai_addr, cur->ai_addrlen);
    freeaddrinfo(ai);
    return 0;
}
/*
 * Thin wrapper around getaddrinfo() that takes the service as a number and
 * restricts the lookup to one address family and socket type.
 * On success *res must be released with freeaddrinfo().
 * Returns the getaddrinfo() result (0 on success).
 */
static int ioset_getaddrinfo(int family, int socktype, const char *node,
                             unsigned int service, struct addrinfo **res)
{
    char port_text[8];
    struct addrinfo request;

    memset(&request, 0, sizeof(request));
    request.ai_socktype = socktype;
    request.ai_family = family;
    snprintf(port_text, sizeof(port_text), "%u", service);

    return getaddrinfo(node, port_text, &request, res);
}
/*
 * Bind socket fd to addr after enabling SO_REUSEADDR on it.
 * Failing to set SO_REUSEADDR is only logged; the bind is still attempted.
 * Returns -1 when addr is NULL (nothing is attempted), otherwise the result
 * of bind().
 */
static int ioset_bind(int fd, const struct sockaddr *addr, socklen_t addrlen)
{
    unsigned int reuse = 1;
    int rc;

    if (addr == NULL)
        return -1;

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        fprintf(stderr, "Unable to mark local address for fd %d "
                "as reuseable: %s\n", fd, strerror(errno));
    }

    rc = bind(fd, addr, addrlen);
    if (rc < 0) {
        fprintf(stderr, "Unable to bind fd %d to local address: %s\n",
                fd, strerror(errno));
    }
    return rc;
}
/*
 * Start a TCP connection to peer:port and register it with the set.
 *
 * local / local_addrlen - optional local address to bind to; it also
 *                         selects the address family (IPv4 when NULL).
 *                         A failed bind is logged and otherwise ignored.
 * connectcb             - may be NULL; called with 0 when the connection is
 *                         established, or with an errno value when an
 *                         asynchronous connect fails.  If the connection
 *                         completes immediately the callback runs before
 *                         this function returns.
 *
 * Returns the new io_fd (state IO_CONNECTING or IO_CONNECTED), or NULL when
 * the socket cannot be created, the peer cannot be resolved, the descriptor
 * cannot be registered, connect() fails immediately, or the callback closed
 * the descriptor.  No descriptor is leaked on any failure path.
 */
struct io_fd *ioset_tcp_connect(struct ioset *ioset,
                                const struct sockaddr *local,
                                socklen_t local_addrlen,
                                const char *peer, unsigned int port,
                                void (*connectcb)(struct io_fd *, int))
{
    int fd;
    int res;
    int family;
    int connect_errno;
    struct io_fd *io_fd;
    struct io_fd *old_active;
    struct addrinfo *ai;

    family = local ? local->sa_family : PF_INET;
    fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
    if (fd < 0) {
        fprintf(stderr, "Unable to create TCP socket for %s:%u... %s\n",
                peer, port, strerror(errno));
        return NULL;
    }
    res = ioset_getaddrinfo(family, SOCK_STREAM, peer, port, &ai);
    if (res) {
        fprintf(stderr, "Unable to resolve peer %s:%u... %s\n",
                peer, port, gai_strerror(res));
        close(fd);
        return NULL;
    }
    res = ioset_bind(fd, local, local_addrlen);
    if (res < 0) {
        fprintf(stderr, "Failed to set the local address for peer "
                "%s:%u... Letting the operating system decide "
                "the local address.\n", peer, port);
    }
    io_fd = ioset_add(ioset, fd);
    if (!io_fd) {
        freeaddrinfo(ai);
        close(fd);
        return NULL;
    }

    /* ioset_add() made the socket non-blocking, so this normally returns
     * -1/EINPROGRESS.  Capture errno before any other library call. */
    res = connect(fd, ai->ai_addr, ai->ai_addrlen);
    connect_errno = errno;
    freeaddrinfo(ai);

    io_fd->state = IO_CONNECTING;
    io_fd->connectcb = connectcb;
    if (res < 0) {
        if (connect_errno == EINPROGRESS) {
            /* Ask for EPOLLOUT so completion is noticed. */
            epoll_update(io_fd);
            return io_fd;
        }
        fprintf(stderr, "Unable to connect to peer %s:%u... %s\n",
                peer, port, strerror(connect_errno));
        ioset_shutdown(io_fd, SHUTDOWN_CLOSE);
        return NULL;
    }

    /* Connected immediately: report it right away. */
    io_fd->state = IO_CONNECTED;
    old_active = ioset->active_fd;
    ioset->active_fd = io_fd;
    if (connectcb)
        connectcb(io_fd, 0);
    /* ioset_shutdown() clears active_fd when it releases io_fd, so a
     * mismatch means the callback closed (and freed) the descriptor. */
    if (ioset->active_fd == io_fd)
        epoll_update(io_fd);
    else
        io_fd = NULL;
    ioset->active_fd = old_active;
    return io_fd;
}
/*
 * Create a TCP socket bound to `local`, put it into listening mode (backlog
 * of 8) and register it with the set in state IO_LISTENING.
 *
 * local must be non-NULL: binding is mandatory here and fails without it.
 * acceptcb is stored as the callback for new connections.
 *
 * Returns the listening io_fd, or NULL on any failure (the socket is closed
 * and a message is written to stderr).
 */
struct io_fd *ioset_tcp_listen(struct ioset *ioset,
                               const struct sockaddr *local,
                               socklen_t local_addrlen,
                               void (*acceptcb)(struct io_fd *, struct io_fd *))
{
    int sock;
    int rc;
    int family;
    struct io_fd *listener;

    family = local ? local->sa_family : PF_INET;
    sock = socket(family, SOCK_STREAM, IPPROTO_TCP);
    if (sock < 0) {
        fprintf(stderr, "Unable to create new TCP listening fd: %s\n",
                strerror(errno));
        return NULL;
    }

    rc = ioset_bind(sock, local, local_addrlen);
    if (rc < 0) {
        fprintf(stderr, "Unable to bind TCP listening socket %d to "
                "local address: %s\n", sock, strerror(errno));
        goto fail;
    }

    rc = listen(sock, 8);
    if (rc < 0) {
        fprintf(stderr, "Unable to listen for new TCP connections "
                "on socket fd %d: %s\n", sock, strerror(errno));
        goto fail;
    }

    listener = ioset_add(ioset, sock);
    if (!listener)
        goto fail;

    listener->state = IO_LISTENING;
    listener->acceptcb = acceptcb;
    epoll_update(listener);
    return listener;

fail:
    close(sock);
    return NULL;
}
/*
 * Create a UDP socket whose queued writes go to peer:port and register it
 * with the set in state IO_CONNECTDG.  The socket is not connect()ed; the
 * resolved peer address is stored in io_fd->dest and used as the sendto()
 * destination.
 *
 * local / local_addrlen - optional local address to bind to; it also
 *                         selects the address family (IPv4 when NULL).
 *                         A failed bind is logged and otherwise ignored.
 * connectcb             - may be NULL; called with 0 before this function
 *                         returns.
 *
 * Returns the new io_fd, or NULL when the socket cannot be created, the
 * peer cannot be resolved, memory runs out, the descriptor cannot be
 * registered, or the callback closed the descriptor.  Neither the socket
 * nor the resolver result is leaked on any path.
 */
struct io_fd *ioset_udp_connect(struct ioset *ioset,
                                const struct sockaddr *local,
                                socklen_t local_addrlen,
                                const char *peer, unsigned int port,
                                void (*connectcb)(struct io_fd *, int))
{
    int fd;
    int res;
    int family;
    struct io_fd *io_fd;
    struct io_fd *old_active;
    struct addrinfo *ai;

    family = local ? local->sa_family : PF_INET;
    fd = socket(family, SOCK_DGRAM, IPPROTO_UDP);
    if (fd < 0) {
        fprintf(stderr, "Unable to create UDP socket for %s:%u... %s\n",
                peer, port, strerror(errno));
        return NULL;
    }
    res = ioset_getaddrinfo(family, SOCK_DGRAM, peer, port, &ai);
    if (res) {
        /* Resolver failures are reported through the return value, not
         * errno, so they need gai_strerror(). */
        fprintf(stderr, "Unable to resolve peer %s:%u... %s\n",
                peer, port, gai_strerror(res));
        close(fd);
        return NULL;
    }
    res = ioset_bind(fd, local, local_addrlen);
    if (res < 0) {
        fprintf(stderr, "Failed to set the local address for peer "
                "%s:%u... Letting the operating system decide "
                "the local address.\n", peer, port);
    }
    io_fd = ioset_add(ioset, fd);
    if (!io_fd) {
        close(fd);
        freeaddrinfo(ai);
        return NULL;
    }
    io_fd->state = IO_CONNECTDG;
    io_fd->connectcb = connectcb;

    /* Keep a private copy of the destination; the resolver result is
     * released as soon as the copy has been made. */
    io_fd->dest = malloc(ai->ai_addrlen);
    if (!io_fd->dest) {
        fprintf(stderr, "Unable to allocate destination address for "
                "peer %s:%u\n", peer, port);
        freeaddrinfo(ai);
        ioset_shutdown(io_fd, SHUTDOWN_CLOSE);
        return NULL;
    }
    io_fd->dest_addrlen = ai->ai_addrlen;
    memcpy(io_fd->dest, ai->ai_addr, ai->ai_addrlen);
    freeaddrinfo(ai);

    old_active = ioset->active_fd;
    ioset->active_fd = io_fd;
    if (connectcb)
        connectcb(io_fd, 0);
    /* ioset_shutdown() clears active_fd when it releases io_fd, so a
     * mismatch means the callback closed (and freed) the descriptor. */
    if (ioset->active_fd == io_fd)
        epoll_update(io_fd);
    else
        io_fd = NULL;
    ioset->active_fd = old_active;
    return io_fd;
}
/*
 * Create a UDP socket bound to `local` and register it with the set in
 * state IO_LISTENDG.
 *
 * local must be non-NULL: binding is mandatory here and fails without it.
 * readablecb is stored as the descriptor's readable callback.
 *
 * Returns the new io_fd, or NULL on any failure (the socket is closed and,
 * for socket/bind failures, a message is written to stderr).
 */
struct io_fd *ioset_udp_listen(struct ioset *ioset,
                               const struct sockaddr *local,
                               socklen_t local_addrlen,
                               void (*readablecb)(struct io_fd *,
                                                  const struct sockaddr *,
                                                  socklen_t))
{
    int sock;
    int rc;
    int family;
    struct io_fd *receiver;

    family = local ? local->sa_family : PF_INET;
    sock = socket(family, SOCK_DGRAM, IPPROTO_UDP);
    if (sock < 0) {
        fprintf(stderr, "Unable to create listening UDP socket: %s\n",
                strerror(errno));
        return NULL;
    }

    rc = ioset_bind(sock, local, local_addrlen);
    if (rc < 0) {
        fprintf(stderr, "Unable to bind listening UDP socket %d "
                "to local address: %s\n", sock, strerror(errno));
        goto fail;
    }

    receiver = ioset_add(ioset, sock);
    if (!receiver)
        goto fail;

    receiver->state = IO_LISTENDG;
    receiver->readablecb = readablecb;
    epoll_update(receiver);
    return receiver;

fail:
    close(sock);
    return NULL;
}
int ioset_close(struct io_fd *fd)
{
return ioset_shutdown(fd, SHUTDOWN_FCLOSE);
}
int ioset_shutdown(struct io_fd *fd, enum shutdown_mode mode)
{
int fildes = fd->fildes;
struct ioset *ioset = fd->ioset;
fd->state = IO_CLOSED;
if (ioset->active_fd == fd)
ioset->active_fd = NULL;
if (fd->closecb)
fd->closecb(fd);
if ((fd->send.put != fd->send.get) && (mode & SHUTDOWN_FLUSH)) {
int flags;
flags = fcntl(fildes, F_GETFL);
fcntl(fildes, F_SETFL, flags & ~O_NONBLOCK);
ioset_writeable(fd);
if (fd->send.put != fd->send.get)
ioset_writeable(fd);
}
epoll_remove(fd);
ioset_list_del(ioset, fd);
free(fd->send.buf);
free(fd->recv.buf);
free(fd->dest);
free(fd);
return (mode & SHUTDOWN_CLOSE) ? close(fildes) : fildes;
}
/*
 * Move up to `max` bytes from fd's receive queue into dest.
 * Data that wraps around the end of the queue is copied in two pieces.
 * Returns the number of bytes copied; 0 when the queue is empty.
 */
size_t ioset_read(struct io_fd *fd, void *dest, size_t max)
{
    struct ioq *queue = &fd->recv;
    unsigned char *out = dest;
    const unsigned char *ring;
    size_t contiguous = ioq_get_avail(queue);
    size_t stored = ioq_used(queue);
    size_t copied = 0;

    if (contiguous == 0)
        return 0;
    if (max > stored)
        max = stored;

    ring = queue->buf;
    if (max > contiguous) {
        /* Take everything up to the end of the buffer, then continue
         * from its start. */
        memcpy(out, ring + queue->get, contiguous);
        queue->get = 0;
        copied = contiguous;
    }
    memcpy(out + copied, ring + queue->get, max - copied);
    queue->get += max - copied;
    if (queue->get == queue->size)
        queue->get = 0;
    return max;
}
/*
 * Read one line from fd's receive queue.
 * When a '\n' is queued, the bytes up to and including it are copied into
 * dest, limited to `max` bytes (a longer line is returned in pieces by
 * successive calls).  No terminating NUL is written.
 * Returns the number of bytes copied, or 0 when no newline is queued.
 */
size_t ioset_readline(struct io_fd *fd, void *dest, size_t max)
{
    size_t wanted = ioq_find_line_length(&fd->recv);

    if (wanted == 0)
        return 0;
    if (max > wanted)
        max = wanted;
    return ioset_read(fd, dest, max);
}
/*
 * Queue nbyte bytes from buf for sending on fd.
 * The send queue is grown until the data fits, the bytes are copied in
 * (in two pieces when they wrap around the end of the queue), and the epoll
 * registration is refreshed so that writability is reported.
 * Returns the number of bytes queued, which is always nbyte.
 */
size_t ioset_write(struct io_fd *fd, const void *buf, size_t nbyte)
{
    struct ioq *queue = &fd->send;
    const unsigned char *src = buf;
    size_t stored = ioq_used(queue);
    size_t room;
    size_t first_part = 0;

    /* One slot always stays free, hence ">=" rather than ">". */
    while (stored + nbyte >= queue->size)
        ioq_grow(queue);

    room = ioq_put_avail(queue);
    if (nbyte > room) {
        /* Fill up to the end of the buffer, continue at its start. */
        memcpy((unsigned char *)queue->buf + queue->put, src, room);
        queue->put = 0;
        first_part = room;
        src += room;
        nbyte -= room;
    }
    memcpy((unsigned char *)queue->buf + queue->put, src, nbyte);
    queue->put += nbyte;
    if (queue->put == queue->size)
        queue->put = 0;

    epoll_update(fd);
    return first_part + nbyte;
}
/*
 * Send nbyte bytes to dest immediately with sendto(), bypassing fd's send
 * queue.  Errors other than EAGAIN are logged to stderr.
 * Returns the sendto() result: bytes sent, or -1 with errno set.
 */
ssize_t ioset_writeto(struct io_fd *fd, const void *buf, size_t nbyte,
                      struct sockaddr *dest, socklen_t dest_addrlen)
{
    ssize_t sent = sendto(fd->fildes, buf, nbyte, 0, dest, dest_addrlen);

    if (sent >= 0 || errno == EAGAIN)
        return sent;

    fprintf(stderr, "ioset_sendto() error on fd %d: %s\n",
            fd->fildes, strerror(errno));
    return sent;
}
static ssize_t ioset_recvfrom(struct io_fd *fd, void *dest, size_t max,
struct sockaddr *addr, socklen_t *addrlen)
{
switch (fd->state) {
case IO_LISTENDG:
case IO_CONNECTDG:
return recvfrom(fd->fildes, dest, max, 0, addr, addrlen);
case IO_CONNECTED:
memset(addr, 0, *addrlen);
return read(fd->fildes, dest, max);
default:
errno = EINVAL;
return -1;
}
}
static ssize_t ioset_sendto(struct io_fd *fd, const void *buf, size_t nbyte)
{
switch (fd->state) {
case IO_LISTENDG:
case IO_CONNECTDG:
return sendto(fd->fildes, buf, nbyte, 0, fd->dest,
fd->dest_addrlen);
case IO_CONNECTED:
return write(fd->fildes, buf, nbyte);
default:
errno = EINVAL;
return -1;
}
}
/*
 * Accept one pending connection on a listening descriptor, register it
 * with the listener's set as an IO_CONNECTED io_fd, record the peer address
 * in its `dest` member and hand it to the listener's accept callback.
 *
 * After the callback returns, output it queued on the new connection is
 * written out right away; otherwise the epoll registration is refreshed.
 * Nothing more is done if the callback closed the new connection.
 */
static void ioset_accept(struct io_fd *listener)
{
    int fd;
    struct io_fd *new_fd;
    struct io_fd *old_active;
    /* sockaddr_storage is large enough for every address family; a plain
     * struct sockaddr cannot hold an IPv6 peer address. */
    struct sockaddr_storage addr;
    socklen_t addrlen = sizeof(addr);
    struct ioset *ioset = listener->ioset;

    fd = accept(listener->fildes, (struct sockaddr *)&addr, &addrlen);
    if (fd < 0) {
        fprintf(stderr, "Unable to accept new TCP connection on "
                "listening fd %d: %s\n",
                listener->fildes, strerror(errno));
        return;
    }
    /* accept() reports the untruncated length; never copy more than the
     * buffer actually holds. */
    if (addrlen > sizeof(addr))
        addrlen = sizeof(addr);

    new_fd = ioset_add(ioset, fd);
    if (!new_fd) {
        close(fd);
        return;
    }
    new_fd->state = IO_CONNECTED;
    /* The peer address is informational for a stream socket, so running
     * out of memory here just leaves dest unset. */
    new_fd->dest = malloc(addrlen);
    if (new_fd->dest) {
        new_fd->dest_addrlen = addrlen;
        memcpy(new_fd->dest, &addr, addrlen);
    }

    old_active = ioset->active_fd;
    ioset->active_fd = new_fd;
    if (listener->acceptcb)
        listener->acceptcb(listener, new_fd);
    /* active_fd no longer matching means the callback released new_fd. */
    if (ioset->active_fd == new_fd) {
        if (new_fd->send.put != new_fd->send.get) {
            ioset_writeable(new_fd);
        } else {
            epoll_update(new_fd);
        }
    }
    ioset->active_fd = old_active;
}
/*
 * Finish an asynchronous connect on fd.
 *
 * readable / writeable - non-zero when the corresponding epoll conditions
 *                        were reported (readable also covers error and
 *                        hang-up).  With neither set there is nothing to
 *                        decide and the function returns immediately.
 *
 * The outcome is taken from the socket's pending error (SO_ERROR): 0 moves
 * fd to IO_CONNECTED, anything else to IO_CLOSED.  The connect callback,
 * if set, is then called with that value (0 or an errno code).
 */
static void ioset_connecting(struct io_fd *fd, uint32_t readable,
                             uint32_t writeable)
{
    int res;
    int option = 0;
    socklen_t option_len = sizeof(option);

    if (!readable && !writeable)
        return;

    /* Readability alone does not mean failure: a peer that sends data
     * right away makes a successful connect show up as readable too.
     * Only SO_ERROR tells the two cases apart. */
    res = getsockopt(fd->fildes, SOL_SOCKET, SO_ERROR, &option,
                     &option_len);
    res = (res < 0) ? errno : option;
    fd->state = res ? IO_CLOSED : IO_CONNECTED;

    if (fd->connectcb)
        fd->connectcb(fd, res);
}
static void ioset_readable(struct io_fd *fd)
{
size_t avail;
ssize_t res;
struct ioq *recv = &fd->recv;
struct ioset *ioset = fd->ioset;
struct sockaddr source;
socklen_t source_addrlen = sizeof(source);
avail = ioq_put_avail(recv);
if (!avail)
avail = ioq_grow(recv);
res = ioset_recvfrom(fd, recv->buf + recv->put, avail, &source,
&source_addrlen);
if (res < 0) {
if (errno == EAGAIN)
return;
fprintf(stderr, "Unexpected read() error on fd %d: %s\n",
fd->fildes, strerror(errno));
fd->state = IO_CLOSED;
fd->readablecb(fd, &source, source_addrlen);
if (ioset->active_fd == fd)
epoll_update(fd);
} else if (!res) {
fd->state = IO_CLOSED;
fd->readablecb(fd, &source, source_addrlen);
if (ioset->active_fd == fd)
epoll_update(fd);
} else {
recv->put += res;
if (recv->put == recv->size)
recv->put = 0;
do {
fd->readablecb(fd, &source, source_addrlen);
if (ioset->active_fd != fd)
break;
epoll_update(fd);
} while (fd->recv.put != fd->recv.get);
}
}
/*
 * Handle a writable descriptor: send the contiguous run at the head of the
 * send queue, advance the read offset by what was accepted and refresh the
 * epoll registration.  EAGAIN is ignored silently; other errors are logged
 * to stderr and leave the queue untouched.
 */
static void ioset_writeable(struct io_fd *fd)
{
    struct ioq *queue = &fd->send;
    size_t chunk = ioq_get_avail(queue);
    ssize_t sent;

    sent = ioset_sendto(fd, (unsigned char *)queue->buf + queue->get, chunk);
    if (sent >= 0) {
        queue->get += sent;
        if (queue->get == queue->size)
            queue->get = 0;
        epoll_update(fd);
        return;
    }

    if (errno == EAGAIN)
        return;
    fprintf(stderr, "Unexpected write() error on fd %d: %s\n",
            fd->fildes, strerror(errno));
}
/*
 * Dispatch one epoll event to the handler that matches fd's state.
 *
 * events - the epoll event mask reported for fd.  Error and hang-up are
 *          treated like readability so that the read path notices them.
 *
 * fd is recorded as the set's active descriptor for the duration;
 * handlers and callbacks that close it reset that marker, which is checked
 * before fd is touched again.
 */
static void ioset_event(struct io_fd *fd, uint32_t events)
{
    struct ioset *ioset = fd->ioset;
    uint32_t writeable = events & EPOLLOUT;
    uint32_t readable = events & (EPOLLIN | EPOLLHUP | EPOLLERR);

    ioset->active_fd = fd;
    switch (fd->state) {
    case IO_CLOSED:
        /* Marked dead earlier (failed connect, read error, EOF). */
        ioset_shutdown(fd, SHUTDOWN_CLOSE);
        break;
    case IO_LISTENING:
        if (readable)
            ioset_accept(fd);
        break;
    case IO_CONNECTING:
        ioset_connecting(fd, readable, writeable);
        if (ioset->active_fd != fd)
            break;
        epoll_update(fd);
        /* Only an established connection may carry on into the data
         * path; after a failed connect the descriptor is IO_CLOSED and
         * is released on its next event. */
        if (fd->state != IO_CONNECTED)
            break;
        /* fallthrough */
    case IO_LISTENDG:
    case IO_CONNECTDG:
    case IO_CONNECTED:
        if (readable && ioset->active_fd == fd)
            ioset_readable(fd);
        if (writeable && ioset->active_fd == fd)
            ioset_writeable(fd);
        break;
    default:
        break;
    }
}
/*
 * Wait for events on the set's descriptors and run their handlers.
 *
 * tv - maximum time to wait.  NULL passes a timeout of -1 to epoll_pwait();
 *      a negative interval is treated as zero; anything longer than
 *      MAX_TIMEOUT milliseconds is shortened to MAX_TIMEOUT.
 *
 * While waiting, the set's signal mask (see ioset_allow_signal() and
 * ioset_block_signal()) is in effect.  If the event buffer came back full
 * it is doubled for the next call, up to MAX_MAXEVENTS entries.
 *
 * Returns 0 after handling the reported events (possibly none) or when the
 * wait was interrupted by a signal; -1 on any other epoll_pwait() error,
 * which is also logged to stderr.
 *
 * NOTE(review): events are handled from a snapshot; a callback that closes
 * a different descriptor with an event later in the same batch would leave
 * a stale pointer in that batch -- confirm whether callers can do this.
 */
int ioset_dispatch(struct ioset *ioset, struct timeval *tv)
{
    int ii;
    int res;
    int saved_errno;
    int timeout = -1;
    int epoll_fd = ioset->epoll_fd;
    int maxevents = ioset->maxevents;
    struct epoll_event *events = ioset->events;
    const sigset_t *sigmask = &ioset->sigmask;

    if (tv) {
        if (tv->tv_sec < 0) {
            timeout = 0;
        } else if (tv->tv_sec >= MAX_TIMEOUT / 1000) {
            /* Clamp before multiplying so the conversion to
             * milliseconds cannot overflow. */
            timeout = MAX_TIMEOUT;
        } else {
            long long ms = (long long)tv->tv_sec * 1000 +
                           tv->tv_usec / 1000;

            if (ms < 0)
                ms = 0;
            if (ms > MAX_TIMEOUT)
                ms = MAX_TIMEOUT;
            timeout = (int)ms;
        }
    }

    res = epoll_pwait(epoll_fd, events, maxevents, timeout, sigmask);
    if (res < 0) {
        /* Keep errno safe from the logging call below. */
        saved_errno = errno;
        if (saved_errno == EINTR)
            return 0;   /* interrupted by an allowed signal: not an error */
        fprintf(stderr, "epoll_pwait() error in ioset %s: %s\n",
                ioset->name, strerror(saved_errno));
        return -1;
    }

    for (ii = 0; ii < res; ++ii)
        ioset_event(events[ii].data.ptr, events[ii].events);

    /* A completely filled buffer suggests more events were pending. */
    if ((res == maxevents) && (maxevents < MAX_MAXEVENTS)) {
        maxevents <<= 1;
        events = realloc(ioset->events,
                         maxevents * sizeof(*ioset->events));
        if (events) {
            ioset->events = events;
            ioset->maxevents = maxevents;
        }
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment