Skip to content

Instantly share code, notes, and snippets.

@jbreams
Created June 27, 2015 15:59
Show Gist options
  • Save jbreams/7f507beff87987afad98 to your computer and use it in GitHub Desktop.
Save jbreams/7f507beff87987afad98 to your computer and use it in GitHub Desktop.
// Compile with clang++ -std=c++11 -g -o zmq_test zmq_test.cpp -lzmq
/**
* This demonstrates how a ZMQ_ROUTER socket can leave file descriptors
* open forever. Compile this, and then run it like so.
* One instance as a router:
* zmq_test router 7777
* That starts a router that listens on port 7777.
*
* Next, start a number of peers which connect to this router.
* zmq_test peer localhost 7777
*
* These will announce themselves to the router via random identities, and
* the router will start pinging them every 5 seconds, forever.
*
* Next, black-hole the peers by severing TCP traffic to the router. This can
* be done via iptables.
*
* sudo iptables -A INPUT -p tcp --dport=7777 -j DROP
* sudo iptables -A INPUT -p tcp --sport=7777 -j DROP
*
* At this point, the peers and router can not exchange packets and the
* connection is effectively dead.
*
* Note, that the file descriptors in the router process remain open forever,
* despite it trying to send a ping to each peer identity every 5 seconds.
*
* You can clear the iptables rules like this to restore the network:
* sudo iptables -F INPUT
*
* Note, that the peers never reconnect to the router once the network is
* fixed.
*
* You can see open file descriptors like this given your router's process ID
* lsof -p <pid> | grep TCP
*
*/
#include <zmq.h>
#include <stdexcept>
#include <string>
#include <sstream>
#include <vector>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <algorithm>
// Invoke like this to run a router listening on a port:
// zmq_test router 1234
// Invoke like this to connect to a router
// zmq_test peer <hostname> <port>
using namespace std;
void
read_all_parts(void *zmq_sock, vector<string> *parts) {
int more;
size_t more_size = sizeof(more);
do {
zmq_msg_t part;
int rc = zmq_msg_init (&part);
if (rc != 0) {
zmq_msg_close (&part);
return;
}
rc = zmq_msg_recv (&part, zmq_sock, 0);
if (rc == -1) {
fprintf(stderr, "zmq_msg_recv failed:%s\n", zmq_strerror(errno));
zmq_msg_close (&part);
return;
}
parts->push_back(string((const char *)zmq_msg_data(&part),
zmq_msg_size(&part)));
rc = zmq_getsockopt (zmq_sock, ZMQ_RCVMORE, &more, &more_size);
if (rc != 0) {
fprintf(stderr, "zmq_getsockopt failed\n");
zmq_msg_close (&part);
return;
}
zmq_msg_close (&part);
} while (more);
}
void
send_all_parts(void *zmq_sock, vector<string> parts) {
for (int i = 0; i < parts.size(); i++) {
bool last = (i == parts.size() - 1);
zmq_msg_t msg;
string part = parts[i];
int rc = zmq_msg_init_size (&msg, part.length());
if (rc != 0) {
return;
}
memcpy(zmq_msg_data(&msg), part.data(), part.length());
rc = zmq_msg_send(&msg, zmq_sock, last ? 0 : ZMQ_SNDMORE);
if (rc == -1) {
return;
}
}
}
void
router(int port) {
void *ctx = zmq_ctx_new();
void *sock = zmq_socket(ctx, ZMQ_ROUTER);
int value = 500;
zmq_setsockopt(sock, ZMQ_IDENTITY, "router", 6);
zmq_setsockopt(sock, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
zmq_setsockopt(sock, ZMQ_HEARTBEAT_TTL, &value, sizeof(value));
stringstream ss;
ss << "tcp://*:" << port;
fprintf(stdout, "Listening on: %s\n", ss.str().data());
int rc = zmq_bind(sock, ss.str().data());
if (rc != 0) {
fprintf(stderr, "zmq_bind failed: %s\n", zmq_strerror(errno));
}
struct zmq_pollitem_t poll_item;
poll_item.socket = sock;
poll_item.fd = 0;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;
vector<string> peers;
time_t last_ping = time(0);
while (true) {
int rc = zmq_poll(&poll_item, 1, 5000);
time_t now = time(0);
if (now - last_ping >= 5) {
// Timed out. Ping everybody.
fprintf(stdout, "Pinging all known peers\n");
for (auto peer : peers) {
fprintf(stdout, "\t%s\n", peer.data());
vector<string> envelope;
envelope.push_back(peer);
envelope.push_back("");
envelope.push_back("ping");
}
last_ping = now;
}
if (rc == 0) {
continue;
}
vector<string> envelope;
read_all_parts(sock, &envelope);
//fprintf(stdout, "Got %zu parts\n", envelope.size());
if (envelope.size() != 3) {
continue;
}
string peer = envelope[0];
if (find(peers.begin(), peers.end(), peer) == peers.end()) {
fprintf(stdout, "New peer: %s\n", peer.data());
peers.push_back(peer);
}
fprintf(stdout, "Peer (%s) says (%s)\n", peer.data(), envelope[2].data());
}
}
void
peer(char *router_hostname, int router_port) {
void *ctx = zmq_ctx_new();
void *sock = zmq_socket(ctx, ZMQ_ROUTER);
{
stringstream ss;
ss << "peer-" << getpid();
string id = ss.str();
zmq_setsockopt(sock, ZMQ_IDENTITY, id.data(), id.length());
}
stringstream ss;
ss << "tcp://" << router_hostname << ":" << router_port;
fprintf(stdout, "Connecting to: %s\n", ss.str().data());
int rc = zmq_connect(sock, ss.str().data());
if (rc != 0) {
fprintf(stderr, "zmq_connect failed: %s\n", zmq_strerror(errno));
}
while (true) {
vector<string> envelope;
envelope.push_back("router");
envelope.push_back("");
envelope.push_back("ping");
send_all_parts(sock, envelope);
sleep(5);
}
}
int
main(int argc, char** argv) {
if (argc < 2) {
fprintf(stderr, "router or peer argument required\n");
return -1;
}
auto mode = string(argv[1]);
if (mode == "router") {
if (argc < 3) {
fprintf(stderr, "Port required for router mode\n");
return -1;
}
int port;
try {
port = stoi(string(argv[2]));
} catch (invalid_argument) {
fprintf(stderr, "%s isn't a number\n", argv[2]);
return -1;
}
fprintf(stdout, "Running in router mode on port %d (pid %d)\n", port, getpid());
router(port);
} else if (mode == "peer") {
if (argc < 4) {
fprintf(stderr, "router hostname and port number required\n");
return -1;
}
int port;
try {
port = stoi(string(argv[3]));
} catch (invalid_argument) {
fprintf(stderr, "%s isn't a number\n", argv[3]);
return -1;
}
fprintf(stdout, "Connecting to %s:%d (pid %d)\n", argv[2], port, getpid());
peer(argv[2], port);
} else {
fprintf(stderr, "Unknown mode %s\n", argv[1]);
return -1;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment