Last active
August 29, 2015 14:15
-
-
Save BastienDurel/92593ea3121c293148d9 to your computer and use it in GitHub Desktop.
push/pull test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
run server on machine A, run client on machine B | |
then run on machine A | |
iptables -I INPUT -p tcp -m tcp --dport 5555 -j DROP ; sleep 2 ; iptables -D INPUT -p tcp -m tcp --dport 5555 -j DROP | |
before client sends all messages | |
I've included pcap traces of a test run. For it I put a 120 ms delay between messages, reduced the message count, and enabled a fast TCP keepalive (idle = 1, cnt = 1, intvl = 1)
In the "bad" trace I added the DROP rule at approx. t+2.3
I got this on the server:
error: id is 36 and last id was 19 | |
got end, count = 85 | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <memory> | |
#include <stdexcept> | |
#include <string> | |
#include <iostream> | |
#include <cstring> | |
#include <zmq.h> | |
#include "common.h" | |
using std::string; | |
using std::unique_ptr; | |
using std::runtime_error; | |
using std::cout; | |
using std::endl; | |
// Process-wide ZeroMQ context; main() assigns it from zmq_ctx_new().
void* ctx = nullptr;

// Sends message number `i` on socket `sock`; defined after main().
void send(void*, int);
int main(int c, const char**v) { | |
int rc; | |
ctx = zmq_ctx_new(); | |
auto sock = zmq_socket(ctx, ZMQ_PUSH); | |
if (!sock) | |
ERR("zmq_socket"); | |
cout << 1 << endl; | |
const char * endpoint = "tcp://127.0.0.1:5555"; | |
if (c > 1) | |
endpoint = v[1]; | |
const int wm = WM_VALUE; | |
rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &wm, sizeof(wm)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
set_keepalive_options(sock); | |
rc = zmq_connect(sock, endpoint); | |
if (rc != 0) | |
ERR("zmq_connect"); | |
cout << 2 << endl; | |
constexpr int max_msg = 10000000; | |
for (int i = 0; i < max_msg; ++i) { | |
send(sock, i); | |
} | |
send(sock, -1); | |
cout << 3 << endl; | |
rc = zmq_disconnect(sock, endpoint); | |
if (rc != 0) | |
ERR("zmq_disconnect"); | |
cout << 4 << endl; | |
rc = zmq_close(sock); | |
if (rc != 0) | |
ERR("zmq_close"); | |
cout << 5 << endl; | |
rc = zmq_ctx_term(ctx); | |
if (rc != 0) | |
ERR("zmq_ctx_term"); | |
cout << 6 << endl; | |
return 0; | |
} | |
struct msg { | |
zmq_msg_t _m; | |
msg() { int rc = zmq_msg_init (&_m); if (rc != 0) ERR("zmq_msg_init"); } | |
explicit msg(int size) { int rc = zmq_msg_init_size (&_m, size); if (rc != 0) ERR("zmq_msg_init_size"); } | |
msg(const msg&) = delete; | |
~msg() { int rc = zmq_msg_close (&_m); if (rc != 0) ERR("zmq_msg_close"); } | |
operator zmq_msg_t*() { return &_m; } | |
}; | |
void send(void* sock, int i) { | |
int rc = 0; | |
string m{"hello #"}; | |
m.append(std::to_string(i)); | |
if (i == -1) | |
m = "end"; | |
msg message{(int)m.size()}; | |
memmove(zmq_msg_data(message), m.c_str(), m.size()); | |
const int size = zmq_msg_size(message); | |
rc = zmq_msg_send(message, sock, 0); | |
if (rc != size) | |
throw runtime_error("zmq_msg_send"); | |
//cout << '.' << i << endl; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// -*- mode: c++ -*-
#if !defined TEST_PUSHPULL_COMMON_INCLUDED | |
#define TEST_PUSHPULL_COMMON_INCLUDED 1 | |
#include <zmq.h> | |
#define WM_VALUE 100000000 | |
/**
 * Throws a std::runtime_error whose message is "<err>: <strerror(errno)>".
 *
 * Call it immediately after a failing call so that errno still describes
 * that failure.
 *
 * @param err  name of the operation that failed; used as the message prefix
 * @throws std::runtime_error always
 */
[[noreturn]] inline void ERR(std::string&& err) {
    // Capture errno first: append() may allocate and overwrite it.
    const int saved_errno = errno;
    err.append(": ").append(strerror(saved_errno));
    throw std::runtime_error(err);
}
/**
 * Turns on TCP keepalive for `sock`, with every tunable set to 1.
 *
 * Only active when the build defines DOKEEPALIVE to a value > 0; otherwise the
 * function does nothing. Options are applied in the order listed below and the
 * first one ZeroMQ rejects raises via ERR("zmq_setsockopt").
 *
 * @param sock  ZeroMQ socket to configure
 */
inline void set_keepalive_options(void* sock) {
#if defined DOKEEPALIVE && DOKEEPALIVE > 0
    struct Setting {
        int option;
        int value;
    };
    const Setting settings[] = {
        {ZMQ_TCP_KEEPALIVE, 1},        // use TCP SO_KEEPALIVE
        {ZMQ_TCP_KEEPALIVE_IDLE, 1},   // start keepalives after this idle period
        {ZMQ_TCP_KEEPALIVE_CNT, 1},    // number of keepalives before death
        {ZMQ_TCP_KEEPALIVE_INTVL, 1},  // interval between keepalives
    };
    for (const Setting& s : settings) {
        if (zmq_setsockopt(sock, s.option, &s.value, sizeof(s.value)) != 0)
            ERR("zmq_setsockopt");
    }
#endif
}
#endif// ~TEST_PUSHPULL_COMMON_INCLUDED |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Builds the push/pull test programs: `client` (PUSH) and `server` (PULL).
# Run `make NOKEEPALIVE=1` to build without -DDOKEEPALIVE=1.

SRCC=client.cpp
SRCS=server.cpp
OBJC=$(SRCC:.cpp=.o)
OBJS=$(SRCS:.cpp=.o)

# TCP keepalive is compiled in unless NOKEEPALIVE=1 is given.
ifneq ($(NOKEEPALIVE),1)
KA=-DDOKEEPALIVE=1
endif

OPT=-g -O2
CXXFLAGS=$(shell pkg-config --cflags libzmq) --std=c++11 $(OPT) $(KA)
LDFLAGS=$(shell pkg-config --libs libzmq) $(OPT)

# These targets name actions, not files; without .PHONY a stray file called
# e.g. `clean` or `all` would stop the target from running.
.PHONY: all test-client test-server clean fclean

all: client server

test-client: client
	bash -c "time ./client"

test-server: server
	./server

client: $(OBJC)
	$(CXX) -o $@ $(OBJC) $(LDFLAGS)

server: $(OBJS)
	$(CXX) -o $@ $(OBJS) $(LDFLAGS)

# Both programs include common.h, so rebuild their objects when it changes.
$(OBJC): common.h
$(OBJS): common.h

# clean removes objects and editor backups; fclean also removes the binaries.
clean:
	rm -f $(OBJS) $(OBJC) *~

fclean: clean
	rm -f client server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <memory> | |
#include <stdexcept> | |
#include <string> | |
#include <cstring> | |
#include <cstdlib> | |
#include <thread> | |
#include <iostream> | |
#include <zmq.h> | |
#include "common.h" | |
using std::string; | |
using std::unique_ptr; | |
using std::runtime_error; | |
using std::cout; | |
using std::endl; | |
// Process-wide ZeroMQ context; main() assigns it from zmq_ctx_new().
void* ctx = nullptr;

// main() keeps calling zrecv() for as long as this stays true.
bool loop = true;

// Pre-C++11 fallback: map constexpr to const where the keyword is unavailable.
#if __cplusplus < 201103L
#define constexpr const
#endif

// Receives and checks one message from the given socket; defined after main().
void zrecv(void*);
int main() { | |
int rc; | |
ctx = zmq_ctx_new(); | |
auto sock = zmq_socket(ctx, ZMQ_PULL); | |
if (!sock) | |
ERR("zmq_socket"); | |
const int wm = WM_VALUE; | |
rc = zmq_setsockopt(sock, ZMQ_RCVHWM, &wm, sizeof(wm)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
set_keepalive_options(sock); | |
rc = zmq_bind(sock, "tcp://*:5555"); | |
if (rc != 0) | |
ERR("zmq_bind"); | |
while (loop) { | |
try { | |
zrecv(sock); | |
} catch (const runtime_error& ex) { | |
cout << ex.what() << endl; | |
return 1; | |
} | |
} | |
rc = zmq_close(sock); | |
if (rc != 0) | |
ERR("zmq_close"); | |
rc = zmq_ctx_term(ctx); | |
if (rc != 0) | |
ERR("zmq_ctx_term"); | |
return 0; | |
} | |
void zrecv(void* sock) { | |
static size_t count = 0; | |
static long last_id = -1; | |
zmq_msg_t message; | |
int rc = zmq_msg_init(&message); | |
if (rc != 0) | |
ERR("zmq_msg_init"); | |
//constexpr int flags = ZMQ_DONTWAIT; | |
constexpr int flags = 0; | |
rc = zmq_msg_recv(&message, sock, flags); | |
if (rc < 0) { | |
if (errno != EAGAIN) | |
ERR("zmq_msg_recv"); | |
std::this_thread::yield(); | |
return; | |
} | |
// TODO: handling | |
//cout << "got " << rc << " bytes" << endl; | |
string m{static_cast<const char*>(zmq_msg_data(&message)), | |
static_cast<string::size_type>(rc)}; | |
if (m == "end") { | |
last_id = -1; | |
cout << "got end, count = " << count << endl; | |
//loop = false; | |
} | |
else { | |
if (m.compare(0, 7, "hello #") == 0) { | |
long id = std::strtol(m.c_str() + 7, nullptr, 10); | |
if (id > last_id + 1 && last_id != -1) | |
{ | |
cout << "error: id is " << id << " and last id was " << last_id << endl; | |
} | |
last_id = id; | |
} else { | |
string err{"bad message: "}; | |
err.append(m); | |
throw runtime_error(err.c_str()); | |
} | |
} | |
++count; | |
if ((count & 0x03ffff) == 0) | |
cout << count << endl; | |
rc = zmq_msg_close (&message); | |
if (rc != 0) | |
ERR("zmq_msg_close"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've made tests with server under Arch (zeromq 4.0.5/gcc 4.9.2) and client under Arch, Debian stable (zeromq 3.2.3+dfsg-2~bpo70+1/gcc 4.7.2-5), Debian sid (zeromq 4.0.5+dfsg-2/gcc 4.9.2), Windows (zeromq 4.0.4 / msvc12 or netmq)