Created
February 24, 2011 16:37
-
-
Save hintjens/842407 to your computer and use it in GitHub Desktop.
Leaker does not leak
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // | |
| // Leak example using PUB/SUB pattern | |
| // | |
| // While this example runs in a single process, that is just to make | |
| // it easier to start and stop the example. Each task has its own | |
| // context and conceptually acts as a separate process. | |
| #include "zhelpers.h" | |
| #include "zmsg.c" | |
| // --------------------------------------------------------------------- | |
| // This is our client task | |
| // Message logic is shamelessly stolen from the +wuclient+ example. | |
| // Subscriber opens a socket, sets its options, and then receives as | |
| // many messages as it can (and throws them away) in 100ms. Then it | |
| // closes the socket, loops back up and starts all over again. | |
| static void * | |
| client_task (void *context) { | |
| struct timeval tstart, tend, tdiff; | |
| while (1) { | |
| int total_msec = 0; | |
| void *subscriber = zmq_socket (context, ZMQ_SUB); | |
| uint64_t hwm = 1; | |
| zmq_setsockopt (subscriber, ZMQ_HWM, &hwm, sizeof (hwm)); | |
| int linger = 0; | |
| zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger)); | |
| zmq_connect (subscriber, "inproc://publisher"); | |
| zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0); | |
| gettimeofday (&tstart, NULL); | |
| gettimeofday (&tend, NULL); | |
| if (tend.tv_usec < tstart.tv_usec) { | |
| tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1; | |
| tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec; | |
| } | |
| else { | |
| tdiff.tv_sec = tend.tv_sec - tstart.tv_sec; | |
| tdiff.tv_usec = tend.tv_usec - tstart.tv_usec; | |
| } | |
| total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000; | |
| int update_nbr; | |
| long total_temp = 0; | |
| while (total_msec < 100) { | |
| char *string = s_recv (subscriber); | |
| int zipcode, temperature, relhumidity; | |
| sscanf (string, "%d %d %d", | |
| &zipcode, &temperature, &relhumidity); | |
| total_temp += temperature; | |
| free (string); | |
| gettimeofday (&tend, NULL); | |
| if (tend.tv_usec < tstart.tv_usec) { | |
| tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1; | |
| tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec; | |
| } | |
| else { | |
| tdiff.tv_sec = tend.tv_sec - tstart.tv_sec; | |
| tdiff.tv_usec = tend.tv_usec - tstart.tv_usec; | |
| } | |
| total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000; | |
| } | |
| // Clean up and end task properly | |
| zmq_close (subscriber); | |
| } | |
| return (NULL); | |
| } | |
| // --------------------------------------------------------------------- | |
| // Publisher | |
| // Shamelessly stolen from the +wuserver+ example | |
| static void *server_worker (void *socket); | |
| void *server_task (void *context) { | |
| void *publisher = zmq_socket (context, ZMQ_PUB); | |
| uint64_t hwm = 1; | |
| zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm)); | |
| zmq_bind (publisher, "inproc://publisher"); | |
| // Publish 1 message per 25ms | |
| while (1) { | |
| // Get values that will fool the boss | |
| int zipcode, temperature, relhumidity; | |
| zipcode = randof (100000); | |
| temperature = randof (215) - 80; | |
| relhumidity = randof (50) + 10; | |
| // Send message to all subscribers | |
| char update [20]; | |
| sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity); | |
| s_send (publisher, update); | |
| // sleep for 25ms so we don't overrun and/or queue on the clients | |
| struct timespec t = { 0, 2500000 }; | |
| nanosleep (&t, NULL); | |
| } | |
| return (NULL); | |
| } | |
| // This main thread simply starts several clients, and a server, and then | |
| // waits for the server to finish. | |
| // | |
| int main () { | |
| s_version_assert (2, 1); | |
| void *context = zmq_init (1); | |
| pthread_t server_thread; | |
| pthread_create (&server_thread, NULL, server_task, context); | |
| sleep (1); | |
| pthread_t client_thread; | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_create (&client_thread, NULL, client_task, context); | |
| pthread_join (server_thread, context); | |
| return 0; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment