Created
November 29, 2013 00:44
-
-
Save mboeh/7700041 to your computer and use it in GitHub Desktop.
C version of ZeroMQ experiment.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h>
#include <stdlib.h>

#include <czmq.h>
#include <bstrlib.h>
// In-process endpoints that connect the threads of this program.

// Simulation thread binds this; the shouter thread connects to it.
const char *shouter_addr = "inproc://shouter";
// Simulation thread binds this; the listener thread connects to it.
const char *listener_addr = "inproc://listener";
// main() binds this; the simulation thread connects to it.
const char *ui_addr = "inproc://ui";
// Simulation Thread
typedef struct simulation_arg_t | |
{ | |
// Input parameters | |
bstring label; | |
// Internal context | |
zctx_t* context; | |
void* shoutsock; | |
void* listsock; | |
void* uisock; | |
bstring lastmsg; | |
} simulation_arg_t; | |
/*
 * Release everything held by a simulation_arg_t and clear the caller's pointer.
 *
 * args_p: address of the pointer to tear down. A NULL args_p, or a NULL
 *         *args_p, is a no-op. On return *args_p is NULL.
 *
 * Sockets whose pointer is NULL are skipped.
 */
void teardown_simulation_args(simulation_arg_t **args_p)
{
    if (args_p == NULL || *args_p == NULL) {
        return;
    }

    simulation_arg_t *args = *args_p;

    bdestroy(args->label);
    bdestroy(args->lastmsg);
    if (args->shoutsock) {
        zsocket_destroy(args->context, args->shoutsock);
    }
    if (args->listsock) {
        zsocket_destroy(args->context, args->listsock);
    }
    if (args->uisock) {
        zsocket_destroy(args->context, args->uisock);
    }

    free(args);
    *args_p = NULL;
}
/*
 * Simulation thread body (started via zthread_fork()).
 *
 * Binds the shouter and listener PAIR endpoints, connects to the UI endpoint,
 * then polls with a 1000/60 ms timeout:
 *   - a UI message is combined with the label, the tick counter and the last
 *     text heard, and the result is sent to the shouter;
 *   - a listener message replaces the remembered "last text heard".
 * The tick counter advances once per loop iteration, message or not.
 *
 * args_v:  simulation_arg_t* prepared by the caller; this function takes
 *          ownership and frees it before returning.
 * context: context to create this thread's sockets in.
 * pipe:    socket to the parent; "done" is sent on it when the thread ends.
 */
void simulation_th(void *args_v, zctx_t *context, void *pipe)
{
    simulation_arg_t *args = args_v;
    zpoller_t *poller = NULL;
    int ticks = 0;

    args->context = context;

    // Open connections to other threads
    args->shoutsock = zsocket_new(args->context, ZMQ_PAIR);
    if (args->shoutsock == NULL || zsocket_bind(args->shoutsock, shouter_addr) < 0) {
        fprintf(stderr, "simulation: cannot bind %s\n", shouter_addr);
        goto done;
    }
    args->listsock = zsocket_new(args->context, ZMQ_PAIR);
    if (args->listsock == NULL || zsocket_bind(args->listsock, listener_addr) < 0) {
        fprintf(stderr, "simulation: cannot bind %s\n", listener_addr);
        goto done;
    }
    args->uisock = zsocket_new(args->context, ZMQ_PAIR);
    if (args->uisock == NULL || zsocket_connect(args->uisock, ui_addr) < 0) {
        fprintf(stderr, "simulation: cannot connect to %s\n", ui_addr);
        goto done;
    }

    // Set up poller
    poller = zpoller_new(args->uisock, args->listsock, NULL);
    args->lastmsg = bfromcstr("nothing");
    if (poller == NULL || args->lastmsg == NULL) {
        fprintf(stderr, "simulation: out of memory\n");
        goto done;
    }

    for (;;) {
        void *sock = zpoller_wait(poller, 1000 / 60);

        // NULL means either a plain timeout (keep ticking) or termination.
        if (sock == NULL && zpoller_terminated(poller)) {
            break;
        }

        if (sock == args->uisock) {
            char *msg = zstr_recv(sock);
            if (msg == NULL) {
                break;
            }
            // bdata() yields the C string inside a bstring; passing the
            // bstring itself to %s would print the struct's raw bytes.
            bstring outmsg = bformat("%s: %d: %s? %s!",
                                     bdata(args->label),
                                     ticks,
                                     bdata(args->lastmsg),
                                     msg);
            if (outmsg != NULL) {
                // NOTE(review): in CZMQ releases where zstr_send() takes a
                // printf-style format, a '%' typed by the user would be
                // interpreted here -- confirm against the installed version.
                zstr_send(args->shoutsock, bdata(outmsg));
                bdestroy(outmsg);
            }
            free(msg);
        } else if (sock == args->listsock) {
            char *msg = zstr_recv(sock);
            if (msg == NULL) {
                break;
            }
            bassigncstr(args->lastmsg, msg);
            free(msg);
        }
        ticks += 1;
    }

done:
    zpoller_destroy(&poller);
    teardown_simulation_args(&args);
    zstr_send(pipe, "done");
}
/*
 * Allocate the simulation thread's arguments and start the thread.
 *
 * context: parent context passed on to zthread_fork().
 * label:   text copied into the thread's arguments (the caller keeps its copy).
 *
 * Returns whatever zthread_fork() returns, or NULL if the arguments could not
 * be allocated. Once the thread is started it owns the argument block.
 */
void *start_simulation(zctx_t *context, const char *label)
{
    simulation_arg_t *args = calloc(1, sizeof *args);
    if (args == NULL) {
        return NULL;
    }

    args->label = bfromcstr(label);
    if (args->label == NULL) {
        free(args);     // do not leak the half-built argument block
        return NULL;
    }

    return zthread_fork(context, simulation_th, args);
}
// Listener Thread
typedef struct listener_arg_t | |
{ | |
// Input parameters | |
bstring port; | |
// Internal context | |
zctx_t* context; | |
void* recvsock; | |
void* simsock; | |
} listener_arg_t; | |
/*
 * Release everything held by a listener_arg_t and clear the caller's pointer.
 *
 * args_p: address of the pointer to tear down. A NULL args_p, or a NULL
 *         *args_p, is a no-op. On return *args_p is NULL.
 *
 * Sockets whose pointer is NULL are skipped.
 */
void teardown_listener_args(listener_arg_t **args_p)
{
    if (args_p == NULL || *args_p == NULL) {
        return;
    }

    listener_arg_t *args = *args_p;

    bdestroy(args->port);
    if (args->simsock) {
        zsocket_destroy(args->context, args->simsock);
    }
    if (args->recvsock) {
        zsocket_destroy(args->context, args->recvsock);
    }

    free(args);
    *args_p = NULL;
}
/*
 * Listener thread body (started via zthread_fork()).
 *
 * Subscribes (with an empty filter) to tcp://localhost:<port>. For every
 * message received it prints the text, keeps only the part after the last
 * '?', drops that part's final character, and sends the result to the
 * simulation thread over listener_addr. The loop ends when a receive
 * returns NULL.
 *
 * args_v:  listener_arg_t* prepared by the caller; this function takes
 *          ownership and frees it before returning.
 * context: context to create this thread's sockets in.
 * pipe:    socket to the parent; "done" is sent on it when the thread ends.
 */
void listener_th(void *args_v, zctx_t *context, void *pipe)
{
    listener_arg_t *args = args_v;
    int connected = -1;

    args->context = context;

    // Open connections to other threads
    bstring recvaddr = bformat("tcp://localhost:%s", bdata(args->port));
    if (recvaddr == NULL) {
        fprintf(stderr, "listener: out of memory\n");
        goto done;
    }
    args->recvsock = zsocket_new(args->context, ZMQ_SUB);
    if (args->recvsock != NULL) {
        connected = zsocket_connect(args->recvsock, bdata(recvaddr));
    }
    if (connected < 0) {
        fprintf(stderr, "listener: cannot connect to %s\n", bdata(recvaddr));
    }
    bdestroy(recvaddr);     // only needed for the connect call
    if (connected < 0) {
        goto done;
    }
    zmq_setsockopt(args->recvsock, ZMQ_SUBSCRIBE, "", 0);

    args->simsock = zsocket_new(args->context, ZMQ_PAIR);
    if (args->simsock == NULL || zsocket_connect(args->simsock, listener_addr) < 0) {
        fprintf(stderr, "listener: cannot connect to %s\n", listener_addr);
        goto done;
    }

    for (;;) {
        char *msg = zstr_recv(args->recvsock);
        if (msg == NULL) {
            break;
        }
        printf("HEARING >> %s\n", msg);

        // Split it by ?...
        bstring msgstr = bfromcstr(msg);
        struct bstrList *strlist = (msgstr != NULL) ? bsplit(msgstr, '?') : NULL;
        if (strlist != NULL && strlist->qty > 0) {
            // Save the last bit after ?...
            bassign(msgstr, strlist->entry[strlist->qty - 1]);
            // Chop off the final !
            if (blength(msgstr) > 0) {
                btrunc(msgstr, blength(msgstr) - 1);
            }
            // Send it on, finally
            zstr_send(args->simsock, bdata(msgstr));
        }

        // Both destroy calls reject a NULL argument without touching it.
        bstrListDestroy(strlist);
        bdestroy(msgstr);
        free(msg);
    }

done:
    teardown_listener_args(&args);
    zstr_send(pipe, "done");
}
/*
 * Allocate the listener thread's arguments and start the thread.
 *
 * context: parent context passed on to zthread_fork().
 * port:    port text copied into the thread's arguments (the caller keeps
 *          its copy).
 *
 * Returns whatever zthread_fork() returns, or NULL if the arguments could not
 * be allocated. Once the thread is started it owns the argument block.
 */
void *start_listener(zctx_t *context, const char *port)
{
    listener_arg_t *args = calloc(1, sizeof *args);
    if (args == NULL) {
        return NULL;
    }

    args->port = bfromcstr(port);
    if (args->port == NULL) {
        free(args);     // do not leak the half-built argument block
        return NULL;
    }

    return zthread_fork(context, listener_th, args);
}
// Shouter Thread
typedef struct shouter_arg_t | |
{ | |
// Input parameters | |
bstring port; | |
// Internal context | |
zctx_t* context; | |
void* sendsock; | |
void* simsock; | |
} shouter_arg_t; | |
/*
 * Release everything held by a shouter_arg_t and clear the caller's pointer.
 *
 * args_p: address of the pointer to tear down. A NULL args_p, or a NULL
 *         *args_p, is a no-op. On return *args_p is NULL.
 *
 * Sockets whose pointer is NULL are skipped.
 */
void teardown_shouter_args(shouter_arg_t **args_p)
{
    if (args_p == NULL || *args_p == NULL) {
        return;
    }

    shouter_arg_t *args = *args_p;

    bdestroy(args->port);
    if (args->simsock) {
        zsocket_destroy(args->context, args->simsock);
    }
    if (args->sendsock) {
        zsocket_destroy(args->context, args->sendsock);
    }

    free(args);
    *args_p = NULL;
}
/*
 * Shouter thread body (started via zthread_fork()).
 *
 * Binds a PUB socket on tcp://*:<port>, connects to the simulation thread
 * over shouter_addr, and republishes every string received from the
 * simulation. The loop ends when a receive returns NULL.
 *
 * args_v:  shouter_arg_t* prepared by the caller; this function takes
 *          ownership and frees it before returning.
 * context: context to create this thread's sockets in.
 * pipe:    socket to the parent; "done" is sent on it when the thread ends.
 */
void shouter_th(void *args_v, zctx_t *context, void *pipe)
{
    shouter_arg_t *args = args_v;
    int bound = -1;

    args->context = context;

    // Open connections to other threads
    bstring sendaddr = bformat("tcp://*:%s", bdata(args->port));
    if (sendaddr == NULL) {
        fprintf(stderr, "shouter: out of memory\n");
        goto done;
    }
    args->sendsock = zsocket_new(args->context, ZMQ_PUB);
    if (args->sendsock != NULL) {
        bound = zsocket_bind(args->sendsock, bdata(sendaddr));
    }
    if (bound < 0) {
        fprintf(stderr, "shouter: cannot bind %s\n", bdata(sendaddr));
    }
    bdestroy(sendaddr);     // only needed for the bind call
    if (bound < 0) {
        goto done;
    }

    args->simsock = zsocket_new(args->context, ZMQ_PAIR);
    if (args->simsock == NULL || zsocket_connect(args->simsock, shouter_addr) < 0) {
        fprintf(stderr, "shouter: cannot connect to %s\n", shouter_addr);
        goto done;
    }

    for (;;) {
        char *str = zstr_recv(args->simsock);
        if (str == NULL) {
            break;          // nothing to forward; stop instead of sending NULL
        }
        zstr_send(args->sendsock, str);
        free(str);
    }

done:
    teardown_shouter_args(&args);
    zstr_send(pipe, "done");
}
/*
 * Allocate the shouter thread's arguments and start the thread.
 *
 * context: parent context passed on to zthread_fork().
 * port:    port text copied into the thread's arguments (the caller keeps
 *          its copy).
 *
 * Returns whatever zthread_fork() returns, or NULL if the arguments could not
 * be allocated. Once the thread is started it owns the argument block.
 */
void *start_shouter(zctx_t *context, const char *port)
{
    shouter_arg_t *args = calloc(1, sizeof *args);
    if (args == NULL) {
        return NULL;
    }

    args->port = bfromcstr(port);
    if (args->port == NULL) {
        free(args);     // do not leak the half-built argument block
        return NULL;
    }

    return zthread_fork(context, shouter_th, args);
}
int main(int argc, char *argv []) | |
{ | |
zctx_t *context = zctx_new(); | |
bstring inport_s = bfromcstr(argv[1]); | |
bstring outport_s = bfromcstr(argv[2]); | |
void* inputsock = zsocket_new(context, ZMQ_PAIR); | |
zsocket_bind(inputsock, ui_addr); | |
void* simpipe = start_simulation(context, "a thread"); | |
void* listpipe = start_listener(context, inport_s->data); | |
void* shoutpipe = start_shouter(context, outport_s->data); | |
while (1) { | |
bstring input = bgets((bNgetc) fgetc, stdin, '\n'); | |
btrimws(input); | |
zstr_send(inputsock, input->data); | |
bdestroy(input); | |
} | |
zctx_destroy(&context); | |
bdestroy(inport_s); | |
bdestroy(outport_s); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment