Skip to content

Instantly share code, notes, and snippets.

@hintjens
Created February 24, 2011 16:37
Show Gist options
  • Save hintjens/842407 to your computer and use it in GitHub Desktop.
Save hintjens/842407 to your computer and use it in GitHub Desktop.
Leaker does not leak
//
// Leak example using PUB/SUB pattern
//
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each task has its own
// context and conceptually acts as a separate process.
#include "zhelpers.h"
#include "zmsg.c"
// ---------------------------------------------------------------------
// This is our client task
// Message logic is shamelessly stolen from the +wuclient+ example.
// Subscriber opens a socket, sets its options, and then receives as
// many messages as it can (and throws them away) in 100ms. Then it
// closes the socket, loops back up and starts all over again.
// Subscriber task; runs as a thread start routine.
//
// context -- 0MQ context passed to zmq_socket () for every new subscriber.
//
// Each pass creates a SUB socket with HWM 1 and zero linger, connects it
// to inproc://publisher, subscribes to everything, and drains messages
// until about BURST_MSEC milliseconds have passed. The socket is then
// closed and the next pass begins.
//
// Always returns NULL, and only returns when a socket cannot be created
// or connected, or when s_recv () hands back NULL.
static void *
client_task (void *context) {
    enum { BURST_MSEC = 100 };      // Lifetime of one subscriber socket
    int running = 1;

    while (running) {
        void *subscriber = zmq_socket (context, ZMQ_SUB);
        if (subscriber == NULL) {
            fprintf (stderr, "client: zmq_socket failed\n");
            break;
        }
        uint64_t hwm = 1;
        zmq_setsockopt (subscriber, ZMQ_HWM, &hwm, sizeof (hwm));
        int linger = 0;
        zmq_setsockopt (subscriber, ZMQ_LINGER, &linger, sizeof (linger));
        if (zmq_connect (subscriber, "inproc://publisher") != 0) {
            // An unconnected subscriber has nothing to receive; stop the
            // task rather than wait on it.
            fprintf (stderr, "client: zmq_connect failed\n");
            zmq_close (subscriber);
            break;
        }
        zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);

        struct timeval start, now;
        gettimeofday (&start, NULL);
        now = start;
        long total_temp = 0;

        for (;;) {
            // Work in microseconds so no manual borrow between the
            // seconds and microseconds fields is needed.
            long long elapsed_usec =
                (long long) (now.tv_sec - start.tv_sec) * 1000000LL
                + (long long) (now.tv_usec - start.tv_usec);
            if (elapsed_usec / 1000 >= BURST_MSEC)
                break;

            char *string = s_recv (subscriber);
            if (string == NULL) {
                // NOTE(review): assumes s_recv () reports failure by
                // returning NULL -- confirm against zhelpers.h.
                running = 0;
                break;
            }
            int zipcode, temperature, relhumidity;
            // Only count the reading when all three fields parsed;
            // otherwise temperature would be read uninitialised.
            if (sscanf (string, "%d %d %d",
                        &zipcode, &temperature, &relhumidity) == 3)
                total_temp += temperature;
            free (string);
            gettimeofday (&now, NULL);
        }
        // The messages are deliberately thrown away.
        (void) total_temp;

        // Close the socket before the next pass (or before leaving)
        zmq_close (subscriber);
    }
    return (NULL);
}
// ---------------------------------------------------------------------
// Publisher
// Shamelessly stolen from the +wuserver+ example
// Publisher task; runs as a thread start routine.
//
// context -- 0MQ context used to create the PUB socket.
//
// Binds a PUB socket (HWM 1) to inproc://publisher and publishes one
// random "zipcode temperature relhumidity" update every 25 msec, forever.
//
// Returns NULL, and only returns when the socket cannot be created or
// bound.
void *server_task (void *context) {
    void *publisher = zmq_socket (context, ZMQ_PUB);
    if (publisher == NULL) {
        fprintf (stderr, "server: zmq_socket failed\n");
        return (NULL);
    }
    uint64_t hwm = 1;
    zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
    if (zmq_bind (publisher, "inproc://publisher") != 0) {
        // main () returns as soon as this task does, so the socket is
        // left for process exit to reclaim.
        fprintf (stderr, "server: zmq_bind failed\n");
        return (NULL);
    }
    // Publish 1 message per 25ms
    while (1) {
        // Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        // Send message to all subscribers
        char update [20];
        snprintf (update, sizeof (update), "%05d %d %d",
                  zipcode, temperature, relhumidity);
        s_send (publisher, update);

        // Sleep for 25ms so we don't overrun and/or queue on the clients.
        // tv_nsec is in nanoseconds: 25 msec = 25,000,000 nsec.
        struct timespec t = { 0, 25 * 1000000L };
        nanosleep (&t, NULL);
    }
    return (NULL);
}
// This main thread starts the server, then several clients, and then
// waits for the server to finish.
//
int main () {
s_version_assert (2, 1);
void *context = zmq_init (1);
pthread_t server_thread;
pthread_create (&server_thread, NULL, server_task, context);
sleep (1);
pthread_t client_thread;
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_create (&client_thread, NULL, client_task, context);
pthread_join (server_thread, context);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment