Created
June 28, 2015 16:38
-
-
Save jbreams/64fad28478dd63e84adc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/Makefile.am b/Makefile.am | |
index f375437..b219601 100644 | |
--- a/Makefile.am | |
+++ b/Makefile.am | |
@@ -329,7 +329,8 @@ test_apps = \ | |
test_bind_src_address \ | |
test_metadata \ | |
test_capabilities \ | |
- test_xpub_nodrop | |
+ test_xpub_nodrop \ | |
+ test_heartbeats | |
test_system_SOURCES = tests/test_system.cpp | |
test_system_LDADD = libzmq.la | |
@@ -498,6 +499,9 @@ test_capabilities_LDADD = libzmq.la | |
test_xpub_nodrop_SOURCES = tests/test_xpub_nodrop.cpp | |
test_xpub_nodrop_LDADD = libzmq.la | |
+test_heartbeats_SOURCES = tests/test_heartbeats.cpp | |
+test_heartbeats_LDADD = libzmq.la | |
+ | |
if !ON_MINGW | |
test_apps += \ | |
test_shutdown_stress \ | |
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt | |
index 0025ef5..0b5cc87 100644 | |
--- a/doc/zmq_setsockopt.txt | |
+++ b/doc/zmq_setsockopt.txt | |
@@ -246,6 +246,47 @@ Option value unit:: milliseconds | |
Default value:: 30000 | |
Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented transports | |
+ZMQ_HEARTBEAT_IVL: Set interval between sending ZMTP heartbeats | |
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
+The 'ZMQ_HEARTBEAT_IVL' option shall set the interval between sending ZMTP heartbeats | |
+for the specified 'socket'. If this option is set and is greater than 0, then a 'PING' | |
+ZMTP command will be sent every 'ZMQ_HEARTBEAT_IVL' milliseconds. | |
+ | |
+[horizontal] | |
+Option value type:: int | |
+Option value unit:: milliseconds | |
+Default value:: 0 | |
+Applicable socket types:: all, when using connection-oriented transports | |
+ | |
+ZMQ_HEARTBEAT_TIMEOUT: Set timeout for ZMTP heartbeats | |
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
+The 'ZMQ_HEARTBEAT_TIMEOUT' option shall set how long to wait before timing-out a | |
+connection after sending a 'PING' ZMTP command and not receiving any traffic. This | |
+option is only valid if 'ZMQ_HEARTBEAT_IVL' is also set, and is greater than 0. The | |
+connection will time out if there is no traffic received after sending the 'PING' | |
+command, but the received traffic does not have to be a 'PONG' command - any received | |
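+traffic will cancel the timeout. If 'ZMQ_HEARTBEAT_TIMEOUT' has not been set, the | |
+value of 'ZMQ_HEARTBEAT_IVL' is used as the timeout. | |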
+ | |
+[horizontal] | |
+Option value type:: int | |
+Option value unit:: milliseconds | |
+Default value:: -1 (use the value of 'ZMQ_HEARTBEAT_IVL') | |
+Applicable socket types:: all, when using connection-oriented transports | |
+ | |
+ZMQ_HEARTBEAT_TTL: Set the TTL value for ZMTP heartbeats | |
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
+The 'ZMQ_HEARTBEAT_TTL' option shall set the timeout on the remote peer for ZMTP | |
+heartbeats. If this option is greater than 0, the remote side shall time out the | |
+connection if it does not receive any more traffic within the TTL period. This option | |
+does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0. Internally, this | |
+value is rounded down to the nearest decisecond; any value less than 100 will have | |
+no effect. | |
+ | |
+[horizontal] | |
+Option value type:: int | |
+Option value unit:: milliseconds | |
+Default value:: 0 | |
+Applicable socket types:: all, when using connection-oriented transports | |
ZMQ_IDENTITY: Set socket identity | |
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
diff --git a/include/zmq.h b/include/zmq.h | |
index e6e9be7..00715c8 100644 | |
--- a/include/zmq.h | |
+++ b/include/zmq.h | |
@@ -294,6 +294,9 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); | |
#define ZMQ_HANDSHAKE_IVL 66 | |
#define ZMQ_SOCKS_PROXY 68 | |
#define ZMQ_XPUB_NODROP 69 | |
+#define ZMQ_HEARTBEAT_IVL 75 | |
+#define ZMQ_HEARTBEAT_TTL 76 | |
+#define ZMQ_HEARTBEAT_TIMEOUT 77 | |
/* Message options */ | |
#define ZMQ_MORE 1 | |
diff --git a/src/curve_client.cpp b/src/curve_client.cpp | |
index 842e504..416227b 100644 | |
--- a/src/curve_client.cpp | |
+++ b/src/curve_client.cpp | |
@@ -129,6 +129,8 @@ int zmq::curve_client_t::encode (msg_t *msg_) | |
uint8_t flags = 0; | |
if (msg_->flags () & msg_t::more) | |
flags |= 0x01; | |
+ if (msg_->flags () & msg_t::command) | |
+ flags |= 0x02; | |
uint8_t message_nonce [crypto_box_NONCEBYTES]; | |
memcpy (message_nonce, "CurveZMQMESSAGEC", 16); | |
@@ -222,6 +224,8 @@ int zmq::curve_client_t::decode (msg_t *msg_) | |
const uint8_t flags = message_plaintext [crypto_box_ZEROBYTES]; | |
if (flags & 0x01) | |
msg_->set_flags (msg_t::more); | |
+ if (flags & 0x02) | |
+ msg_->set_flags (msg_t::command); | |
memcpy (msg_->data (), | |
message_plaintext + crypto_box_ZEROBYTES + 1, | |
diff --git a/src/curve_server.cpp b/src/curve_server.cpp | |
index 7bdd8a9..a7d9ea7 100644 | |
--- a/src/curve_server.cpp | |
+++ b/src/curve_server.cpp | |
@@ -141,6 +141,8 @@ int zmq::curve_server_t::encode (msg_t *msg_) | |
uint8_t flags = 0; | |
if (msg_->flags () & msg_t::more) | |
flags |= 0x01; | |
+ if (msg_->flags () & msg_t::command) | |
+ flags |= 0x02; | |
uint8_t *message_plaintext = static_cast <uint8_t *> (malloc (mlen)); | |
alloc_assert (message_plaintext); | |
@@ -231,6 +233,8 @@ int zmq::curve_server_t::decode (msg_t *msg_) | |
const uint8_t flags = message_plaintext [crypto_box_ZEROBYTES]; | |
if (flags & 0x01) | |
msg_->set_flags (msg_t::more); | |
+ if (flags & 0x02) | |
+ msg_->set_flags (msg_t::command); | |
memcpy (msg_->data (), | |
message_plaintext + crypto_box_ZEROBYTES + 1, | |
diff --git a/src/gssapi_mechanism_base.cpp b/src/gssapi_mechanism_base.cpp | |
index 355f152..bdd1836 100644 | |
--- a/src/gssapi_mechanism_base.cpp | |
+++ b/src/gssapi_mechanism_base.cpp | |
@@ -80,6 +80,8 @@ int zmq::gssapi_mechanism_base_t::encode_message (msg_t *msg_) | |
uint8_t flags = 0; | |
if (msg_->flags () & msg_t::more) | |
flags |= 0x01; | |
+ // Carry the command flag in bit 0x02 of the flags byte ('more' uses 0x01). | |
+ if (msg_->flags () & msg_t::command) | |
+ flags |= 0x02; | |
uint8_t *plaintext_buffer = static_cast <uint8_t *>(malloc(msg_->size ()+1)); | |
plaintext_buffer[0] = flags; | |
@@ -177,6 +179,8 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_) | |
const uint8_t flags = static_cast <char *> (plaintext.value)[0]; | |
if (flags & 0x01) | |
msg_->set_flags (msg_t::more); | |
+ if (flags & 0x02) | |
+ msg_->set_flags (msg_t::command); | |
memcpy (msg_->data (), static_cast <char *> (plaintext.value)+1, plaintext.length-1); | |
diff --git a/src/options.cpp b/src/options.cpp | |
index ea4f74c..677d2c9 100644 | |
--- a/src/options.cpp | |
+++ b/src/options.cpp | |
@@ -66,7 +66,10 @@ zmq::options_t::options_t () : | |
gss_plaintext (false), | |
socket_id (0), | |
conflate (false), | |
- handshake_ivl (30000) | |
+ handshake_ivl (30000), | |
+ heartbeat_ttl (0), | |
+ heartbeat_interval (0), | |
+ heartbeat_timeout (-1) | |
{ | |
} | |
@@ -503,6 +505,29 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, | |
} | |
break; | |
+ case ZMQ_HEARTBEAT_IVL: | |
+ if (is_int && value >= 0) { | |
+ heartbeat_interval = value; | |
+ return 0; | |
+ } | |
+ break; | |
+ | |
+ case ZMQ_HEARTBEAT_TTL: | |
+ // Convert this to deciseconds from milliseconds | |
+ value = value / 100; | |
+ if (is_int && value >= 0 && value <= 6553) { | |
+ heartbeat_ttl = (uint16_t)value; | |
+ return 0; | |
+ } | |
+ break; | |
+ | |
+ case ZMQ_HEARTBEAT_TIMEOUT: | |
+ if (is_int && value >= 0) { | |
+ heartbeat_timeout = value; | |
+ return 0; | |
+ } | |
+ break; | |
+ | |
default: | |
#if defined (ZMQ_ACT_MILITANT) | |
// There are valid scenarios for probing with unknown socket option | |
@@ -849,6 +874,28 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) | |
} | |
break; | |
+ case ZMQ_HEARTBEAT_IVL: | |
+ if (is_int) { | |
+ *value = heartbeat_interval; | |
+ return 0; | |
+ } | |
+ break; | |
+ | |
+ case ZMQ_HEARTBEAT_TTL: | |
+ if (is_int) { | |
+ // Convert the internal deciseconds value to milliseconds | |
+ *value = heartbeat_ttl * 100; | |
+ return 0; | |
+ } | |
+ break; | |
+ | |
+ case ZMQ_HEARTBEAT_TIMEOUT: | |
+ if (is_int) { | |
+ *value = heartbeat_timeout; | |
+ return 0; | |
+ } | |
+ break; | |
+ | |
default: | |
#if defined (ZMQ_ACT_MILITANT) | |
malformed = false; | |
diff --git a/src/options.hpp b/src/options.hpp | |
index b4a019c..9a68340 100644 | |
--- a/src/options.hpp | |
+++ b/src/options.hpp | |
@@ -189,6 +189,15 @@ namespace zmq | |
// close socket. Default is 30 secs. 0 means no handshake timeout. | |
int handshake_ivl; | |
+ // If remote peer receives a PING message and doesn't receive another | |
+ // message within the ttl value, it should close the connection | |
+ // (measured in tenths of a second) | |
+ uint16_t heartbeat_ttl; | |
+ // Time in milliseconds between sending heartbeat PING messages. | |
+ int heartbeat_interval; | |
+ // Time in milliseconds to wait for a PING response before disconnecting | |
+ int heartbeat_timeout; | |
+ | |
}; | |
} | |
diff --git a/src/session_base.cpp b/src/session_base.cpp | |
index 86bfd8f..30b6706 100644 | |
--- a/src/session_base.cpp | |
+++ b/src/session_base.cpp | |
@@ -135,6 +135,8 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) | |
int zmq::session_base_t::push_msg (msg_t *msg_) | |
{ | |
+ if(msg_->flags() & msg_t::command) | |
+ return 0; | |
if (pipe && pipe->write (msg_)) { | |
int rc = msg_->init (); | |
errno_assert (rc == 0); | |
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp | |
index 87186cc..adae809 100644 | |
--- a/src/stream_engine.cpp | |
+++ b/src/stream_engine.cpp | |
@@ -96,6 +96,10 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, | |
input_stopped (false), | |
output_stopped (false), | |
has_handshake_timer (false), | |
+ has_ttl_timer (false), | |
+ has_timeout_timer (false), | |
+ has_heartbeat_timer (false), | |
+ heartbeat_timeout (0), | |
socket (NULL) | |
{ | |
int rc = tx_msg.init (); | |
@@ -142,6 +146,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, | |
rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); | |
errno_assert (rc == 0); | |
#endif | |
+ if(options.heartbeat_interval > 0) { | |
+ heartbeat_timeout = options.heartbeat_timeout; | |
+ if(heartbeat_timeout == -1) | |
+ heartbeat_timeout = options.heartbeat_interval; | |
+ } | |
} | |
zmq::stream_engine_t::~stream_engine_t () | |
@@ -251,6 +260,20 @@ void zmq::stream_engine_t::unplug () | |
has_handshake_timer = false; | |
} | |
+ if (has_ttl_timer) { | |
+ cancel_timer (heartbeat_ttl_timer_id); | |
+ has_ttl_timer = false; | |
+ } | |
+ | |
+ if (has_timeout_timer) { | |
+ cancel_timer (heartbeat_timeout_timer_id); | |
+ has_timeout_timer = false; | |
+ } | |
+ | |
+ if (has_heartbeat_timer) { | |
+ cancel_timer (heartbeat_ivl_timer_id); | |
+ has_heartbeat_timer = false; | |
+ } | |
// Cancel all fd subscriptions. | |
if (!io_error) | |
rm_fd (handle); | |
@@ -684,6 +707,11 @@ bool zmq::stream_engine_t::handshake () | |
} | |
next_msg = &stream_engine_t::next_handshake_command; | |
process_msg = &stream_engine_t::process_handshake_command; | |
+ | |
+ if(options.heartbeat_interval > 0) { | |
+ add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id); | |
+ has_heartbeat_timer = true; | |
+ } | |
} | |
// Start polling for output if necessary. | |
@@ -887,6 +915,23 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_) | |
if (mechanism->decode (msg_) == -1) | |
return -1; | |
+ | |
+ if(has_timeout_timer) { | |
+ has_timeout_timer = false; | |
+ cancel_timer(heartbeat_timeout_timer_id); | |
+ } | |
+ | |
+ if(has_ttl_timer) { | |
+ has_ttl_timer = false; | |
+ cancel_timer(heartbeat_ttl_timer_id); | |
+ } | |
+ | |
+ if(msg_->flags() & msg_t::command) { | |
+ uint8_t cmd_id = *((uint8_t*)msg_->data()); | |
+ if(cmd_id == 4) | |
+ process_heartbeat_message(msg_); | |
+ } | |
+ | |
if (metadata) | |
msg_->set_metadata (metadata); | |
if (session->push_msg (msg_) == -1) { | |
@@ -952,9 +997,86 @@ void zmq::stream_engine_t::set_handshake_timer () | |
void zmq::stream_engine_t::timer_event (int id_) | |
{ | |
- zmq_assert (id_ == handshake_timer_id); | |
- has_handshake_timer = false; | |
+ // Dispatch on the id of the timer that fired. | |
+ if(id_ == handshake_timer_id) { | |
+ has_handshake_timer = false; | |
+ // handshake timer expired before handshake completed, so engine fails | |
+ error (timeout_error); | |
+ } | |
+ else if(id_ == heartbeat_ivl_timer_id) { | |
+ // Make a PING the next outgoing message, trigger output, and re-arm | |
+ // the interval timer for the following heartbeat. | |
+ next_msg = &stream_engine_t::produce_ping_message; | |
+ out_event(); | |
+ add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id); | |
+ } | |
+ else if(id_ == heartbeat_ttl_timer_id) { | |
+ // Nothing was received within the TTL taken from the peer's PING. | |
+ has_ttl_timer = false; | |
+ error(timeout_error); | |
+ } | |
+ else if(id_ == heartbeat_timeout_timer_id) { | |
+ // Nothing was received after our own PING was produced. | |
+ has_timeout_timer = false; | |
+ error(timeout_error); | |
+ } | |
+ else | |
+ // There are no other valid timer ids! zmq_assert is used, as in the | |
+ // rest of this file, because plain assert disappears under NDEBUG. | |
+ zmq_assert(false); | |
+} | |
- // handshake timer expired before handshake completed, so engine fails | |
- error (timeout_error); | |
+// Builds the outgoing PING command into msg_: the 5 bytes "\4PING" followed by | |
+// options.heartbeat_ttl as a 16-bit value in network byte order. The message is | |
+// then passed through mechanism->encode, whose result is returned. | |
+int zmq::stream_engine_t::produce_ping_message(msg_t * msg_) | |
+{ | |
+ int rc = 0; | |
+ zmq_assert (mechanism != NULL); | |
+ | |
+ // 16-bit TTL + \4PING == 7 | |
+ rc = msg_->init_size(7); | |
+ // Do not write into the message if its buffer could not be set up. | |
+ errno_assert (rc == 0); | |
+ msg_->set_flags(msg_t::command); | |
+ // Copy in the command message | |
+ memcpy(msg_->data(), "\4PING", 5); | |
+ | |
+ uint16_t ttl_val = htons(options.heartbeat_ttl); | |
+ memcpy(((uint8_t*)msg_->data()) + 5, &ttl_val, sizeof(ttl_val)); | |
+ | |
+ rc = mechanism->encode (msg_); | |
+ // Hand the output path back to regular message traffic. | |
+ next_msg = &stream_engine_t::pull_and_encode; | |
+ // Arm the timeout only if one is not already pending; it is cancelled | |
+ // as soon as any message is received. | |
+ if(!has_timeout_timer && heartbeat_timeout > 0) { | |
+ add_timer(heartbeat_timeout, heartbeat_timeout_timer_id); | |
+ has_timeout_timer = true; | |
+ } | |
+ return rc; | |
+} | |
+ | |
+// Builds the outgoing PONG command ("\4PONG", 5 bytes) into msg_, passes it | |
+// through mechanism->encode and returns that result. | |
+int zmq::stream_engine_t::produce_pong_message(msg_t * msg_) | |
+{ | |
+ int rc = 0; | |
+ zmq_assert (mechanism != NULL); | |
+ | |
+ rc = msg_->init_size(5); | |
+ // Do not write into the message if its buffer could not be set up. | |
+ errno_assert (rc == 0); | |
+ msg_->set_flags(msg_t::command); | |
+ | |
+ memcpy(msg_->data(), "\4PONG", 5); | |
+ | |
+ rc = mechanism->encode (msg_); | |
+ // Hand the output path back to regular message traffic. | |
+ next_msg = &stream_engine_t::pull_and_encode; | |
+ return rc; | |
+} | |
+ | |
+// Handles a received command frame whose first byte is 4. If it is a PING, | |
+// arms the TTL timer from the value the peer sent (unless one is already | |
+// running) and schedules a PONG as the next outgoing message. Any other frame | |
+// is ignored. Always returns 0. | |
+int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_) | |
+{ | |
+ // A PING is "\4PING" plus a 16-bit TTL, 7 bytes in total. The frame comes | |
+ // from the peer, so check its size before reading; shorter frames are | |
+ // ignored rather than read past their end. | |
+ if(msg_->size() >= 7 && memcmp(msg_->data(), "\4PING", 5) == 0) { | |
+ uint16_t remote_heartbeat_ttl; | |
+ // Get the remote heartbeat TTL to setup the timer | |
+ memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2); | |
+ // The remote heartbeat is in 10ths of a second, so multiply by 100 to | |
+ // get the timer interval in ms. Widen to int first: the product does | |
+ // not fit in 16 bits once the TTL exceeds 655 deciseconds. | |
+ const int remote_ttl_ms = (int) ntohs(remote_heartbeat_ttl) * 100; | |
+ | |
+ if(!has_ttl_timer && remote_ttl_ms > 0) { | |
+ add_timer(remote_ttl_ms, heartbeat_ttl_timer_id); | |
+ has_ttl_timer = true; | |
+ } | |
+ | |
+ next_msg = &stream_engine_t::produce_pong_message; | |
+ out_event(); | |
+ } | |
+ | |
+ return 0; | |
} | |
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp | |
index d42c692..4978714 100644 | |
--- a/src/stream_engine.hpp | |
+++ b/src/stream_engine.hpp | |
@@ -125,6 +125,10 @@ namespace zmq | |
void set_handshake_timer(); | |
+ int produce_ping_message(msg_t * msg_); | |
+ int process_heartbeat_message(msg_t * msg_); | |
+ int produce_pong_message(msg_t * msg_); | |
+ | |
// Underlying socket. | |
fd_t s; | |
@@ -204,6 +208,17 @@ namespace zmq | |
// True is linger timer is running. | |
bool has_handshake_timer; | |
+ // Heartbeat stuff | |
+ enum { | |
+ heartbeat_ivl_timer_id = 0x80, | |
+ heartbeat_timeout_timer_id = 0x81, | |
+ heartbeat_ttl_timer_id = 0x82 | |
+ }; | |
+ bool has_ttl_timer; | |
+ bool has_timeout_timer; | |
+ bool has_heartbeat_timer; | |
+ int heartbeat_timeout; | |
+ | |
// Socket | |
zmq::socket_base_t *socket; | |
diff --git a/tests/test_heartbeats.cpp b/tests/test_heartbeats.cpp | |
new file mode 100644 | |
index 0000000..c5aa060 | |
--- /dev/null | |
+++ b/tests/test_heartbeats.cpp | |
@@ -0,0 +1,315 @@ | |
+/* | |
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file | |
+ | |
+ This file is part of 0MQ. | |
+ | |
+ 0MQ is free software; you can redistribute it and/or modify it under | |
+ the terms of the GNU Lesser General Public License as published by | |
+ the Free Software Foundation; either version 3 of the License, or | |
+ (at your option) any later version. | |
+ | |
+ 0MQ is distributed in the hope that it will be useful, | |
+ but WITHOUT ANY WARRANTY; without even the implied warranty of | |
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
+ GNU Lesser General Public License for more details. | |
+ | |
+ You should have received a copy of the GNU Lesser General Public License | |
+ along with this program. If not, see <http://www.gnu.org/licenses/>. | |
+*/ | |
+ | |
+#include "testutil.hpp" | |
+#if defined (ZMQ_HAVE_WINDOWS) | |
+# include <winsock2.h> | |
+# include <ws2tcpip.h> | |
+# include <stdexcept> | |
+# define close closesocket | |
+#else | |
+# include <sys/socket.h> | |
+# include <netinet/in.h> | |
+# include <arpa/inet.h> | |
+# include <unistd.h> | |
+#endif | |
+ | |
+// Poll the monitor socket for one event, trying at most twice with a 150 ms | |
+// pause after each failed attempt. Returns the event number, or -1 if no | |
+// event arrived or the address frame could not be read. | |
+ | |
+static int | |
+get_monitor_event (void *monitor) | |
+{ | |
+ for(int i = 0; i < 2; i++) { | |
+ // First frame in message contains event number and value | |
+ zmq_msg_t msg; | |
+ zmq_msg_init (&msg); | |
+ if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) { | |
+ zmq_msg_close (&msg); | |
+ msleep(150); | |
+ continue; // Nothing to read yet (or interrupted); try again | |
+ } | |
+ assert (zmq_msg_more (&msg)); | |
+ | |
+ // Copy the event number out instead of casting the data pointer, so | |
+ // the read does not depend on how the frame buffer is aligned. | |
+ uint16_t event; | |
+ memcpy (&event, zmq_msg_data (&msg), sizeof (event)); | |
+ // Release the first frame before the message object is reused. | |
+ zmq_msg_close (&msg); | |
+ | |
+ // Second frame in message contains event address | |
+ zmq_msg_init (&msg); | |
+ if (zmq_msg_recv (&msg, monitor, 0) == -1) { | |
+ zmq_msg_close (&msg); | |
+ return -1; // Interrupted, presumably | |
+ } | |
+ assert (!zmq_msg_more (&msg)); | |
+ zmq_msg_close (&msg); | |
+ | |
+ return event; | |
+ } | |
+ return -1; | |
+} | |
+ | |
+// Plays one side of a ZMTP handshake by hand on the raw TCP socket fd: sends a | |
+// 64-byte greeting and reads 64 bytes back, then sends a 43-byte READY command | |
+// and reads 43 bytes back. Every send/recv is asserted to transfer the full | |
+// length. | |
+// NOTE(review): recv on a stream socket may return fewer bytes than requested; | |
+// the asserts assume each reply arrives in one piece - confirm. | |
+static void | |
+mock_handshake (int fd) { | |
+ // Only the first 17 bytes are spelled out (0xff ... 0x7f signature, the | |
+ // bytes 3 and 0, then "NULL"); the remaining initialisers are zero. | |
+ const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 }; | |
+ char buffer[128]; | |
+ // Zero the buffer so the bytes after the 33-byte array are sent as zeros. | |
+ memset(buffer, 0, sizeof(buffer)); | |
+ memcpy(buffer, zmtp_greeting, sizeof(zmtp_greeting)); | |
+ | |
+ int rc = send(fd, buffer, 64, 0); | |
+ assert(rc == 64); | |
+ | |
+ // Read the peer's greeting; its contents are not inspected. | |
+ rc = recv(fd, buffer, 64, 0); | |
+ assert(rc == 64); | |
+ | |
+ // READY command naming Socket-Type "DEALER" and a zero-length Identity. | |
+ const uint8_t zmtp_ready[43] = { | |
+ 4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e', | |
+ 0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y', | |
+ 0, 0, 0, 0 | |
+ }; | |
+ | |
+ memset(buffer, 0, sizeof(buffer)); | |
+ memcpy(buffer, zmtp_ready, 43); | |
+ rc = send(fd, buffer, 43, 0); | |
+ assert(rc == 43); | |
+ | |
+ // Read the peer's reply; its contents are not inspected either. | |
+ rc = recv(fd, buffer, 43, 0); | |
+ assert(rc == 43); | |
+} | |
+ | |
+// Applies CURVE security options to socket using fixed test keys. | |
+// is_server != 0: sets the server key pair and enables ZMQ_CURVE_SERVER. | |
+// is_server == 0: sets the client key pair and ZMQ_CURVE_SERVERKEY, which is | |
+// the same string as the server's public key above. | |
+// The zmq_setsockopt return values are not checked here. | |
+static void | |
+setup_curve(void * socket, int is_server) { | |
+ const char *secret_key; | |
+ const char *public_key; | |
+ const char *server_key; | |
+ | |
+ if(is_server) { | |
+ secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6"; | |
+ public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"; | |
+ // A server has no server key to configure. | |
+ server_key = NULL; | |
+ } | |
+ else { | |
+ secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs"; | |
+ public_key = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID"; | |
+ server_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"; | |
+ } | |
+ | |
+ // Keys are passed as text, with strlen as the option length. | |
+ zmq_setsockopt(socket, ZMQ_CURVE_SECRETKEY, secret_key, strlen(secret_key)); | |
+ zmq_setsockopt(socket, ZMQ_CURVE_PUBLICKEY, public_key, strlen(public_key)); | |
+ if(is_server) | |
+ zmq_setsockopt(socket, ZMQ_CURVE_SERVER, &is_server, sizeof(is_server)); | |
+ else | |
+ zmq_setsockopt(socket, ZMQ_CURVE_SERVERKEY, server_key, strlen(server_key)); | |
+} | |
+ | |
+// Creates the ROUTER server bound to tcp://127.0.0.1:5556 together with a PAIR | |
+// socket connected to the server's monitor endpoint. | |
+// ctx - context both sockets are created in | |
+// set_heartbeats - non-zero sets ZMQ_HEARTBEAT_IVL to 50 ms on the server | |
+// is_curve - non-zero configures the server as a CURVE server | |
+// server_out - receives the server socket | |
+// mon_out - receives the monitor socket | |
+static void | |
+prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_out, void ** mon_out) | |
+{ | |
+ int rc; | |
+ // The server under test is a ROUTER socket | |
+ void *server = zmq_socket (ctx, ZMQ_ROUTER); | |
+ assert (server); | |
+ | |
+ // Linger of 0 on the server socket | |
+ int value = 0; | |
+ rc = zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value)); | |
+ assert (rc == 0); | |
+ | |
+ if(set_heartbeats) { | |
+ value = 50; | |
+ rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); | |
+ assert (rc == 0); | |
+ } | |
+ | |
+ if(is_curve) | |
+ setup_curve(server, 1); | |
+ | |
+ rc = zmq_bind (server, "tcp://127.0.0.1:5556"); | |
+ assert (rc == 0); | |
+ | |
+ // Create a socket for collecting monitor events from the server | |
+ void *server_mon = zmq_socket (ctx, ZMQ_PAIR); | |
+ assert (server_mon); | |
+ | |
+ // Only connected, disconnected and accepted events are requested | |
+ rc = zmq_socket_monitor (server, "inproc://monitor-dealer", | |
+ ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED); | |
+ assert (rc == 0); | |
+ | |
+ // Connect to the inproc endpoint so we'll get events | |
+ rc = zmq_connect (server_mon, "inproc://monitor-dealer"); | |
+ assert (rc == 0); | |
+ | |
+ *server_out = server; | |
+ *mon_out = server_mon; | |
+} | |
+ | |
+// This checks for a broken TCP connection (or, in this case a stuck one | |
+// where the peer never responds to PINGs). There should be an accepted event | |
+// then a disconnect event. | |
+static void | |
+test_heartbeat_timeout (void) | |
+{ | |
+ int rc; | |
+ | |
+ // Set up our context and sockets | |
+ void *ctx = zmq_ctx_new (); | |
+ assert (ctx); | |
+ | |
+ // Server with heartbeats enabled, no CURVE | |
+ void * server, * server_mon; | |
+ prep_server_socket(ctx, 1, 0, &server, &server_mon); | |
+ | |
+ struct sockaddr_in ip4addr; | |
+ int s; | |
+ | |
+ // Zero the whole structure so no field is left uninitialised | |
+ memset(&ip4addr, 0, sizeof ip4addr); | |
+ ip4addr.sin_family = AF_INET; | |
+ ip4addr.sin_port = htons(5556); | |
+ rc = inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr); | |
+ assert (rc == 1); | |
+ | |
+ s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); | |
+ assert (s != -1); | |
+ rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr); | |
+ assert (rc > -1); | |
+ | |
+ // Mock a ZMTP 3 client so we can forcibly time out a connection | |
+ mock_handshake(s); | |
+ | |
+ // By now everything should report as connected | |
+ rc = get_monitor_event(server_mon); | |
+ assert(rc == ZMQ_EVENT_ACCEPTED); | |
+ | |
+ // We should have been disconnected, since the raw socket never | |
+ // sends anything after the handshake | |
+ rc = get_monitor_event(server_mon); | |
+ assert(rc == ZMQ_EVENT_DISCONNECTED); | |
+ | |
+ close(s); | |
+ | |
+ rc = zmq_close (server); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_close (server_mon); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_ctx_term (ctx); | |
+ assert (rc == 0); | |
+} | |
+ | |
+// This checks that peers respect the TTL value in ping messages. | |
+// We connect a DEALER client that sends PINGs carrying a TTL to a server | |
+// that is not doing any heartbeating itself. Then we sleep; if the server | |
+// disconnects the client, then we know the TTL did its thing correctly. | |
+static void | |
+test_heartbeat_ttl (void) | |
+{ | |
+ int rc, value; | |
+ | |
+ // Set up our context and sockets | |
+ void *ctx = zmq_ctx_new (); | |
+ assert (ctx); | |
+ | |
+ // Server without heartbeats, no CURVE | |
+ void * server, * server_mon, *client; | |
+ prep_server_socket(ctx, 0, 0, &server, &server_mon); | |
+ | |
+ client = zmq_socket(ctx, ZMQ_DEALER); | |
+ assert(client != NULL); | |
+ | |
+ // Set the heartbeat TTL to 0.1 seconds | |
+ value = 100; | |
+ rc = zmq_setsockopt(client, ZMQ_HEARTBEAT_TTL, &value, sizeof(value)); | |
+ assert(rc == 0); | |
+ | |
+ // Set the heartbeat interval to much longer than the TTL so that | |
+ // the socket times out on the remote side. | |
+ value = 250; | |
+ rc = zmq_setsockopt(client, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); | |
+ assert(rc == 0); | |
+ | |
+ rc = zmq_connect(client, "tcp://localhost:5556"); | |
+ assert(rc == 0); | |
+ | |
+ // By now everything should report as connected | |
+ rc = get_monitor_event(server_mon); | |
+ assert(rc == ZMQ_EVENT_ACCEPTED); | |
+ | |
+ msleep(100); | |
+ | |
+ // We should have been disconnected | |
+ rc = get_monitor_event(server_mon); | |
+ assert(rc == ZMQ_EVENT_DISCONNECTED); | |
+ | |
+ rc = zmq_close (server); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_close (server_mon); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_close (client); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_ctx_term (ctx); | |
+ assert (rc == 0); | |
+} | |
+ | |
+// This checks for normal operation - that is pings and pongs being | |
+// exchanged normally. There should be an accepted event on the server, | |
+// and then no event afterwards. is_curve != 0 runs the same check with | |
+// CURVE configured on both sockets. | |
+static void | |
+test_heartbeat_notimeout (int is_curve) | |
+{ | |
+ int rc; | |
+ | |
+ // Set up our context and sockets | |
+ void *ctx = zmq_ctx_new (); | |
+ assert (ctx); | |
+ | |
+ // Server with heartbeats enabled | |
+ void * server, * server_mon; | |
+ prep_server_socket(ctx, 1, is_curve, &server, &server_mon); | |
+ | |
+ void * client = zmq_socket(ctx, ZMQ_DEALER); | |
+ assert (client); | |
+ if(is_curve) | |
+ setup_curve(client, 0); | |
+ rc = zmq_connect(client, "tcp://127.0.0.1:5556"); | |
+ assert (rc == 0); | |
+ | |
+ // Give it a sec to connect and handshake | |
+ msleep(100); | |
+ | |
+ // By now everything should report as connected | |
+ rc = get_monitor_event(server_mon); | |
+ assert(rc == ZMQ_EVENT_ACCEPTED); | |
+ | |
+ // We should still be connected because pings and pongs are happenin' | |
+ rc = get_monitor_event(server_mon); | |
+ assert(rc == -1); | |
+ | |
+ rc = zmq_close (client); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_close (server); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_close (server_mon); | |
+ assert (rc == 0); | |
+ | |
+ rc = zmq_ctx_term (ctx); | |
+ assert (rc == 0); | |
+} | |
+ | |
+// Runs the heartbeat tests in sequence. Each test creates and terminates its | |
+// own context and binds the server to the same TCP port. | |
+int main (void) | |
+{ | |
+ setup_test_environment(); | |
+ test_heartbeat_timeout(); | |
+ test_heartbeat_ttl(); | |
+ // Run this test without curve | |
+ test_heartbeat_notimeout(0); | |
+ // Then rerun it with curve | |
+ test_heartbeat_notimeout(1); | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment