Created
July 22, 2012 04:15
-
-
Save jsanders/3158383 to your computer and use it in GitHub Desktop.
Echo server with connection migration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdlib.h> | |
#include <netinet/in.h> | |
#include <sys/un.h> | |
#include <string.h> | |
#include <stdio.h> | |
#include <unistd.h> | |
// Capacity of the client descriptor table in connections_t.
#define MAX_CONNECTIONS 5

// Size in bytes of the per-read echo buffer.
#define BUFSIZE 1024

// Filesystem path of the Unix-domain socket used to hand descriptors over
// to a new server instance.
#define SOCKPATH "/tmp/migrate.sock"
typedef struct { | |
int fds[MAX_CONNECTIONS]; | |
int num; | |
} connections_t; | |
// Report a fatal error and terminate.
//
// `msg` is handed to perror(), which prints it together with the textual
// description of the current errno; the process then exits with status 1.
void die(char *msg)
{
    perror(msg);
    exit(1);
}
// Bind `listener` to `port` on every local IPv4 address (INADDR_ANY).
// A bind() failure is fatal: the process exits through die().
void bind_listener(int listener, unsigned short port)
{
    struct sockaddr_in addr;

    memset(&addr, 0, sizeof addr);
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_family = AF_INET;

    if (bind(listener, (struct sockaddr *) &addr, sizeof addr) >= 0) {
        return;
    }
    die("ERROR on binding");
}
// Create a TCP/IPv4 socket, bind it to `port`, and start listening with a
// backlog of 5. Returns the listening descriptor; every failure is fatal.
int create_listener(unsigned short port)
{
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if (sock < 0) {
        die("ERROR opening socket");
    }

    bind_listener(sock, port);

    if (listen(sock, 5) < 0) {
        die("ERROR on listen");
    }

    return sock;
}
void bind_migrator(int migrator) { | |
struct sockaddr_un address; | |
memset(&address, 0, sizeof(address)); | |
address.sun_family = AF_UNIX; | |
strcpy(address.sun_path, SOCKPATH); | |
unlink(address.sun_path); | |
if(bind(migrator, (struct sockaddr *) &address, sizeof(address)) < 0) { | |
die("ERROR on binding migrator"); | |
} | |
} | |
// Create the Unix-domain stream socket on which this instance waits for a
// migration request: bound to SOCKPATH, listening with a backlog of 5.
// Returns the listening descriptor; every failure is fatal.
int create_migrator_server()
{
    int sock = socket(AF_UNIX, SOCK_STREAM, 0);

    if (sock < 0) {
        die("ERROR opening migrator socket");
    }

    bind_migrator(sock);

    if (listen(sock, 5) < 0) {
        die("ERROR on migrator listen");
    }

    return sock;
}
void connect_migrator(int migrator) { | |
struct sockaddr_un address; | |
memset(&address, 0, sizeof(address)); | |
address.sun_family = AF_UNIX; | |
strcpy(address.sun_path, SOCKPATH); | |
if(connect(migrator, (struct sockaddr *) &address, sizeof(address)) < 0) { | |
die("ERROR on connecting migrator"); | |
} | |
} | |
// Create a Unix-domain stream socket and connect it to SOCKPATH.
// Returns the connected descriptor; every failure is fatal.
int create_migrator_client()
{
    int sock = socket(AF_UNIX, SOCK_STREAM, 0);

    if (sock < 0) {
        die("ERROR opening migrator socket");
    }

    connect_migrator(sock);
    return sock;
}
// migrate_in() is defined later in this file; declare it before first use so
// the call below is not an implicit declaration.
int migrate_in(connections_t *connections);

// Either creates a new listener and blank connections or migrates all connections.
//
// Expects exactly one command-line argument:
//   - a decimal port number (0-65535): a fresh listener is created on it;
//   - anything else (e.g. "migrate"), including an empty string: the listener
//     and client descriptors are taken over from the running instance via
//     migrate_in(), which also fills `connections`.
//
// Returns the listening descriptor. Exits with status 1 on a wrong argument
// count or a numeric argument outside the valid port range.
int init_or_migrate(int argc, char **argv, connections_t *connections)
{
    char *end;
    long value;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port> OR %s migrate\n", argv[0], argv[0]);
        exit(1);
    }

    value = strtol(argv[1], &end, 10);

    // No digits consumed, or trailing junk: not a port number, so migrate.
    if (end == argv[1] || *end != '\0') {
        return migrate_in(connections);
    }

    // Refuse values that would be silently truncated by the cast below.
    if (value < 0 || value > 65535) {
        fprintf(stderr, "ERROR port out of range: %s\n", argv[1]);
        exit(1);
    }

    return create_listener((unsigned short) value);
}
// Add every stored client descriptor to `fds`.
// Returns the larger of `fd_max` and the highest descriptor added.
int set_fds_for_connections(connections_t *connections, fd_set *fds, int fd_max)
{
    int highest = fd_max;
    int idx;

    for (idx = 0; idx < connections->num; idx++) {
        int fd = connections->fds[idx];

        FD_SET(fd, fds);
        highest = (fd > highest) ? fd : highest;
    }

    return highest;
}
// Reset `fds` and populate it with the listener, the migrator socket and all
// client connections. Returns the highest descriptor placed in the set.
int set_fds_for_select(int listener, int migrator, connections_t *connections, fd_set *fds)
{
    int highest = listener;

    if (migrator > highest) {
        highest = migrator;
    }

    FD_ZERO(fds);
    FD_SET(migrator, fds);
    FD_SET(listener, fds);

    return set_fds_for_connections(connections, fds, highest);
}
// Append `fd` to the connection table.
// Terminates the process via die() if the table already holds
// MAX_CONNECTIONS descriptors.
void add_connection(connections_t *connections, int fd)
{
    if (connections->num >= MAX_CONNECTIONS) {
        die("ERROR too many connections");
    }

    connections->fds[connections->num] = fd;
    connections->num += 1;
}
// Accept one pending connection on `listener`.
// Returns the new connection's descriptor; an accept() failure is fatal.
// The peer address is received into a local and then discarded.
int accept_connection(int listener)
{
    struct sockaddr peer;
    socklen_t peer_len = sizeof peer;
    int conn = accept(listener, &peer, &peer_len);

    if (conn < 0) {
        die("ERROR on accept");
    }

    return conn;
}
void handle_connection(int fd) { | |
char buf[BUFSIZE] = {0}; | |
int n = read(fd, buf, BUFSIZE); | |
if(n < 0) { die("ERROR reading from socket"); } | |
if(n == 0) { return; } // Punt on handling EOF properly | |
printf("server received %d bytes: %s", n, buf); | |
if(write(fd, buf, strlen(buf)) < 0) { die("ERROR writing to socket"); } | |
} | |
// Service every client connection that is marked in `fds` by calling
// handle_connection() on it.
void handle_echo(connections_t *connections, fd_set *fds)
{
    int idx = 0;

    while (idx < connections->num) {
        int fd = connections->fds[idx];

        if (FD_ISSET(fd, fds)) {
            handle_connection(fd);
        }
        idx++;
    }
}
// Receive one file descriptor sent over the Unix-domain socket `migrator`
// as an SCM_RIGHTS control message (see send_fd()).
//
// Returns the received descriptor, -1 if recvmsg() fails, or -2 if the peer
// has closed the connection. A message without a well-formed single-int
// SCM_RIGHTS control message is fatal.
int receive_fd(int migrator)
{
    char iovbuf[1];
    struct iovec iov;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    // The union guarantees the control buffer is suitably aligned for
    // struct cmsghdr, which a bare char array does not.
    union {
        char buf[CMSG_SPACE(sizeof(int))];
        struct cmsghdr align;
    } control;
    ssize_t size;
    int fd;

    // One byte of ordinary payload accompanies the control message.
    iov.iov_base = iovbuf;
    iov.iov_len = sizeof(iovbuf);

    memset(&msg, 0, sizeof(msg));
    memset(&control, 0, sizeof(control));
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof(control.buf);

    size = recvmsg(migrator, &msg, 0);
    if (size == 0) { return -2; }
    if (size < 0) { return -1; }

    cmsg = CMSG_FIRSTHDR(&msg);
    if (cmsg == NULL) { die("No control message"); }
    if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) {
        die("Not SCM_RIGHTS");
    }
    if (cmsg->cmsg_len != CMSG_LEN(sizeof(int))) { die("Wrong length"); }

    // Copy out rather than cast: CMSG_DATA is not guaranteed to be aligned
    // for int.
    memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
    return fd;
}
// Send the open file descriptor `fd` to the peer of the Unix-domain socket
// `migrator` as an SCM_RIGHTS control message, accompanied by a single
// zero byte of ordinary payload. A sendmsg() failure or short send is fatal.
void send_fd(int migrator, int fd)
{
    // Data necessary to build control message
    char iovbuf[1];
    struct iovec iov;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    // The union guarantees the control buffer is suitably aligned for
    // struct cmsghdr, which a bare char array does not.
    union {
        char buf[CMSG_SPACE(sizeof(int))];
        struct cmsghdr align;
    } control;
    ssize_t sent;

    // Configure control message
    iovbuf[0] = 0;
    iov.iov_base = iovbuf;
    iov.iov_len = sizeof(iovbuf);

    memset(&msg, 0, sizeof(msg));
    memset(&control, 0, sizeof(control));
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof(control.buf);

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS; // This is the magic that makes this thing work - see man 7 unix
    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
    memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

    // Advertise exactly the one control message that was filled in.
    msg.msg_controllen = cmsg->cmsg_len;

    sent = sendmsg(migrator, &msg, 0);
    if (sent < 0 || (size_t) sent != iov.iov_len) { die("ERROR in sendmsg"); }
}
// Take over the listener and client connections from the running instance.
//
// Connects to the instance listening on SOCKPATH, writes one byte to trigger
// the hand-over, then receives the listening descriptor followed by up to
// MAX_CONNECTIONS client descriptors, which are appended to `connections`.
// Receiving stops early as soon as receive_fd() reports an error or that the
// peer closed the connection.
//
// Returns the migrated listening descriptor. Failure to connect, to write the
// trigger byte, or to receive the listener is fatal.
int migrate_in(connections_t *connections)
{
    char something[] = "$";
    int migrator = create_migrator_client();
    int listener, newfd, i;

    // Write something (anything), to kick off the migration process
    if (write(migrator, something, 1) < 0) { die("ERROR failed to write to migrator"); }

    listener = receive_fd(migrator);
    if (listener < 0) { die("ERROR couldn't migrate listener"); }

    for (i = 0; i < MAX_CONNECTIONS; i++) {
        newfd = receive_fd(migrator);
        if (newfd < 0) { break; }
        add_connection(connections, newfd);
    }

    // The hand-over channel is no longer needed; release it so the descriptor
    // does not leak for the lifetime of the new server.
    close(migrator);

    return listener;
}
// Hand this instance's descriptors to a new instance.
//
// Accepts the pending connection on `migrator`, then sends the listening
// descriptor followed by every client descriptor in table order.
void migrate_out(int migrator, int listener, connections_t *connections)
{
    int peer = accept_connection(migrator);
    int idx = 0;

    send_fd(peer, listener);

    while (idx < connections->num) {
        send_fd(peer, connections->fds[idx]);
        idx++;
    }
}
// Run one iteration of the server loop.
//
// Blocks in select() until the listener, the migrator socket or a client
// connection becomes readable, then:
//   - migrator readable: hands all descriptors to the new instance and exits
//     the process with status 0;
//   - listener readable: accepts the new client and stores it;
//   - finally echoes on every client connection that is readable.
// A select() failure is fatal.
void handle_connections(int listener, int migrator, connections_t *connections)
{
    fd_set fds;
    int fd_max = set_fds_for_select(listener, migrator, connections, &fds);
    int ready = select(fd_max + 1, &fds, NULL, NULL, NULL);

    if (ready < 0) { die("ERROR on select"); }
    if (ready == 0) { return; }

    if (FD_ISSET(migrator, &fds)) {
        migrate_out(migrator, listener, connections);
        exit(0);
    }

    if (FD_ISSET(listener, &fds)) {
        add_connection(connections, accept_connection(listener));
    }

    handle_echo(connections, &fds);
}
int main(int argc, char **argv) { | |
connections_t connections; | |
int listener, migrator; | |
memset(&connections, 0, sizeof(connections)); | |
listener = init_or_migrate(argc, argv, &connections); | |
migrator = create_migrator_server(); | |
while(1) { | |
handle_connections(listener, migrator, &connections); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is an echo server that supports migrating its connections to a new instance of itself transparently to any clients (both new clients and those already connected). To see it in action:
In another window:
In your original window you'll see:
Open a third session:
The server running in your first session will exit. Go to your telnet session:
In the session running your new server, you'll see:
$ ./echod migrate
server received 28 bytes: Hello again from a client!
Make sure you can still make new connections:
Now in your session running the new server, you'll see:
Ta-da!