ssd -- Socket Server demo by Apple
// Copyright (C) 2010 Apple Inc. All Rights Reserved.
#include <launch.h>
#include <libkern/OSAtomic.h>
#include <vproc.h>
#include "shared.h"
static bool g_is_managed = false;
static bool g_accepting_requests = true;
static dispatch_source_t g_timer_source = NULL;
/* OSAtomic*() requires signed quantities. */
static int32_t g_transaction_count = 0;
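/* Not part of the original source: a sketch of the Sockets entry the launchd
 * job plist for this daemon would carry, so that the check-in below has
 * something to hand back. Roughly:
 *
 *     <key>Sockets</key>
 *     <dict>
 *         <key>com.apple.ssd.sock</key>
 *         <dict>
 *             <key>SockServiceName</key>
 *             <string>1138</string>
 *             <key>SockType</key>
 *             <string>stream</string>
 *         </dict>
 *     </dict>
 *
 * With no SockFamily specified, launchd listens on both IPv6 and IPv4, which
 * is why server_check_in() below expects two descriptors under the
 * "com.apple.ssd.sock" key.
 */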
int
server_check_in(void)
{
int sockfd = -1;
if (g_is_managed) {
CFShow( CFSTR( "server_check_in - managed2" ) );
/* If we're running under a production scenario, then we check in with
* launchd to get our socket file descriptors.
*/
launch_data_t req = launch_data_new_string(LAUNCH_KEY_CHECKIN);
assert(req != NULL);
launch_data_t resp = launch_msg(req);
assert(resp != NULL);
assert(launch_data_get_type(resp) == LAUNCH_DATA_DICTIONARY);
launch_data_t sockets = launch_data_dict_lookup(resp, LAUNCH_JOBKEY_SOCKETS);
assert(sockets != NULL);
assert(launch_data_get_type(sockets) == LAUNCH_DATA_DICTIONARY);
launch_data_t sarr = launch_data_dict_lookup(sockets, "com.apple.ssd.sock");
assert(sarr != NULL);
assert(launch_data_get_type(sarr) == LAUNCH_DATA_ARRAY);
/* This is the IPv6 socket, and it is always located at index 0. For this
* example, we're going to just ignore it.
*/
launch_data_t sockv6 = launch_data_array_get_index(sarr, 0);
assert(sockv6 != NULL);
assert(launch_data_get_type(sockv6) == LAUNCH_DATA_FD);
/* This is the IPv4 socket. This is the one we're going to accept(2)
* connections on.
*/
launch_data_t sockv4 = launch_data_array_get_index(sarr, 1);
assert(sockv4 != NULL);
assert(launch_data_get_type(sockv4) == LAUNCH_DATA_FD);
sockfd = launch_data_get_fd(sockv4);
assert(sockfd != -1);
/* We don't need this, so close it. */
(void)close(launch_data_get_fd(sockv6));
}
else
{
CFShow( CFSTR( "server_check_in - not managed" ) );
/* We're running under a debugger, so set up the socket by hand. In this
* case, we're setting up a TCP socket that listens on port 1138. Under
* launchd, all of this is taken care of for us in the plist.
*/
sockfd = socket(PF_INET, SOCK_STREAM, 0);
assert(sockfd != -1);
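/* Not in the original: for quicker relaunch cycles under the debugger, one
 * could also set SO_REUSEADDR on sockfd here (before bind(2)) to avoid
 * "Address already in use" errors while old connections sit in TIME_WAIT.
 */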
struct sockaddr_in saddr;
(void)bzero(&saddr, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(1138);
saddr.sin_addr.s_addr = INADDR_ANY;
int result = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
assert(result == 0);
result = listen(sockfd, -1);
assert(result == 0);
}
CFShow( CFStringCreateWithFormat( NULL, NULL, CFSTR( "sockfd: %d" ), sockfd ) );
CFShow( CFSTR( "server exit server_check_in" ) );
return sockfd;
}
vproc_transaction_t
server_open_transaction(void)
{
/* Atomically increment our count of outstanding requests. Even though
* this happens serially, remember that requests themselves are handled
* concurrently on GCD's default priority queue. So when the requests are
* closed out, it can happen asynchronously with respect to this section.
* Thus, any manipulation of the transaction counter needs to be guarded.
*/
if (OSAtomicIncrement32(&g_transaction_count) - 1 == 0) {
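/* This is the first outstanding request, so disarm the idle-exit timer by
 * pushing its start time out to DISPATCH_TIME_FOREVER.
 */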
dispatch_source_set_timer(g_timer_source, DISPATCH_TIME_FOREVER, 0llu, 0llu);
}
/* Open a new transaction. This tells Instant Off that we are "dirty" and
* should not be sent SIGKILL if the time comes to shut the system down.
* Instead, we will be sent SIGTERM.
*/
return vproc_transaction_begin(NULL);
}
void
server_close_transaction(vproc_transaction_t vt)
{
if (OSAtomicDecrement32(&g_transaction_count) == 0) {
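/* That was the last outstanding request, so re-arm the idle-exit timer to
 * fire 20 seconds from now.
 */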
dispatch_time_t t0 = dispatch_time(DISPATCH_TIME_NOW, 20llu * NSEC_PER_SEC);
dispatch_source_set_timer(g_timer_source, t0, 0llu, 0llu);
}
vproc_transaction_end(NULL, vt);
}
void
server_send_reply(int fd, dispatch_queue_t q, CFDataRef data, vproc_transaction_t vt)
{
size_t total = sizeof(struct ss_msg_s) + CFDataGetLength(data);
unsigned char *buff = (unsigned char *)malloc(total);
assert(buff != NULL);
struct ss_msg_s *msg = (struct ss_msg_s *)buff;
msg->_len = OSSwapHostToLittleInt32(total - sizeof(struct ss_msg_s));
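/* The wire format is a fixed-size ss_msg_s header whose _len field carries
 * the payload length as a little-endian 32-bit integer, followed by the
 * payload itself (here, a serialized property list).
 */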
/* Coming up with a more efficient implementation is left as an exercise to
* the reader.
*/
(void)memcpy(msg->_bytes, CFDataGetBytePtr(data), CFDataGetLength(data));
dispatch_source_t s = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, q);
assert(s != NULL);
__block unsigned char *track_buff = buff;
__block size_t track_sz = total;
dispatch_source_set_event_handler(s, ^(void) {
ssize_t nbytes = write(fd, track_buff, track_sz);
if (nbytes != -1) {
track_buff += nbytes;
track_sz -= nbytes;
if (track_sz == 0) {
dispatch_source_cancel(s);
}
}
});
dispatch_source_set_cancel_handler(s, ^(void)
{
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_WRITE - canceled" ) );
/* We're officially done with this request, so close out the connection
* and free the resources associated with it.
*/
(void)close(fd);
free(buff);
dispatch_release(s);
server_close_transaction(vt);
});
dispatch_resume(s);
}
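/* A more compact variant, not part of the original sample: on OS X 10.7 and
 * later, dispatch_data(3) and dispatch_write(3) can replace the hand-rolled
 * write source above. This is only a sketch under that assumption; the
 * hypothetical helper below is unused by the rest of the file.
 */
static void
server_send_reply_with_dispatch_write(int fd, dispatch_queue_t q, CFDataRef data, vproc_transaction_t vt)
{
    size_t total = sizeof(struct ss_msg_s) + CFDataGetLength(data);
    struct ss_msg_s *msg = (struct ss_msg_s *)malloc(total);
    assert(msg != NULL);
    msg->_len = OSSwapHostToLittleInt32((uint32_t)(total - sizeof(struct ss_msg_s)));
    (void)memcpy(msg->_bytes, CFDataGetBytePtr(data), CFDataGetLength(data));
    /* DISPATCH_DATA_DESTRUCTOR_FREE hands ownership of the malloc(3)'d buffer
     * to the dispatch_data object, so there is no separate free(3) to track.
     */
    dispatch_data_t reply = dispatch_data_create(msg, total, q, DISPATCH_DATA_DESTRUCTOR_FREE);
    dispatch_write(fd, reply, q, ^(dispatch_data_t unwritten, int error) {
        /* Mirror the write source's cancellation handler above: close the
         * connection and retire the transaction once the reply has been
         * written (or the write has failed).
         */
        (void)unwritten;
        (void)error;
        (void)close(fd);
        server_close_transaction(vt);
    });
    dispatch_release(reply);
}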
void
server_handle_request(int fd, const void *buff, size_t total, vproc_transaction_t vt)
{
CFDataRef data = CFDataCreateWithBytesNoCopy(NULL, buff, total, kCFAllocatorNull);
assert(data != NULL);
CFPropertyListRef plist = CFPropertyListCreateWithData(NULL, data, kCFPropertyListImmutable, NULL, NULL);
assert(plist != NULL);
/* Handle request, create reply (of a property list type). In this case,
* we'll just blurt the request back to the client. But feel free to do
* something interesting here. Remember, this section is being run on the
* default-priority concurrent queue, so make sure the work for various
* clients can be done in parallel. If not, then you should consider
* creating your own serial queue.
*/
CFDataRef replyData = CFPropertyListCreateData(NULL, (CFPropertyListRef)plist, kCFPropertyListBinaryFormat_v1_0, 0, NULL);
assert(replyData != NULL);
CFRelease(data);
CFRelease(plist);
server_send_reply(fd, dispatch_get_current_queue(), replyData, vt);
/* ss_send_reply() copies the data from replyData out, so we can safely
* release it here. But remember, that's an inefficient design.
*/
CFRelease(replyData);
}
bool
server_read(int fd, unsigned char *buff, size_t buff_sz, size_t *total)
{
CFShow( CFSTR( "server_read" ) );
bool result = false;
struct ss_msg_s *msg = (struct ss_msg_s *)buff;
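/* buff accumulates a single message across multiple reads; *total counts the
 * bytes received so far, so each read(2) appends at buff + *total.
 */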
unsigned char* track_buff = buff + *total;
size_t track_sz = buff_sz - *total;
ssize_t nbytes = read(fd, track_buff, track_sz);
printf( "nbytes: %zd\n", nbytes );
printf( "sizeof(struct ss_msg_s): %zu\n", sizeof(struct ss_msg_s) );
printf( "totalA: %zu\n", *total );
if ( nbytes != -1 )
{
*total += nbytes;
printf( "totalB: %zu\n", *total );
/* We do this swap on every read(2), which is wasteful. But there is a
* way to avoid doing this every time and not introduce an extra
* parameter. See if you can find it.
*/
if ( *total >= sizeof( struct ss_msg_s ) )
{
msg->_len = OSSwapLittleToHostInt32(msg->_len);
printf( "msg->_len: %u\n", msg->_len );
if (msg->_len == (*total - sizeof(struct ss_msg_s)))
{
result = true;
}
}
}
return result;
}
void
server_accept(int fd, dispatch_queue_t q)
{
/* This variable needs to be mutable in the block. Setting __block will
* ensure that, when dispatch_source_set_event_handler(3) copies it to
* the heap, this variable will be copied to the heap as well, so it'll
* be safely mutable in the block.
*/
__block size_t total = 0;
vproc_transaction_t vt = server_open_transaction();
/* For large allocations like this, the VM system will lazily create
* the pages, so we won't get the full 10 MB (or anywhere near it) upfront.
* A smarter implementation would read the intended message size upfront
* into a fixed-size buffer and then allocate the needed space right there.
* But if our requests are almost always going to be this small, then we
* avoid a potential second trap into the kernel to do the second read(2).
* Also, we avoid a second copy-out of the data read.
*/
size_t buff_sz = 10 * 1024 * 1024;
void *buff = malloc(buff_sz);
assert(buff != NULL);
dispatch_source_t s = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, q);
assert(s != NULL);
dispatch_source_set_event_handler(s, ^(void)
{
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_READ B" ) );
/* You may be asking yourself, "Doesn't the fact that we're on a concurrent
* queue mean that multiple event handler blocks could be running
* simultaneously for the same source?" The answer is no. Parallelism for
* the global concurrent queues is at the source level, not the event
* handler level. So for each source, exactly one invocation of the event
* handler can be inflight. When scheduling on a concurrent queue, it
* means that that handler may be running concurrently with other sources'
* event handlers, but not its own.
*/
if ( server_read(fd, buff, buff_sz, &total) )
{
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_READ B - server_read success" ) );
struct ss_msg_s *msg = (struct ss_msg_s *)buff;
server_handle_request(fd, msg->_bytes, msg->_len, vt);
/* After handling the request (which, in this case, means that we've
* scheduled a source to deliver the reply), we no longer need this
* source. So we cancel it.
*/
dispatch_source_cancel(s);
}
});
dispatch_source_set_cancel_handler(s, ^(void)
{
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_READ B - canceled" ) );
/* We'll close out the file descriptor after sending the reply, i.e. in the
* write source's cancellation handler.
*/
dispatch_release(s);
free(buff);
});
dispatch_resume(s);
}
int
main(int argc, const char *argv[])
{
/* An argv[1] of "launchd" indicates that we were launched by launchd.
* Note that we ONLY do this check for debugging purposes. There should be no
* production scenario where this daemon is not being managed by launchd.
*/
if (argc > 1 && strcmp(argv[1], "launchd") == 0) {
g_is_managed = true;
} else {
/* When running under a debugging environment, log messages to stderr. */
(void)openlog("wwdc-debug", LOG_PERROR, 0);
}
/* This daemon handles events serially on the main queue. The events that
* are synchronized on the main queue are:
* • New connections
* • The idle-exit timer
* • The SIGTERM handler
*
* Note that actually handling requests is done concurrently.
*/
dispatch_queue_t mq = dispatch_get_main_queue();
g_timer_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, mq);
assert(g_timer_source != NULL);
/* When the idle-exit timer fires, we just call exit(3) with status 0. */
dispatch_set_context(g_timer_source, NULL);
dispatch_source_set_event_handler_f(g_timer_source, (void (*)(void *))exit);
/* We start off with our timer armed. This is for the simple reason that,
* upon kicking off the GCD state engine, the first thing we'll get to is
* a connection on our socket which will disarm the timer. Remember, handling
* new connections and the firing of the idle-exit timer are synchronized.
*/
dispatch_time_t t0 = dispatch_time(DISPATCH_TIME_NOW, 20llu * NSEC_PER_SEC);
dispatch_source_set_timer(g_timer_source, t0, 0llu, 0llu);
dispatch_resume(g_timer_source);
/* We must ignore the default action for SIGTERM so that GCD can safely receive it
* and distribute it across all interested parties in the address space.
*/
(void)signal(SIGTERM, SIG_IGN);
/* For Instant Off, we handle SIGTERM. Since SIGTERM is Instant Off's way of
* saying "Wind down your existing requests, and don't accept any new ones",
* we set a global saying to not accept new requests. This source fires
* synchronously with respect to the source which monitors for new connections
* on our socket, so things will be neatly synchronized. So unless it takes
* us 20 seconds between now and when we call dispatch_main(3), we'll be okay.
*/
dispatch_source_t sts = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, SIGTERM, 0, mq);
assert(sts != NULL);
dispatch_source_set_event_handler(sts, ^(void)
{
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_SIGNAL" ) );
/* If we get SIGTERM, that means that the system is on its way out, so
* we need to close out our existing requests and stop accepting new
* ones. At this point, we know that we have at least one outstanding
* request. Had we been clean, we would've received SIGKILL and just
* exited.
*
* Note that by adopting Instant Off, you are opting into a contract
* where you assert that launchd is the only entity which can
* legitimately send you SIGTERM.
*/
g_accepting_requests = false;
});
dispatch_resume(sts);
/* Now that we've set all that up, get our socket. */
int fd = server_check_in();
/* This is REQUIRED for GCD. To understand why, consider the following scenario:
* 0. GCD monitors the descriptor for bytes to read.
* 1. Bytes appear, so GCD fires off every source interested in whether there
* are bytes on that socket to read.
* 2. 1 of N sources fires and consumes all the outstanding bytes on the
* socket by calling read(2).
* 3. The other N - 1 sources fire and each attempt a read(2). Since all the
* data has been drained, each of those read(2) calls will block.
*
* This is highly undesirable. It is important to remember that parking a
* queue in an unbounded blocking call will prevent any other source that
* fires on that queue from doing so. So whenever possible, we must avoid
* unbounded blocking in event handlers.
*/
(void)fcntl(fd, F_SETFL, O_NONBLOCK);
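/* Note that F_SETFL replaces the status flags wholesale. Nothing else has
 * been set on this descriptor, so passing only O_NONBLOCK is fine here; a
 * more defensive version would fetch the current flags with F_GETFL and OR
 * O_NONBLOCK into them.
 */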
/* DISPATCH_SOURCE_TYPE_READ, in this context, means that the source will
* fire whenever there is a connection on the socket waiting to be accept(2)ed.
* I know what you're thinking after reading the above comment. "Doesn't this
* mean that it's safe for this socket to be blocking? The source won't fire
* until there is a connection to be accept(2)ed, right?"
*
* This is true, but it is important to remember that the client on the other
* end can cancel its attempt to connect. If the source fires after this has
* happened, accept(2) will block. So it is still important to set O_NONBLOCK
* on the socket.
*/
dispatch_source_t as = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, mq);
assert(as != NULL);
dispatch_source_set_event_handler(as, ^(void)
{
struct sockaddr saddr;
socklen_t slen = 0;
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_READ A" ) );
int afd = accept(fd, (struct sockaddr *)&saddr, &slen);
if (afd != -1)
{
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_READ A - accepted" ) );
/* Again, make sure the new connection's descriptor is non-blocking. */
(void)fcntl(afd, F_SETFL, O_NONBLOCK);
/* Check to make sure that we're still accepting new requests. */
if (g_accepting_requests)
{
/* We're going to handle all requests concurrently. This daemon uses an HTTP-style
* model, where each request comes through its own connection. Making a more
* efficient implementation is an exercise left to the reader.
*/
server_accept(afd, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
}
else
{
/* We're no longer accepting requests. */
(void)close(afd);
}
}
});
/* GCD requires that any source dealing with a file descriptor have a
* cancellation handler. Because GCD needs to keep the file descriptor
* around to monitor it, the cancellation handler is the client's signal
* that GCD is done with the file descriptor, and thus the client is safe
* to close it out. Remember, file descriptors aren't ref counted.
*/
dispatch_source_set_cancel_handler(as, ^(void)
{
CFShow( CFSTR( "DISPATCH_SOURCE_TYPE_READ A - canceled" ) );
dispatch_release(as);
(void)close(fd);
});
dispatch_resume(as);
CFShow( CFSTR( "server - dispatch_main" ) );
dispatch_main();
exit(EXIT_FAILURE);
}
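/* Not part of the original sample: a minimal client sketch for exercising the
 * daemon from another process. It assumes struct ss_msg_s in shared.h is a
 * 32-bit _len followed by the payload bytes, and that the daemon is listening
 * on localhost:1138 (the debug configuration set up in server_check_in()).
 * The hypothetical helper below connects, sends one property-list request,
 * and returns the connected descriptor so the caller can read the echoed
 * reply.
 */
static int
client_send_request_sketch(CFPropertyListRef request)
{
    int fd = socket(PF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }
    struct sockaddr_in saddr;
    (void)bzero(&saddr, sizeof(saddr));
    saddr.sin_family = AF_INET;
    saddr.sin_port = htons(1138);
    saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    if (connect(fd, (struct sockaddr *)&saddr, sizeof(saddr)) == -1) {
        (void)close(fd);
        return -1;
    }
    /* Serialize the request as a binary plist and frame it the same way
     * server_send_reply() frames replies: a little-endian length header
     * followed by the payload.
     */
    CFDataRef payload = CFPropertyListCreateData(NULL, request, kCFPropertyListBinaryFormat_v1_0, 0, NULL);
    assert(payload != NULL);
    size_t total = sizeof(struct ss_msg_s) + CFDataGetLength(payload);
    struct ss_msg_s *msg = (struct ss_msg_s *)malloc(total);
    assert(msg != NULL);
    msg->_len = OSSwapHostToLittleInt32((uint32_t)CFDataGetLength(payload));
    (void)memcpy(msg->_bytes, CFDataGetBytePtr(payload), CFDataGetLength(payload));
    ssize_t nwritten = write(fd, msg, total);
    free(msg);
    CFRelease(payload);
    if (nwritten != (ssize_t)total) {
        (void)close(fd);
        return -1;
    }
    /* A real client would now read the ss_msg_s header and then _len bytes of
     * reply from fd before closing it.
     */
    return fd;
}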