Skip to content

Instantly share code, notes, and snippets.

Last active November 16, 2017 04:44
Show Gist options
  • Save anonymous/6fb336d4cc7e5cf76eed to your computer and use it in GitHub Desktop.
Save anonymous/6fb336d4cc7e5cf76eed to your computer and use it in GitHub Desktop.
Proxying encrypted CURVE traffic using ZeroMQ
// Written Alan Ward -- copied from larger program, not fully tested
// Placed into the public domain
char worker_endpoint [128], worker_endpoints [1024];
int process_instance;
worker_endpoints [0] = 0;
// start async worker processes
for ( process_instance = 0; process_instance < CFG_Backend_Threads; process_instance++ ) {
pid_t pid = fork();
if ( pid == 0 ) { /* now in the forked process */
server_worker ( process_instance );
_exit (0);
}
else
// build string for worker IPC connections
sprintf ( worker_endpoint, "ipc:///tmp/async%d.ipc", process_instance );
strcat ( worker_endpoints, worker_endpoint );
if ( process_instance < CFG_Backend_Threads - 1 ) strcat ( worker_endpoints, "," );
}
// Connect frontend socket that talks to clients
zsock_t *frontend = zsock_new ( ZMQ_ROUTER );
assert ( frontend );
// set up 2-way curve security
secure_connection ( frontend );
int rc = zsock_bind (frontend, "tcp://*:%s", CFG_Server_Async_Port);
assert ( rc != -1 );
// connect socket for async backend workers
zsock_t *backend = zsock_new ( ZMQ_DEALER );
assert ( backend );
rc = zsock_attach ( backend, worker_endpoints, 1 );
assert ( rc != -1 );
// Set up poller to handle 2-way traffic
zpoller_t *poller = zpoller_new ( frontend,
backend,
NULL );
assert (poller);
while (1) {
void *which = zpoller_wait (poller, -1 );
if ( which == frontend ) {
// get incoming client message
zmsg_t *msg = zmsg_recv ( frontend );
assert (msg);
// test for STOP command
zframe_t *content = zmsg_last (msg);
assert (content);
char *message = zframe_data (content);
if (streq (message, "STOP") ) {
// send STOP message to each worker, before exiting
for ( process_instance = 0; process_instance < CFG_Backend_Threads; process_instance++ ) {
// need to copy message as it is destroyed by send
zmsg_t *msg_copy = zmsg_dup ( msg );
rc = zmsg_send ( &msg_copy, backend );
}
zmsg_destroy ( &msg );
// wait for workers to get message before destroying socket
sleep (1);
zsock_destroy ( &frontend );
zsock_destroy ( &backend );
return;
}
// not STOP, send message to worker process
rc = zmsg_send ( &msg, backend );
assert ( rc != -1 );
}
else if ( which == backend ) {
// get reply mesage from backend process and send to client
zmsg_t *msg = zmsg_recv ( backend );
assert (msg);
rc = zmsg_send ( &msg, frontend );
assert ( rc != -1 );
}
else if ( zpoller_terminated ) break;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment