Skip to content

Instantly share code, notes, and snippets.

@tjw
Created March 28, 2018 22:23
Show Gist options
  • Save tjw/e35bf6b3cf42c91f6f7f4a9ad5f74bd5 to your computer and use it in GitHub Desktop.
Save tjw/e35bf6b3cf42c91f6f7f4a9ad5f74bd5 to your computer and use it in GitHub Desktop.
A dispatch write source that is connected to a UNIX domain socket with a full write buffer will fire forever, instead of waiting to fire until there is room in the buffer to write more data.
//
// main.m
// DispatchWriteSource
//
// Created by Timothy J. Wood on 3/27/18.
// Copyright © 2018 The Omni Group. All rights reserved.
//
#import <Foundation/Foundation.h>
#import <unistd.h>
#import <err.h>
#import <sys/socket.h>
#import <sys/un.h>
#import <sys/ioctl.h>
/*
A dispatch write source that is connected to a UNIX domain socket with a full write buffer will fire forever, instead of waiting to fire until there is room in the buffer to write more data.
% clang -Wall DispatchWriteSource.m -framework Foundation -o /tmp/DispatchWriteSource
*/
// A connected pair of file descriptors, as returned by PipeFileDescriptors() and UnixDomainSocketFileDescriptors().
typedef struct {
int read; // Receiving end: the pipe's read end, or the socket returned by accept().
int write; // Sending end: the pipe's write end, or the socket that was connect()ed.
} FileDescriptors;
/*
 Creates an anonymous pipe and returns both of its ends.

 The `read` member is the pipe's read end and `write` is its write end. If pipe() fails, the process exits with status 1 after reporting the error via err().
 */
static FileDescriptors PipeFileDescriptors(void)
{
    int pipeEnds[2];
    int status = pipe(pipeEnds);

    if (status != 0) {
        err(1, "pipe");
    }

    FileDescriptors result;
    result.read = pipeEnds[0];
    result.write = pipeEnds[1];
    return result;
}
/*
 Fills *sa in as an AF_UNIX address for `path` and returns the address length to pass to bind()/connect().

 The returned length is the size of the fields preceding sun_path plus the number of path bytes (no trailing NUL is counted). The structure is zeroed before the path is copied in, so sun_path is still NUL-terminated in memory.

 If `path` is too long to fit in sun_path with a terminating NUL, *sa is left untouched, errno is set to ENAMETOOLONG and 0 is returned.
 */
static inline socklen_t FillSockaddrForPath(struct sockaddr_un * __nonnull sa, const char *path)
{
    const size_t capacity = sizeof(sa->sun_path);
    const size_t length = strlen(path);

    if (length < capacity) {
        memset(sa, 0, sizeof(*sa));
        sa->sun_family = AF_UNIX;

        /* We should theoretically set sun_len here, but it's not used on most systems, including Darwin (in the kernel, the length field is reset to be the socklen passed to connect()/bind()). */
        memcpy(sa->sun_path, path, length);

        const size_t headerSize = sizeof(*sa) - capacity;
        return (socklen_t)(headerSize + length);
    }

    errno = ENAMETOOLONG;
    return 0;
}
/*
 Creates a connected pair of UNIX domain stream sockets using a socket file at `path`.

 A listening socket is bound to `path`, a second socket connects to it, and the connection is accepted on a private serial queue. In the result, `read` is the accepted (server-side) socket and `write` is the connecting (client-side) socket.

 If binding reports EEXIST or EADDRINUSE, the file at `path` is unlinked and the bind is retried, up to three attempts in total. The socket file is left on disk when the function returns.

 Every failure is fatal: the error is reported and the process exits with status 1.
 */
static FileDescriptors UnixDomainSocketFileDescriptors(const char *path)
{
    struct sockaddr_un sunaddr = {0};
    socklen_t sunaddr_len = FillSockaddrForPath(&sunaddr, path);
    if (sunaddr_len == 0) {
        // FillSockaddrForPath() signals a too-long path by returning 0 with errno set; binding a zero-length address would fail in a much less obvious way.
        err(1, "%s", path);
    }

    int server = socket(PF_UNIX, SOCK_STREAM, 0);
    if (server < 0) {
        err(1, "socket");
    }

    // Bind, removing whatever already exists at `path` and retrying if the address is taken.
    int lastError = 0;
    for (int attempt = 0; attempt < 3; attempt++) {
        if (bind(server, (const struct sockaddr *)&sunaddr, sunaddr_len) == 0) {
            lastError = 0;
            break;
        }

        lastError = errno;
        if (lastError != EEXIST && lastError != EADDRINUSE) {
            err(1, "bind");
        }
        if (unlink(path) != 0) {
            err(1, "unlink");
        }
    }
    if (lastError != 0) {
        // All attempts failed with "address in use"; report the saved error since errno may have changed since.
        errc(1, lastError, "bind");
    }

    if (listen(server, 5) != 0) {
        err(1, "listen");
    }

    // accept() blocks until the connect() below arrives, so it runs on its own queue.
    __block int readFD = -1;
    dispatch_queue_t acceptQueue = dispatch_queue_create("com.omnigroup.DispatchWriteSource.accept", NULL);
    dispatch_async(acceptQueue, ^{
        // The peer address is not used, so no address buffer is supplied.
        readFD = accept(server, NULL, NULL);
        if (readFD < 0) {
            err(1, "accept");
        }
    });

    int writeFD = socket(PF_UNIX, SOCK_STREAM, 0);
    if (writeFD < 0) {
        err(1, "socket");
    }
    if (connect(writeFD, (const struct sockaddr *)&sunaddr, sunaddr_len) != 0) {
        err(1, "connect");
    }

    dispatch_sync(acceptQueue, ^{}); // Probably already completed due to connect blocking, but the assignment to the readFD might not be done.

    // The listening socket is not returned to the caller, so release it here rather than leaking it. The accepted connection is a separate descriptor.
    if (close(server) != 0) {
        warn("close");
    }

    return (FileDescriptors){.read = readFD, .write = writeFD};
}
/*
 Demonstrates the dispatch write source behaviour described in the comment at the top of this file.

 Steps:
   1. Obtain a connected descriptor pair (UNIX domain socket by default; the pipe variant is kept as a commented-out alternative for comparison).
   2. Put the write end into non-blocking mode.
   3. Write zero-filled 1024-byte chunks until the kernel reports the write would block. Nothing in this program reads from fds.read, so the buffer stays full.
   4. Install a DISPATCH_SOURCE_TYPE_WRITE source on the write end that logs each time it fires, then park the main thread in dispatch_main().

 The arguments are unused. dispatch_main() is the last call, so there is no return statement.
 */
int main(int argc, const char * argv[])
{
    // FileDescriptors fds = PipeFileDescriptors();
    FileDescriptors fds = UnixDomainSocketFileDescriptors("/tmp/unix-domain-socket");

    // If fds.write is blocking then we'll get signalled to write when there is *any* room, but will block if there isn't room for our full buffer.
    int yes = 1;
    if (ioctl(fds.write, FIONBIO, &yes) != 0) {
        err(1, "ioctl");
    }

    // Fill up the write buffer
    {
        const size_t chunkSize = 1024;
        size_t totalWritten = 0;

        char *buf = calloc(chunkSize, 1);
        if (buf == NULL) {
            err(1, "calloc");
        }

        while (YES) {
            ssize_t written = write(fds.write, buf, chunkSize);
            if (written >= 0) {
                totalWritten += (size_t)written;
                continue;
            }

            // errno is only meaningful after a failed call, so it is examined on this path alone.
            if (errno == EINTR) {
                continue;
            }
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                break; // The buffer is full.
            }

            // Any other failure would previously have spun forever; treat it as fatal instead.
            err(1, "write");
        }

        free(buf);
        fprintf(stderr, "Filled write buffer with %zu bytes\n", totalWritten);
    }

    // Create a dispatch write source, which should only wake up when there is room to write to the file descriptor.
    dispatch_queue_t writeQueue = dispatch_queue_create("com.omnigroup.DispatchWriteSource.reader", NULL);
    dispatch_source_t writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fds.write, 0/*mask*/, writeQueue);
    dispatch_source_set_event_handler(writeSource, ^{
        fprintf(stderr, "Write source fired\n");
    });
    dispatch_resume(writeSource);

    dispatch_main();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment