Created
March 28, 2018 22:23
-
-
Save tjw/e35bf6b3cf42c91f6f7f4a9ad5f74bd5 to your computer and use it in GitHub Desktop.
A dispatch write source that is connected to a UNIX domain socket with a full write buffer will fire forever, instead of waiting to fire until there is room in the buffer to write more data.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
//  main.m
//  DispatchWriteSource
//
//  Created by Timothy J. Wood on 3/27/18.
//  Copyright © 2018 The Omni Group. All rights reserved.
//
#import <Foundation/Foundation.h> | |
#import <unistd.h> | |
#import <err.h> | |
#import <sys/socket.h> | |
#import <sys/un.h> | |
#import <sys/ioctl.h> | |
/*
 A dispatch write source that is connected to a UNIX domain socket with a full write buffer will fire forever, instead of waiting to fire until there is room in the buffer to write more data.

 Build with:
 % clang -Wall DispatchWriteSource.m -framework Foundation -o /tmp/DispatchWriteSource
 */
/* The two ends of a one-way byte channel, as raw file descriptors. */
typedef struct FileDescriptors {
    int read;   /* descriptor for the receiving end */
    int write;  /* descriptor for the sending end */
} FileDescriptors;
/*
 Creates an anonymous pipe and returns its two ends.

 Returns: a FileDescriptors whose `read` member is the pipe's read end and whose `write` member is its write end.
 On failure the process exits with status 1 via err().
 */
static FileDescriptors PipeFileDescriptors(void)
{
    int ends[2];
    const int rc = pipe(ends);
    if (rc != 0) {
        err(1, "pipe");
    }

    FileDescriptors result;
    result.read = ends[0];
    result.write = ends[1];
    return result;
}
/*
 Initializes `sa` as an AF_UNIX address naming `path`.

 sa:   address structure to fill in; it is only modified on success.
 path: NUL-terminated filesystem path; must be strictly shorter than sizeof(sa->sun_path).

 Returns: the address length to hand to bind()/connect() (the bytes preceding sun_path plus the path bytes, without the terminating NUL). If the path does not fit, returns 0 and sets errno to ENAMETOOLONG.
 */
static inline socklen_t FillSockaddrForPath(struct sockaddr_un * __nonnull sa, const char *path)
{
    const size_t capacity = sizeof(sa->sun_path);
    const size_t length = strlen(path);
    socklen_t result = 0;

    if (length < capacity) {
        /* Zero the whole structure first so the copied path ends up NUL-terminated. */
        memset(sa, 0, sizeof(*sa));
        sa->sun_family = AF_UNIX;

        /* Per the original author: sun_len should theoretically be set here, but it is not used on most systems, including Darwin (in the kernel, the length field is reset to be the socklen passed to connect()/bind()). */
        memcpy(sa->sun_path, path, length);

        const size_t headerSize = sizeof(*sa) - capacity;
        result = (socklen_t)(headerSize + length);
    } else {
        errno = ENAMETOOLONG;
    }

    return result;
}
/*
 Creates a connected pair of UNIX domain stream sockets inside this process.

 A listening socket is bound to `path`, a second socket connects to it, and the connection is accepted on a private serial queue (accept() blocks, so it cannot run on the calling thread before connect() is issued).

 path: filesystem path for the socket; an existing entry at that path is unlinked and the bind retried (up to three attempts).

 Returns: a FileDescriptors whose `read` member is the accepted (server-side) socket and whose `write` member is the connecting (client-side) socket.
 Any failure terminates the process with status 1 via err()/errc().
 */
static FileDescriptors UnixDomainSocketFileDescriptors(const char *path)
{
    struct sockaddr_un sunaddr = {0};
    socklen_t sunaddr_len = FillSockaddrForPath(&sunaddr, path);
    if (sunaddr_len == 0) {
        // FillSockaddrForPath signals failure with a zero length and errno set; don't go on to bind() a bogus address.
        err(1, "%s", path);
    }

    int server = socket(PF_UNIX, SOCK_STREAM, 0);
    if (server < 0) {
        err(1, "socket");
    }

    // Bind, clearing away a stale socket file (e.g. from a previous run) if one is in the way.
    int lastError = 0;
    for (int attempt = 0; attempt < 3; attempt++) {
        if (bind(server, (const struct sockaddr *)&sunaddr, sunaddr_len) == 0) {
            lastError = 0;
            break;
        }

        lastError = errno;
        if (lastError != EEXIST && lastError != EADDRINUSE) {
            err(1, "bind");
        }
        if (unlink(path) != 0) {
            err(1, "unlink");
        }
    }
    if (lastError != 0) {
        // Every attempt hit an in-use path even after unlinking it.
        errc(1, lastError, "bind");
    }

    if (listen(server, 5) != 0) {
        err(1, "listen");
    }

    // Start in a known-invalid state; the block below assigns the accepted descriptor.
    __block int readFD = -1;
    dispatch_queue_t acceptQueue = dispatch_queue_create("com.omnigroup.DispatchWriteSource.accept", NULL);
    dispatch_async(acceptQueue, ^{
        // The peer address is not needed, so none is requested.
        readFD = accept(server, NULL, NULL);
        if (readFD < 0) {
            err(1, "accept");
        }
    });

    int writeFD = socket(PF_UNIX, SOCK_STREAM, 0);
    if (writeFD < 0) {
        err(1, "socket");
    }
    if (connect(writeFD, (const struct sockaddr *)&sunaddr, sunaddr_len) != 0) {
        err(1, "connect");
    }

    // Wait for the accept block to finish so that readFD has been assigned before it is read here.
    dispatch_sync(acceptQueue, ^{});

    // Only one connection is ever accepted, so the listening socket is no longer needed.
    if (close(server) != 0) {
        warn("close");
    }

    return (FileDescriptors){.read = readFD, .write = writeFD};
}
/*
 Reproduction driver: fills the write side of a UNIX domain socket pair until it would block, then installs a dispatch write source on it and logs every time the source fires.

 Nothing ever reads from fds.read, so the write buffer stays full for the life of the process. Never returns normally: it either exits through err()/errx() or parks in dispatch_main().
 */
int main(int argc, const char * argv[])
{
    (void)argc;
    (void)argv;

    // To compare against a pipe instead of a socket, use: FileDescriptors fds = PipeFileDescriptors();
    FileDescriptors fds = UnixDomainSocketFileDescriptors("/tmp/unix-domain-socket");

    // If fds.write is blocking then we'll get signalled to write when there is *any* room, but will block if there isn't room for our full buffer.
    int yes = 1;
    if (ioctl(fds.write, FIONBIO, &yes) != 0) {
        err(1, "ioctl");
    }

    // Fill up the write buffer
    enum { chunkSize = 1024 };
    char buf[chunkSize] = {0};
    size_t totalWritten = 0;
    for (;;) {
        ssize_t written = write(fds.write, buf, sizeof(buf));
        if (written >= 0) {
            // errno is only meaningful after a failure, so don't look at it here.
            totalWritten += (size_t)written;
            continue;
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno == EWOULDBLOCK || errno == EAGAIN) {
            // The buffer is full, which is the state we want.
            break;
        }
        // Any other failure would otherwise make this loop spin forever.
        err(1, "write");
    }
    fprintf(stderr, "Filled write buffer with %zu bytes\n", totalWritten);

    // Create a dispatch write source, which should only wake up when there is room to write to the file descriptor.
    dispatch_queue_t writeQueue = dispatch_queue_create("com.omnigroup.DispatchWriteSource.reader", NULL);
    dispatch_source_t writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fds.write, 0/*mask*/, writeQueue);
    if (writeSource == NULL) {
        errx(1, "dispatch_source_create");
    }
    dispatch_source_set_event_handler(writeSource, ^{
        fprintf(stderr, "Write source fired\n");
    });
    dispatch_resume(writeSource);

    dispatch_main();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment