Last active
August 29, 2015 14:05
-
-
Save darach/7c4d074edac22dd9d25c to your computer and use it in GitHub Desktop.
0MQ Slow Joiner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CC = gcc | |
LD = | |
AR = ar | |
OBJS = zpub.o zsub.o | |
LIBS = -lzmq -lczmq -lwjelement -lwjreader #-lpthreads | |
CFLAGS = -Ideps/libzmq/include -Ideps/czmq/include -Ideps/wjelement/include -O2 -funroll-loops -g -std=c99 -pedantic -Wall -fPIC | |
LDFLAGS = -Ldeps/libzmq/src/.libs -Ldeps/czmq/src/.libs -Ldeps/wjelement/src/wjelement -Ldeps/wjelement/src/wjreader/ | |
ARFLAGS = | |
zpub.o: zpub.c | |
$(CC) $(CFLAGS) -c zpub.c | |
zsub.o: zsub.c | |
$(CC) $(CFLAGS) -c zsub.c | |
clean : | |
cd deps/wjelement \ | |
make clean \ | |
cd ../.. | |
/bin/rm -f *.o | |
/bin/rm -f zpub | |
/bin/rm -f zsub | |
purge: clean | |
/bin/rm -rf deps | |
get-deps: | |
@if [ ! -d deps ]; then \ | |
mkdir deps ; \ | |
echo "Dependencies missing. Charging." ; \ | |
fi ; | |
@if [ -e deps/wjelement ]; then \ | |
echo "Dependency 'deps/wjelement' already bound." ; \ | |
else \ | |
echo Git cloning deps/wjelement ; \ | |
git clone https://github.com/netmail-open/wjelement deps/wjelement ; \ | |
echo Building deps/wjelement ; \ | |
cd deps/wjelement ; \ | |
cmake -Wno-dev . ; \ | |
make ; \ | |
fi ; | |
@if [ -e deps/libzmq ]; then \ | |
echo "Dependency 'deps/libzmq' already bound." ; \ | |
else \ | |
echo Git cloning deps/libzmq; \ | |
git clone git://github.com/zeromq/libzmq deps/libzmq ; \ | |
echo Building deps/libzmq; \ | |
cd deps/libzmq; \ | |
./autogen.sh ; \ | |
./configure ; \ | |
make ; \ | |
fi | |
@if [ -e deps/czmq ]; then \ | |
echo "Dependency 'deps/czmq' already bound." ; \ | |
else \ | |
echo Git cloning deps/czmq; \ | |
git clone git://github.com/zeromq/czmq deps/czmq ; \ | |
echo Building deps/czmq; \ | |
cd deps/czmq; \ | |
./autogen.sh ; \ | |
./configure ; \ | |
make ; \ | |
fi | |
zsub: get-deps zsub.o | |
@gcc $(LDFLAGS) $(LIBS) -o zsub zsub.o | |
zpub: get-deps zpub.o | |
@gcc $(LDFLAGS) $(LIBS) -o zpub zpub.o | |
all: zsub zpub | |
.PHONY: all |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <zmq.h> | |
int main (int argc, char **argv) | |
{ | |
if (argc != 5) | |
{ | |
fprintf(stderr, "Usage:\n\n %s <zeromq-pub-url> <channel> <delay-us> <count>", argv[0]); | |
fprintf(stderr, "\n\n example: $ %s tcp://*:4001 MyChannel 10000\n", argv[0]); | |
fflush(stderr); | |
return 1; | |
} | |
const char *zurl = argv[1]; | |
const char *channel_name = argv[2]; | |
const int delay_us = atoi(argv[3]); | |
unsigned long int count = strtoul(argv[4], NULL, 10); | |
printf("zurl: %s\n", zurl); | |
printf("channel: %s\n", channel_name); | |
printf("delay_us: %i\n", delay_us); | |
printf("count: %lu\n", count); | |
// Prepare our context and publisher | |
void *context = zmq_ctx_new (); | |
void *publisher = zmq_socket (context, ZMQ_PUB); | |
zmq_bind (publisher, zurl); | |
printf("Listening on %s\n", zurl); | |
unsigned long int current = 0; | |
unsigned long int tx = 0; | |
do { | |
int size = zmq_send(publisher, channel_name, strlen(channel_name), ZMQ_SNDMORE); | |
char buf[256]; | |
sprintf(buf,"%lu bottles of beer on the wall",current++); | |
size += zmq_send(publisher, buf, strlen(buf), 0); | |
usleep(delay_us); | |
tx += size; | |
} while(--count > 0); | |
printf("Total bytes sent: %lu", tx); | |
zmq_close (publisher); | |
zmq_ctx_destroy (context); | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <limits.h> | |
#include <stdlib.h> | |
#include <zmq.h> | |
const int rcvhwm = 0; | |
int main(int argc, char **argv) | |
{ | |
if (argc != 3) | |
{ | |
fprintf(stderr, "Usage:\n\n %s <zeromq-pub-url> <channel>", argv[0]); | |
fprintf(stderr, "\n\n example: $ %s tcp://localhost:4001 MyChannel\n", argv[0]); | |
fflush(stderr); | |
return 1; | |
} | |
printf("zurl: %s\n", zurl); | |
printf("channel: %s\n", channel_name); | |
const char* zurl = argv[1]; | |
const char* channel_name = argv[2]; | |
unsigned long int min = ULONG_MAX; | |
printf("Connecting to %s\n", zurl); | |
void *context = zmq_ctx_new (); | |
void *subscriber = zmq_socket (context, ZMQ_SUB); | |
zmq_setsockopt (subscriber, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)); | |
zmq_connect (subscriber, zurl); | |
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, &channel_name, 0); | |
while (1) { | |
char channel[256]; | |
int size = zmq_recv(subscriber, channel, 255, 0); | |
if (size == -1) | |
{ | |
break; | |
} | |
if (size > 255) | |
{ | |
size = 255; | |
} | |
channel[size] = 0; | |
char data[1024]; | |
size = zmq_recv(subscriber, data, 1023, 0); | |
if (size == -1) | |
{ | |
break; | |
} | |
if (size > 1023) | |
{ | |
size = 1023; | |
} | |
data[size] = 0; | |
unsigned long int as_num = strtoul(data, NULL, 10); | |
if (as_num < min) | |
{ | |
min = as_num; | |
} | |
printf ("[%s] %s (%lu)\n", channel, data, min); | |
} | |
// ZeroMQ context was terminated (for some unknown reason) | |
// | |
zmq_close (subscriber); | |
zmq_ctx_destroy (context); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample usage once the 0mq peers are built:
Run a number of subscribers:
$ DYLD_LIBRARY_PATH=deps/libzmq/src/.libs:deps/czmq/src/.libs ./zsub "tcp://localhost:5001" Channel
Run a publisher:
$ DYLD_LIBRARY_PATH=deps/libzmq/src/.libs:deps/czmq/src/.libs ./zpub "tcp://*:5001" Channel 1000 1000000
Related content from the ZeroMQ guide: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver
The guide suggests:
Observation: Messages may still be lost. The higher the messaging frequency the more messages (rate relative) will be lost as with the previous case
Observation: Introduces a sync protocol via REQ-REP pattern. It may be better to use messaging with guaranteed delivery instead. The example, for example, requires a priori knowledge of number of subscribers...