Created
June 27, 2015 15:59
-
-
Save jbreams/7f507beff87987afad98 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Compile with clang++ -std=c++11 -g -o zmq_test zmq_test.cpp -lzmq | |
/** | |
* This demonstrates how a ZMQ_ROUTER socket can leave file descriptors | |
* open forever. Compile this, and then run it like so. | |
* One instance as a router: | |
* zmq_test router 7777 | |
* That starts a router that listens on port 7777. | |
* | |
* Next, start a number of peers which connect to this router. | |
* zmq_test peer localhost 7777 | |
* | |
* These will announce themselves to the router via random identities, and | |
* the router will start pinging them every 5 seconds, forever. | |
* | |
* Next, black-hole the peers by severing TCP traffic to the router. This can | |
* be done via iptables. | |
* | |
* sudo iptables -A INPUT -p tcp --dport=7777 -j DROP | |
* sudo iptables -A INPUT -p tcp --sport=7777 -j DROP | |
* | |
* At this point, the peers and router can not exchange packets and the | |
* connection is effectively dead. | |
* | |
* Note, that the file descriptors in the router process remain open forever, | |
* despite it trying to send a ping to each peer identity every 5 seconds. | |
* | |
* You can clear the iptables rules like this to restore the network: | |
* sudo iptables -F INPUT | |
* | |
* Note, that the peers never reconnect to the router once the network is | |
* fixed. | |
* | |
* You can see open file descriptors like this given your router's process ID | |
* lsof -p <pid> | grep TCP | |
* | |
*/ | |
#include <zmq.h> | |
#include <stdexcept> | |
#include <string> | |
#include <sstream> | |
#include <vector> | |
#include <stdio.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <stdlib.h> | |
#include <time.h> | |
#include <algorithm> | |
// Invoke like this to run a router listening on a port: | |
// zmq_test router 1234 | |
// Invoke like this to connect to a router | |
// zmq_test peer <hostname> <port> | |
using namespace std; | |
void | |
read_all_parts(void *zmq_sock, vector<string> *parts) { | |
int more; | |
size_t more_size = sizeof(more); | |
do { | |
zmq_msg_t part; | |
int rc = zmq_msg_init (&part); | |
if (rc != 0) { | |
zmq_msg_close (&part); | |
return; | |
} | |
rc = zmq_msg_recv (&part, zmq_sock, 0); | |
if (rc == -1) { | |
fprintf(stderr, "zmq_msg_recv failed:%s\n", zmq_strerror(errno)); | |
zmq_msg_close (&part); | |
return; | |
} | |
parts->push_back(string((const char *)zmq_msg_data(&part), | |
zmq_msg_size(&part))); | |
rc = zmq_getsockopt (zmq_sock, ZMQ_RCVMORE, &more, &more_size); | |
if (rc != 0) { | |
fprintf(stderr, "zmq_getsockopt failed\n"); | |
zmq_msg_close (&part); | |
return; | |
} | |
zmq_msg_close (&part); | |
} while (more); | |
} | |
void | |
send_all_parts(void *zmq_sock, vector<string> parts) { | |
for (int i = 0; i < parts.size(); i++) { | |
bool last = (i == parts.size() - 1); | |
zmq_msg_t msg; | |
string part = parts[i]; | |
int rc = zmq_msg_init_size (&msg, part.length()); | |
if (rc != 0) { | |
return; | |
} | |
memcpy(zmq_msg_data(&msg), part.data(), part.length()); | |
rc = zmq_msg_send(&msg, zmq_sock, last ? 0 : ZMQ_SNDMORE); | |
if (rc == -1) { | |
return; | |
} | |
} | |
} | |
void | |
router(int port) { | |
void *ctx = zmq_ctx_new(); | |
void *sock = zmq_socket(ctx, ZMQ_ROUTER); | |
int value = 500; | |
zmq_setsockopt(sock, ZMQ_IDENTITY, "router", 6); | |
zmq_setsockopt(sock, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); | |
zmq_setsockopt(sock, ZMQ_HEARTBEAT_TTL, &value, sizeof(value)); | |
stringstream ss; | |
ss << "tcp://*:" << port; | |
fprintf(stdout, "Listening on: %s\n", ss.str().data()); | |
int rc = zmq_bind(sock, ss.str().data()); | |
if (rc != 0) { | |
fprintf(stderr, "zmq_bind failed: %s\n", zmq_strerror(errno)); | |
} | |
struct zmq_pollitem_t poll_item; | |
poll_item.socket = sock; | |
poll_item.fd = 0; | |
poll_item.events = ZMQ_POLLIN; | |
poll_item.revents = 0; | |
vector<string> peers; | |
time_t last_ping = time(0); | |
while (true) { | |
int rc = zmq_poll(&poll_item, 1, 5000); | |
time_t now = time(0); | |
if (now - last_ping >= 5) { | |
// Timed out. Ping everybody. | |
fprintf(stdout, "Pinging all known peers\n"); | |
for (auto peer : peers) { | |
fprintf(stdout, "\t%s\n", peer.data()); | |
vector<string> envelope; | |
envelope.push_back(peer); | |
envelope.push_back(""); | |
envelope.push_back("ping"); | |
} | |
last_ping = now; | |
} | |
if (rc == 0) { | |
continue; | |
} | |
vector<string> envelope; | |
read_all_parts(sock, &envelope); | |
//fprintf(stdout, "Got %zu parts\n", envelope.size()); | |
if (envelope.size() != 3) { | |
continue; | |
} | |
string peer = envelope[0]; | |
if (find(peers.begin(), peers.end(), peer) == peers.end()) { | |
fprintf(stdout, "New peer: %s\n", peer.data()); | |
peers.push_back(peer); | |
} | |
fprintf(stdout, "Peer (%s) says (%s)\n", peer.data(), envelope[2].data()); | |
} | |
} | |
void | |
peer(char *router_hostname, int router_port) { | |
void *ctx = zmq_ctx_new(); | |
void *sock = zmq_socket(ctx, ZMQ_ROUTER); | |
{ | |
stringstream ss; | |
ss << "peer-" << getpid(); | |
string id = ss.str(); | |
zmq_setsockopt(sock, ZMQ_IDENTITY, id.data(), id.length()); | |
} | |
stringstream ss; | |
ss << "tcp://" << router_hostname << ":" << router_port; | |
fprintf(stdout, "Connecting to: %s\n", ss.str().data()); | |
int rc = zmq_connect(sock, ss.str().data()); | |
if (rc != 0) { | |
fprintf(stderr, "zmq_connect failed: %s\n", zmq_strerror(errno)); | |
} | |
while (true) { | |
vector<string> envelope; | |
envelope.push_back("router"); | |
envelope.push_back(""); | |
envelope.push_back("ping"); | |
send_all_parts(sock, envelope); | |
sleep(5); | |
} | |
} | |
int | |
main(int argc, char** argv) { | |
if (argc < 2) { | |
fprintf(stderr, "router or peer argument required\n"); | |
return -1; | |
} | |
auto mode = string(argv[1]); | |
if (mode == "router") { | |
if (argc < 3) { | |
fprintf(stderr, "Port required for router mode\n"); | |
return -1; | |
} | |
int port; | |
try { | |
port = stoi(string(argv[2])); | |
} catch (invalid_argument) { | |
fprintf(stderr, "%s isn't a number\n", argv[2]); | |
return -1; | |
} | |
fprintf(stdout, "Running in router mode on port %d (pid %d)\n", port, getpid()); | |
router(port); | |
} else if (mode == "peer") { | |
if (argc < 4) { | |
fprintf(stderr, "router hostname and port number required\n"); | |
return -1; | |
} | |
int port; | |
try { | |
port = stoi(string(argv[3])); | |
} catch (invalid_argument) { | |
fprintf(stderr, "%s isn't a number\n", argv[3]); | |
return -1; | |
} | |
fprintf(stdout, "Connecting to %s:%d (pid %d)\n", argv[2], port, getpid()); | |
peer(argv[2], port); | |
} else { | |
fprintf(stderr, "Unknown mode %s\n", argv[1]); | |
return -1; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment