Created
January 25, 2011 01:10
-
-
Save Knio/794336 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* devnull/ioset.h - Asynchronous file descriptor event system. | |
* Copyright (c) 2011, Keith Plant. All rights reserved. | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions are met: | |
* | |
* 1. Redistributions of source code must retain the above copyright | |
* notice, this list of conditions and the following disclaimer. | |
* | |
* 2. Redistributions in binary form must reproduce the above copyright | |
* notice, this list of conditions and the following disclaimer in the | |
* documentation and/or other materials provided with the distribution. | |
* | |
* 3. Neither the name of the author nor the names of its contributors may | |
* be used to endorse or promote products derived from this software | |
* without specific prior written permission. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE | |
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
* POSSIBILITY OF SUCH DAMAGE. | |
*/ | |
#if !defined(IOSET_H) | |
#define IOSET_H | |
#include <sys/socket.h> | |
#include <sys/types.h> | |
#include <stdint.h> | |
#include "list.h" | |
struct epoll_event; | |
enum shutdown_mode { | |
SHUTDOWN_FREE = (0 << 0), | |
SHUTDOWN_FLUSH = (1 << 0), | |
SHUTDOWN_CLOSE = (1 << 1), | |
SHUTDOWN_FCLOSE = (SHUTDOWN_FLUSH | SHUTDOWN_CLOSE) | |
}; | |
enum iofd_state { | |
IO_CLOSED, | |
IO_LISTENDG, | |
IO_CONNECTDG, | |
IO_LISTENING, | |
IO_CONNECTING, | |
IO_CONNECTED | |
}; | |
union ioset_data { | |
void *ptr; | |
uint32_t u32; | |
uint64_t u64; | |
}; | |
struct ioq { | |
void *buf; | |
size_t size; | |
size_t put, get; | |
}; | |
struct io_fd { | |
int fildes; | |
struct ioq send; | |
struct ioq recv; | |
struct ioset *ioset; | |
struct sockaddr *dest; | |
socklen_t dest_addrlen; | |
enum iofd_state state; | |
union ioset_data data; | |
struct list_head fdlist; | |
unsigned int wants_reads : 1; | |
void (*closecb)(struct io_fd *); | |
void (*connectcb)(struct io_fd *, int); | |
void (*acceptcb)(struct io_fd *, struct io_fd *); | |
void (*readablecb)(struct io_fd *, const struct sockaddr *, socklen_t); | |
}; | |
struct ioset { | |
const char *name; | |
int epoll_fd; | |
int maxevents; | |
sigset_t sigmask; | |
size_t fd_count; | |
struct io_fd *active_fd; | |
struct list_head iofd_list; | |
struct epoll_event *events; | |
}; | |
extern struct ioset *ioset_create(void); | |
extern int ioset_destroy(struct ioset *ioset); | |
extern void ioset_close_all(struct ioset *ioset, enum shutdown_mode mode); | |
extern size_t ioset_fd_count(struct ioset *ioset); | |
extern const char *ioset_set_name(struct ioset *ioset, const char *new_name); | |
extern int ioset_allow_signal(struct ioset *ioset, int signo); | |
extern int ioset_block_signal(struct ioset *ioset, int signo); | |
extern int ioset_fileno(const struct io_fd *fd); | |
extern struct io_fd *ioset_add(struct ioset *ioset, int fildes); | |
extern int ioset_make_local(int socktype, const char *address, | |
unsigned int port, struct sockaddr *res, | |
socklen_t *res_addrlen); | |
extern struct io_fd *ioset_tcp_connect(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
const char *peer, unsigned int port, | |
void (*connectcb)(struct io_fd *, int)); | |
extern struct io_fd *ioset_tcp_listen(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
void (*acceptcb)(struct io_fd *, | |
struct io_fd *)); | |
extern struct io_fd *ioset_udp_connect(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
const char *peer, unsigned int port, | |
void (*connectcb)(struct io_fd *, int)); | |
extern struct io_fd *ioset_udp_listen(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
void (*readablecb)(struct io_fd *, | |
const struct sockaddr*, | |
socklen_t)); | |
extern int ioset_close(struct io_fd *fd); | |
extern int ioset_shutdown(struct io_fd *fd, enum shutdown_mode mode); | |
extern size_t ioset_read(struct io_fd *fd, void *dest, size_t max); | |
extern size_t ioset_readline(struct io_fd *fd, void *dest, size_t max); | |
extern size_t ioset_write(struct io_fd *fd, const void *buf, size_t nbyte); | |
extern ssize_t ioset_writeto(struct io_fd *fd, const void *buf, size_t nbyte, | |
struct sockaddr *dest, socklen_t dest_addrlen); | |
extern int ioset_dispatch(struct ioset *ioset, struct timeval *tv); | |
#define ioset_foreach(ioset, iter) \ | |
list_foreach_entry(&(ioset)->iofd_list, iter, fdlist) | |
#define ioset_foreach_safe(ioset, iter, next) \ | |
list_foreach_entry_safe(&(ioset)->iofd_list, iter, fdlist, next) | |
#define ioset_foreach_prev(ioset, iter) \ | |
list_foreach_entry_prev(&(ioset)->iofd_list, iter, fdlist) | |
#define ioset_foreach_prev_safe(ioset, iter, prev) \ | |
list_foreach_entry_prev_safe(&(ioset)->iofd_list, iter, fdlist, prev) | |
#endif /* !defined(IOSET_H) */ | |
/* devnull/ioset.c - Asynchronous file descriptor event system. | |
* Copyright (c) 2011, Keith Plant. All rights reserved. | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions are met: | |
* | |
* 1. Redistributions of source code must retain the above copyright | |
* notice, this list of conditions and the following disclaimer. | |
* | |
* 2. Redistributions in binary form must reproduce the above copyright | |
* notice, this list of conditions and the following disclaimer in the | |
* documentation and/or other materials provided with the distribution. | |
* | |
* 3. Neither the name of the author nor the names of its contributors may | |
* be used to endorse or promote products derived from this software | |
* without specific prior written permission. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE | |
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
* POSSIBILITY OF SUCH DAMAGE. | |
*/ | |
#include <sys/epoll.h> | |
#include <netdb.h> | |
#include "common.h" | |
#include "ioset.h" | |
#include "list.h" | |
/* | |
* On Linux kernels at least up to 2.6.24.4, epoll can't handle | |
* timeout values bigger than (LONG_MAX - 999ULL)/HZ. HZ in the | |
* wild can be as big as 1000, and LONG_MAX can be as small as | |
* (1 << 31) - 1, so the largest number of milliseconds we can | |
* support here is 2147482. Let's round that down to 35 minutes. | |
*/ | |
#define MAX_TIMEOUT (35*60*1000) | |
#define MIN_MAXEVENTS 8 | |
#define MAX_MAXEVENTS 4096 | |
static void ioset_readable(struct io_fd *fd); | |
static void ioset_writeable(struct io_fd *fd); | |
static void ioq_init(struct ioq *ioq, size_t size) | |
{ | |
ioq->buf = malloc(size); | |
ioq->size = size; | |
ioq->put = ioq->get = 0; | |
} | |
static size_t ioq_put_avail(const struct ioq *ioq) | |
{ | |
if (ioq->put < ioq->get) { | |
return ioq->get - ioq->put - 1; | |
} else if (!ioq->get) { | |
return ioq->size - ioq->put - 1; | |
} else { | |
return ioq->size - ioq->put; | |
} | |
} | |
static size_t ioq_get_avail(const struct ioq *ioq) | |
{ | |
return ((ioq->put < ioq->get) ? ioq->size : ioq->put) - ioq->get; | |
} | |
static size_t ioq_used(const struct ioq *ioq) | |
{ | |
return ((ioq->put < ioq->get) ? ioq->size : 0) + ioq->put - ioq->get; | |
} | |
static size_t ioq_find_line_length(const struct ioq *ioq) | |
{ | |
size_t pos, max; | |
size_t line_len = 1; | |
unsigned char *buffer = ioq->buf; | |
max = (ioq->put < ioq->get) ? ioq->size : ioq->put; | |
for (pos = ioq->get; pos < max; ++pos, ++line_len) { | |
if (buffer[pos] == '\n') | |
return line_len; | |
} | |
max = (ioq->put < ioq->get) ? ioq->put : 0; | |
for (pos = 0; pos < max; ++pos, ++line_len) { | |
if (buffer[pos] == '\n') | |
return line_len; | |
} | |
return 0; | |
} | |
static size_t ioq_grow(struct ioq *ioq) | |
{ | |
size_t new_size = ioq->size << 1; | |
size_t avail = ioq_get_avail(ioq); | |
void *new_buf = malloc(new_size); | |
memcpy(new_buf, ioq->buf + ioq->get, avail); | |
if (ioq->put < ioq->get) | |
memcpy(new_buf + avail, ioq->buf, ioq->put); | |
free(ioq->buf); | |
ioq->put = ioq_used(ioq); | |
ioq->get = 0; | |
ioq->size = new_size; | |
ioq->buf = new_buf; | |
return new_size - ioq->put - 1; | |
} | |
static int epoll_init(void) | |
{ | |
int epfd; | |
int flags; | |
epfd = epoll_create(1024); | |
if (epfd < 0) | |
return -1; | |
flags = fcntl(epfd, F_GETFD); | |
fcntl(epfd, F_SETFD, flags | FD_CLOEXEC); | |
return epfd; | |
} | |
static int epoll_shutdown(struct ioset *ioset) | |
{ | |
return close(ioset->epoll_fd); | |
} | |
static uint32_t epoll_events(struct io_fd *fd) | |
{ | |
uint32_t events; | |
unsigned int writeable; | |
events = EPOLLHUP | EPOLLERR; | |
events |= (fd->wants_reads ? EPOLLIN : 0); | |
writeable = (fd->send.put != fd->send.get) || | |
(fd->state == IO_CONNECTING); | |
return events | (writeable ? EPOLLOUT : 0); | |
} | |
static int epoll_add(struct io_fd *fd) | |
{ | |
int res; | |
int epoll_fd; | |
struct epoll_event epev; | |
epev.data.ptr = fd; | |
epev.events = epoll_events(fd); | |
epoll_fd = fd->ioset->epoll_fd; | |
res = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd->fildes, &epev); | |
return res; | |
} | |
static int epoll_update(struct io_fd *fd) | |
{ | |
int res; | |
int epoll_fd; | |
struct epoll_event epev; | |
epev.data.ptr = fd; | |
epev.events = epoll_events(fd); | |
epoll_fd = fd->ioset->epoll_fd; | |
res = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd->fildes, &epev); | |
return res; | |
} | |
static int epoll_remove(struct io_fd *fd) | |
{ | |
int res; | |
int epoll_fd; | |
struct epoll_event epev; | |
epoll_fd = fd->ioset->epoll_fd; | |
res = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd->fildes, &epev); | |
return res; | |
} | |
static void ioset_list_add(struct ioset *ioset, struct io_fd *fd) | |
{ | |
ioset->fd_count++; | |
list_insert(&ioset->iofd_list, &fd->fdlist); | |
} | |
static void ioset_list_del(struct ioset *ioset, struct io_fd *fd) | |
{ | |
ioset->fd_count--; | |
list_remove(&fd->fdlist); | |
} | |
struct ioset *ioset_create(void) | |
{ | |
int epoll_fd; | |
struct ioset *ioset; | |
epoll_fd = epoll_init(); | |
if (epoll_fd < 0) { | |
fprintf(stderr, "Unable to create new epoll backing: %s\n", | |
strerror(errno)); | |
return NULL; | |
} | |
ioset = calloc(1, sizeof(*ioset)); | |
ioset->epoll_fd = epoll_fd; | |
ioset->maxevents = MIN_MAXEVENTS; | |
ioset->events = malloc(ioset->maxevents * sizeof(*ioset->events)); | |
list_head_init(&ioset->iofd_list); | |
sigemptyset(&ioset->sigmask); | |
return ioset; | |
} | |
int ioset_destroy(struct ioset *ioset) | |
{ | |
int res; | |
res = epoll_shutdown(ioset); | |
if (res < 0) { | |
fprintf(stderr, "Unable to shutdown epoll backing for %s: %s\n", | |
ioset->name, strerror(errno)); | |
return -1; | |
} | |
free((void *)ioset->name); | |
free(ioset->events); | |
free(ioset); | |
return 0; | |
} | |
void ioset_close_all(struct ioset *ioset, enum shutdown_mode mode) | |
{ | |
struct io_fd *fd, *nextfd; | |
ioset_foreach_safe(ioset, fd, nextfd) { | |
ioset_shutdown(fd, mode); | |
} | |
} | |
size_t ioset_fd_count(struct ioset *ioset) | |
{ | |
return ioset->fd_count; | |
} | |
const char *ioset_set_name(struct ioset *ioset, const char *new_name) | |
{ | |
if (new_name) { | |
free((void *)ioset->name); | |
ioset->name = strdup(new_name); | |
} | |
return ioset->name; | |
} | |
int ioset_allow_signal(struct ioset *ioset, int signo) | |
{ | |
return sigdelset(&ioset->sigmask, signo); | |
} | |
int ioset_block_signal(struct ioset *ioset, int signo) | |
{ | |
return sigaddset(&ioset->sigmask, signo); | |
} | |
int ioset_fileno(const struct io_fd *fd) | |
{ | |
return fd->fildes; | |
} | |
struct io_fd *ioset_add(struct ioset *ioset, int fildes) | |
{ | |
int res; | |
struct io_fd *io_fd; | |
io_fd = calloc(1, sizeof(*io_fd)); | |
io_fd->fildes = fildes; | |
io_fd->ioset = ioset; | |
io_fd->wants_reads = 1; | |
res = epoll_add(io_fd); | |
if (res < 0) { | |
fprintf(stderr, "Unable to add fd %d to ioset %s: %s\n", | |
fildes, ioset->name, strerror(errno)); | |
free(io_fd); | |
return NULL; | |
} | |
res = fcntl(fildes, F_GETFL); | |
fcntl(fildes, F_SETFL, res | O_NONBLOCK); | |
res = fcntl(fildes, F_GETFD); | |
fcntl(fildes, F_SETFD, res | FD_CLOEXEC); | |
ioset_list_add(ioset, io_fd); | |
ioq_init(&io_fd->send, 1024); | |
ioq_init(&io_fd->recv, 1024); | |
return io_fd; | |
} | |
int ioset_make_local(int socktype, const char *address, unsigned int port, | |
struct sockaddr *res, socklen_t *res_addrlen) | |
{ | |
int gai_error; | |
char portstr[8]; | |
struct addrinfo hints; | |
struct addrinfo *ai; | |
memset(&hints, 0, sizeof(hints)); | |
hints.ai_flags = AI_PASSIVE; | |
hints.ai_socktype = socktype; | |
snprintf(portstr, sizeof(portstr), "%u", port); | |
gai_error = getaddrinfo(address, portstr, &hints, &ai); | |
if (gai_error) | |
return gai_error; | |
if (ai->ai_addrlen < *res_addrlen) | |
*res_addrlen = ai->ai_addrlen; | |
memcpy(res, ai->ai_addr, *res_addrlen); | |
freeaddrinfo(ai); | |
return 0; | |
} | |
static int ioset_getaddrinfo(int family, int socktype, const char *node, | |
unsigned int service, struct addrinfo **res) | |
{ | |
int gai_error; | |
char srvc[8]; | |
struct addrinfo hints; | |
memset(&hints, 0, sizeof(hints)); | |
hints.ai_family = family; | |
hints.ai_socktype = socktype; | |
snprintf(srvc, sizeof(srvc), "%u", service); | |
gai_error = getaddrinfo(node, srvc, &hints, res); | |
return gai_error; | |
} | |
static int ioset_bind(int fd, const struct sockaddr *addr, socklen_t addrlen) | |
{ | |
int res; | |
unsigned int option = 1; | |
if (!addr) | |
return -1; | |
res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)); | |
if (res < 0) { | |
fprintf(stderr, "Unable to mark local address for fd %d " | |
"as reuseable: %s\n", fd, strerror(errno)); | |
} | |
res = bind(fd, addr, addrlen); | |
if (res < 0) { | |
fprintf(stderr, "Unable to bind fd %d to local address: %s\n", | |
fd, strerror(errno)); | |
} | |
return res; | |
} | |
struct io_fd *ioset_tcp_connect(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
const char *peer, unsigned int port, | |
void (*connectcb)(struct io_fd *, int)) | |
{ | |
int fd; | |
int res; | |
int family; | |
struct io_fd *io_fd; | |
struct io_fd *old_active; | |
struct addrinfo *ai; | |
family = local ? local->sa_family : PF_INET; | |
fd = socket(family, SOCK_STREAM, IPPROTO_TCP); | |
if (fd < 0) { | |
fprintf(stderr, "Unable to create TCP socket for %s:%u... %s\n", | |
peer, port, strerror(errno)); | |
return NULL; | |
} | |
family = local ? local->sa_family : AF_INET; | |
res = ioset_getaddrinfo(family, SOCK_STREAM, peer, port, &ai); | |
if (res) { | |
fprintf(stderr, "Unable to resolve peer %s:%u... %s\n", | |
peer, port, gai_strerror(res)); | |
return NULL; | |
} | |
res = ioset_bind(fd, local, local_addrlen); | |
if (res < 0) { | |
fprintf(stderr, "Failed to set the local address for peer " | |
"%s:%u... Letting the operating system decide " | |
"the local address.\n", peer, port); | |
} | |
io_fd = ioset_add(ioset, fd); | |
res = connect(fd, ai->ai_addr, ai->ai_addrlen); | |
freeaddrinfo(ai); | |
if (!io_fd) { | |
close(fd); | |
return NULL; | |
} | |
io_fd->state = IO_CONNECTING; | |
io_fd->connectcb = connectcb; | |
if (res < 0) switch (errno) { | |
case EINPROGRESS: | |
epoll_update(io_fd); | |
return io_fd; | |
default: | |
fprintf(stderr, "Unable to connect to peer %s:%d... %s\n", | |
peer, port, strerror(errno)); | |
ioset_shutdown(io_fd, SHUTDOWN_CLOSE); | |
return NULL; | |
} | |
io_fd->state = IO_CONNECTED; | |
old_active = ioset->active_fd; | |
if (connectcb) | |
connectcb(io_fd, 0); | |
if (ioset->active_fd == io_fd) | |
epoll_update(io_fd); | |
ioset->active_fd = old_active; | |
return io_fd; | |
} | |
struct io_fd *ioset_tcp_listen(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
void (*acceptcb)(struct io_fd *, struct io_fd *)) | |
{ | |
int fd; | |
int res; | |
struct io_fd *io_fd; | |
res = local ? local->sa_family : PF_INET; | |
fd = socket(res, SOCK_STREAM, IPPROTO_TCP); | |
if (fd < 0) { | |
fprintf(stderr, "Unable to create new TCP listening fd: %s\n", | |
strerror(errno)); | |
return NULL; | |
} | |
res = ioset_bind(fd, local, local_addrlen); | |
if (res < 0) { | |
fprintf(stderr, "Unable to bind TCP listening socket %d to " | |
"local address: %s\n", fd, strerror(errno)); | |
close(fd); | |
return NULL; | |
} | |
res = listen(fd, 8); | |
if (res < 0) { | |
fprintf(stderr, "Unable to listen for new TCP connections " | |
"on socket fd %d: %s\n", fd, strerror(errno)); | |
close(fd); | |
return NULL; | |
} | |
io_fd = ioset_add(ioset, fd); | |
if (!io_fd) { | |
close(fd); | |
return NULL; | |
} | |
io_fd->state = IO_LISTENING; | |
io_fd->acceptcb = acceptcb; | |
epoll_update(io_fd); | |
return io_fd; | |
} | |
struct io_fd *ioset_udp_connect(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
const char *peer, unsigned int port, | |
void (*connectcb)(struct io_fd *, int)) | |
{ | |
int fd; | |
int res; | |
int family; | |
struct io_fd *io_fd; | |
struct io_fd *old_active; | |
struct addrinfo *ai; | |
family = local ? local->sa_family : PF_INET; | |
fd = socket(family, SOCK_DGRAM, IPPROTO_UDP); | |
if (fd < 0) { | |
fprintf(stderr, "Unable to create UDP socket for %s:%u... %s\n", | |
peer, port, strerror(errno)); | |
return NULL; | |
} | |
family = local ? local->sa_family : AF_INET; | |
res = ioset_getaddrinfo(family, SOCK_DGRAM, peer, port, &ai); | |
if (res) { | |
fprintf(stderr, "Unable to resolve peer %s:%u... %s\n", | |
peer, port, strerror(errno)); | |
return NULL; | |
} | |
res = ioset_bind(fd, local, local_addrlen); | |
if (res < 0) { | |
fprintf(stderr, "Failed to set the local address for peer " | |
"%s:%u... Letting the operating system decide " | |
"the local address.\n", peer, port); | |
} | |
io_fd = ioset_add(ioset, fd); | |
if (!io_fd) { | |
close(fd); | |
freeaddrinfo(ai); | |
return NULL; | |
} | |
io_fd->state = IO_CONNECTDG; | |
io_fd->connectcb = connectcb; | |
io_fd->dest = malloc(ai->ai_addrlen); | |
io_fd->dest_addrlen = ai->ai_addrlen; | |
memcpy(io_fd->dest, ai->ai_addr, ai->ai_addrlen); | |
old_active = ioset->active_fd; | |
ioset->active_fd = io_fd; | |
if (connectcb) | |
connectcb(io_fd, 0); | |
if (ioset->active_fd == io_fd) | |
epoll_update(io_fd); | |
ioset->active_fd = old_active; | |
return io_fd; | |
} | |
struct io_fd *ioset_udp_listen(struct ioset *ioset, | |
const struct sockaddr *local, | |
socklen_t local_addrlen, | |
void (*readablecb)(struct io_fd *, | |
const struct sockaddr *, | |
socklen_t)) | |
{ | |
int fd; | |
int res; | |
struct io_fd *io_fd; | |
res = local ? local->sa_family : PF_INET; | |
fd = socket(res, SOCK_DGRAM, IPPROTO_UDP); | |
if (fd < 0) { | |
fprintf(stderr, "Unable to create listening UDP socket: %s\n", | |
strerror(errno)); | |
return NULL; | |
} | |
res = ioset_bind(fd, local, local_addrlen); | |
if (res < 0) { | |
fprintf(stderr, "Unable to bind listening UDP socket %d " | |
"to local address: %s\n", fd, strerror(errno)); | |
close(fd); | |
return NULL; | |
} | |
io_fd = ioset_add(ioset, fd); | |
if (!io_fd) { | |
close(fd); | |
return NULL; | |
} | |
io_fd->state = IO_LISTENDG; | |
io_fd->readablecb = readablecb; | |
epoll_update(io_fd); | |
return io_fd; | |
} | |
int ioset_close(struct io_fd *fd) | |
{ | |
return ioset_shutdown(fd, SHUTDOWN_FCLOSE); | |
} | |
int ioset_shutdown(struct io_fd *fd, enum shutdown_mode mode) | |
{ | |
int fildes = fd->fildes; | |
struct ioset *ioset = fd->ioset; | |
fd->state = IO_CLOSED; | |
if (ioset->active_fd == fd) | |
ioset->active_fd = NULL; | |
if (fd->closecb) | |
fd->closecb(fd); | |
if ((fd->send.put != fd->send.get) && (mode & SHUTDOWN_FLUSH)) { | |
int flags; | |
flags = fcntl(fildes, F_GETFL); | |
fcntl(fildes, F_SETFL, flags & ~O_NONBLOCK); | |
ioset_writeable(fd); | |
if (fd->send.put != fd->send.get) | |
ioset_writeable(fd); | |
} | |
epoll_remove(fd); | |
ioset_list_del(ioset, fd); | |
free(fd->send.buf); | |
free(fd->recv.buf); | |
free(fd->dest); | |
free(fd); | |
return (mode & SHUTDOWN_CLOSE) ? close(fildes) : fildes; | |
} | |
size_t ioset_read(struct io_fd *fd, void *dest, size_t max) | |
{ | |
size_t pos = 0; | |
size_t get_avail; | |
size_t byte_avail; | |
struct ioq *recv = &fd->recv; | |
get_avail = ioq_get_avail(recv); | |
byte_avail = ioq_used(recv); | |
if (!get_avail) | |
return 0; | |
if (max > byte_avail) | |
max = byte_avail; | |
if (max > get_avail) { | |
memcpy(dest, recv->buf + recv->get, get_avail); | |
recv->get = 0; | |
pos = get_avail; | |
} | |
memcpy(dest + pos, recv->buf + recv->get, max - pos); | |
recv->get += max - pos; | |
if (recv->get == recv->size) | |
recv->get = 0; | |
return max; | |
} | |
size_t ioset_readline(struct io_fd *fd, void *dest, size_t max) | |
{ | |
size_t line_len; | |
line_len = ioq_find_line_length(&fd->recv); | |
if (!line_len) | |
return 0; | |
return ioset_read(fd, dest, (max > line_len) ? line_len : max); | |
} | |
size_t ioset_write(struct io_fd *fd, const void *buf, size_t nbyte) | |
{ | |
size_t avail; | |
ssize_t res = 0; | |
struct ioq *send = &fd->send; | |
avail = ioq_used(send); | |
while (avail + nbyte >= send->size) | |
ioq_grow(send); | |
avail = ioq_put_avail(send); | |
if (nbyte > avail) { | |
memcpy(send->buf + send->put, buf, avail); | |
send->put = 0; | |
res = avail; | |
nbyte -= avail; | |
buf += avail; | |
} | |
memcpy(send->buf + send->put, buf, nbyte); | |
send->put += nbyte; | |
if (send->put == send->size) | |
send->put = 0; | |
epoll_update(fd); | |
return res + nbyte; | |
} | |
ssize_t ioset_writeto(struct io_fd *fd, const void *buf, size_t nbyte, | |
struct sockaddr *dest, socklen_t dest_addrlen) | |
{ | |
ssize_t res; | |
res = sendto(fd->fildes, buf, nbyte, 0, dest, dest_addrlen); | |
if ((res < 0) && (errno != EAGAIN)) { | |
fprintf(stderr, "ioset_sendto() error on fd %d: %s\n", | |
fd->fildes, strerror(errno)); | |
} | |
return res; | |
} | |
static ssize_t ioset_recvfrom(struct io_fd *fd, void *dest, size_t max, | |
struct sockaddr *addr, socklen_t *addrlen) | |
{ | |
switch (fd->state) { | |
case IO_LISTENDG: | |
case IO_CONNECTDG: | |
return recvfrom(fd->fildes, dest, max, 0, addr, addrlen); | |
case IO_CONNECTED: | |
memset(addr, 0, *addrlen); | |
return read(fd->fildes, dest, max); | |
default: | |
errno = EINVAL; | |
return -1; | |
} | |
} | |
static ssize_t ioset_sendto(struct io_fd *fd, const void *buf, size_t nbyte) | |
{ | |
switch (fd->state) { | |
case IO_LISTENDG: | |
case IO_CONNECTDG: | |
return sendto(fd->fildes, buf, nbyte, 0, fd->dest, | |
fd->dest_addrlen); | |
case IO_CONNECTED: | |
return write(fd->fildes, buf, nbyte); | |
default: | |
errno = EINVAL; | |
return -1; | |
} | |
} | |
static void ioset_accept(struct io_fd *listener) | |
{ | |
int fd; | |
struct io_fd *new_fd; | |
struct io_fd *old_active; | |
struct sockaddr addr; | |
socklen_t addrlen = sizeof(addr); | |
struct ioset *ioset = listener->ioset; | |
fd = accept(listener->fildes, &addr, &addrlen); | |
if (fd < 0) { | |
fprintf(stderr, "Unable to accept new TCP connection on " | |
"listening fd %d: %s\n", | |
listener->fildes, strerror(errno)); | |
return; | |
} | |
new_fd = ioset_add(ioset, fd); | |
if (!new_fd) { | |
close(fd); | |
return; | |
} | |
new_fd->state = IO_CONNECTED; | |
new_fd->dest = malloc(addrlen); | |
new_fd->dest_addrlen = addrlen; | |
memcpy(new_fd->dest, &addr, addrlen); | |
old_active = ioset->active_fd; | |
ioset->active_fd = new_fd; | |
listener->acceptcb(listener, new_fd); | |
if (ioset->active_fd == new_fd) { | |
if (new_fd->send.put != new_fd->send.get) { | |
ioset_writeable(new_fd); | |
} else { | |
epoll_update(new_fd); | |
} | |
} | |
ioset->active_fd = old_active; | |
} | |
static void ioset_connecting(struct io_fd *fd, uint32_t readable, | |
uint32_t writeable) | |
{ | |
int res; | |
if (readable) { | |
int option; | |
socklen_t option_len; | |
option_len = sizeof(option); | |
res = getsockopt(fd->fildes, SOL_SOCKET, SO_ERROR, &option, | |
&option_len); | |
res = (res < 0) ? errno : option; | |
fd->state = IO_CLOSED; | |
} else if (writeable) { | |
res = 0; | |
fd->state = IO_CONNECTED; | |
} | |
if (fd->connectcb) | |
fd->connectcb(fd, res); | |
} | |
static void ioset_readable(struct io_fd *fd) | |
{ | |
size_t avail; | |
ssize_t res; | |
struct ioq *recv = &fd->recv; | |
struct ioset *ioset = fd->ioset; | |
struct sockaddr source; | |
socklen_t source_addrlen = sizeof(source); | |
avail = ioq_put_avail(recv); | |
if (!avail) | |
avail = ioq_grow(recv); | |
res = ioset_recvfrom(fd, recv->buf + recv->put, avail, &source, | |
&source_addrlen); | |
if (res < 0) { | |
if (errno == EAGAIN) | |
return; | |
fprintf(stderr, "Unexpected read() error on fd %d: %s\n", | |
fd->fildes, strerror(errno)); | |
fd->state = IO_CLOSED; | |
fd->readablecb(fd, &source, source_addrlen); | |
if (ioset->active_fd == fd) | |
epoll_update(fd); | |
} else if (!res) { | |
fd->state = IO_CLOSED; | |
fd->readablecb(fd, &source, source_addrlen); | |
if (ioset->active_fd == fd) | |
epoll_update(fd); | |
} else { | |
recv->put += res; | |
if (recv->put == recv->size) | |
recv->put = 0; | |
do { | |
fd->readablecb(fd, &source, source_addrlen); | |
if (ioset->active_fd != fd) | |
break; | |
epoll_update(fd); | |
} while (fd->recv.put != fd->recv.get); | |
} | |
} | |
static void ioset_writeable(struct io_fd *fd) | |
{ | |
size_t avail; | |
ssize_t res; | |
struct ioq *send = &fd->send; | |
avail = ioq_get_avail(send); | |
res = ioset_sendto(fd, send->buf + send->get, avail); | |
if (res < 0) { | |
if (errno == EAGAIN) | |
return; | |
fprintf(stderr, "Unexpected write() error on fd %d: %s\n", | |
fd->fildes, strerror(errno)); | |
} else { | |
send->get += res; | |
if (send->get == send->size) | |
send->get = 0; | |
epoll_update(fd); | |
} | |
} | |
static void ioset_event(struct io_fd *fd, uint32_t events) | |
{ | |
struct ioset *ioset = fd->ioset; | |
uint32_t writeable = events & EPOLLOUT; | |
uint32_t readable = events & (EPOLLIN | EPOLLHUP | EPOLLERR); | |
ioset->active_fd = fd; | |
switch (fd->state) { | |
case IO_CLOSED: | |
ioset_shutdown(fd, SHUTDOWN_CLOSE); | |
break; | |
case IO_LISTENING: | |
if (readable) | |
ioset_accept(fd); | |
break; | |
case IO_CONNECTING: | |
ioset_connecting(fd, readable, writeable); | |
if (ioset->active_fd != fd) | |
break; | |
epoll_update(fd); | |
case IO_LISTENDG: | |
case IO_CONNECTDG: | |
case IO_CONNECTED: | |
if (readable && ioset->active_fd) | |
ioset_readable(fd); | |
if (writeable && ioset->active_fd) | |
ioset_writeable(fd); | |
break; | |
} | |
} | |
int ioset_dispatch(struct ioset *ioset, struct timeval *tv) | |
{ | |
int ii; | |
int res; | |
int timeout = -1; | |
int epoll_fd = ioset->epoll_fd; | |
int maxevents = ioset->maxevents; | |
struct epoll_event *events = ioset->events; | |
const sigset_t *sigmask = &ioset->sigmask; | |
if (tv) { | |
timeout = tv->tv_usec / 1000; | |
timeout += tv->tv_sec * 1000; | |
if (timeout > MAX_TIMEOUT) | |
timeout = MAX_TIMEOUT; | |
} | |
res = epoll_pwait(epoll_fd, events, maxevents, timeout, sigmask); | |
if (res < 0) { | |
fprintf(stderr, "epoll_pwait() error in ioset %s: %s\n", | |
ioset->name, strerror(errno)); | |
return (errno == EINTR) ? 0 : -1; | |
} | |
for (ii = 0; ii < res; ++ii) | |
ioset_event(events[ii].data.ptr, events[ii].events); | |
if ((res == maxevents) && (maxevents < MAX_MAXEVENTS)) { | |
maxevents <<= 1; | |
events = realloc(ioset->events, | |
maxevents * sizeof(*ioset->events)); | |
if (events) { | |
ioset->events = events; | |
ioset->maxevents = maxevents; | |
} | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment