Skip to content

Instantly share code, notes, and snippets.

@jstaursky
Last active November 7, 2021 12:27
Show Gist options
  • Save jstaursky/65830026f3838ee41e034967b8aa0bae to your computer and use it in GitHub Desktop.
Save jstaursky/65830026f3838ee41e034967b8aa0bae to your computer and use it in GitHub Desktop.
example zmq + protobuf
// *** SUMMARY ***
// DEALER is like an asynchronous REQ socket
//
// Where we use a REQ socket, we can use a DEALER; we just have to read and
// write the envelope ourselves.
#include <iostream>
#include <string>

#include <google/protobuf/text_format.h>
#include <zmq.hpp>

#include "message.pb.h"
// Buffer built from the empty string literal; main() sends it as its request frame.
#define MSG_DELIMIT zmq::str_buffer("")
// Example DEALER client.
//
// Connects to tcp://localhost:6666 with the identity "foo" and then loops
// forever: send one empty request frame, wait for the complete (multipart)
// reply, decode its last frame as a tutorial::Person and print the name.
//
// argc/argv are accepted but not used. The loop never exits on its own.
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_DEALER);

    // Use sizeof - 1 so the identity is exactly the three bytes "foo";
    // plain sizeof would also send the literal's terminating NUL.
    static const char identity[] = "foo";
    socket.setsockopt(ZMQ_IDENTITY, identity, sizeof(identity) - 1);
    socket.connect("tcp://localhost:6666");

    while (true) {
        // MAKE REQUEST
        socket.send(MSG_DELIMIT, zmq::send_flags::none);

        // ROUTER REPLIED!
        // A reply may consist of several frames (e.g. an empty delimiter
        // followed by the payload). Read until the last frame so that the
        // next iteration starts on a fresh message; the last frame read is
        // the one that carries the serialized Person.
        zmq::message_t inbound;
        bool complete = false;
        for (;;) {
            if (!socket.recv(inbound, zmq::recv_flags::none)) {
                break;  // nothing was received; retry with a new request
            }
            if (!inbound.more()) {
                complete = true;
                break;
            }
        }
        if (!complete) {
            continue;
        }

        const std::string info(static_cast<const char *>(inbound.data()),
                               inbound.size());
        tutorial::Person person;
        if (!person.ParseFromString(info)) {
            std::cerr << "failed to parse Person from reply" << std::endl;
            continue;
        }
        std::cout << person.name() << std::endl;
    }
    return 0;
}
# To build: run "make prot" then "make"
#
# "prot" generates the C++ protobuf sources from the .proto file(s) in this
# directory; "all" compiles the dealer and router examples against them.
# Both targets are commands rather than files, so they are declared phony.
.PHONY: all prot

all:
	g++ -o dealer dealer.cpp message.pb.cc `pkg-config --cflags --libs libzmq protobuf`
	g++ -o router router.cpp message.pb.cc `pkg-config --cflags --libs libzmq protobuf`

prot:
	protoc *.proto --cpp_out="."
syntax = "proto3";
package tutorial;
// A person record with an optional list of phone numbers.
message Person {
  // Category of a phone number.
  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  // One phone number together with its category.
  message PhoneNumber {
    string number = 1;
    PhoneType type = 2;
  }

  string name = 1;
  int32 id = 2;
  string email = 3;
  repeated PhoneNumber phone = 4;
}
// The ROUTER socket, unlike other sockets, _tracks every connection it has_, and
// _tells the caller about these_. The way it tells the caller is to stick the
// connection identity in front of each message received. An identity, sometimes
// called an address, is just a binary string with no meaning except “this is a
// unique handle to the connection”. Then, when you send a message via a ROUTER
// socket, you first send an identity frame.
// *** zmq_socket() MAN PAGE ***
//
// When receiving messages a ZMQ_ROUTER socket shall prepend a message part
// containing the identity of the originating peer to the message before passing
// it to the application. Messages received are fair-queued from among all
// connected peers. When sending messages a ZMQ_ROUTER socket shall remove the
// first part of the message and use it to determine the identity of the peer
// the message shall be routed to.
// *** SUMMARY ***
// ROUTER is like an asynchronous REP socket
// Where we use a REP socket, we can use a ROUTER instead; we just need to
// manage the identities ourselves.
#include "message.pb.h"
#include <unistd.h>
#include <zmq.hpp>
#include <google/protobuf/text_format.h>
// Buffer built from the empty string literal; main() sends it as the frame
// between the identity frame and the payload frame of each reply.
#define MSG_DELIMIT zmq::str_buffer("")
int main()
{
zmq::context_t context;
zmq::socket_t broker(context, ZMQ_ROUTER);
broker.bind("tcp://127.0.0.1:6666");
while (1) {
// IDENTIFY DEALER/REQUESTER
zmq::message_t inbound;
broker.recv(&inbound, 0);
std::string reciever_address = std::string((char*)inbound.data(), inbound.size());
// PREPEND IDENTITY/ADDRESS FRAME TO OUTGOING MESSAGE
zmq::message_t outbound_id(reciever_address.size());
memcpy(outbound_id.data(), reciever_address.data(), reciever_address.size());
// I'm not sure how to prepend to message directly, so just sending out the
// outbound/reciever address first.
broker.send(outbound_id, zmq::send_flags::sndmore);
broker.send(MSG_DELIMIT, zmq::send_flags::sndmore);
// FORM A MESSAGE
tutorial::Person person;
person.set_id(1234);
person.set_name("john");
std::string msg_str;
person.SerializeToString(&msg_str);
// SEND OUT MESSAGE.
zmq::message_t outbound(msg_str.size());
memcpy((void *)outbound.data(), msg_str.c_str(), msg_str.size());
broker.send(outbound);
sleep(2);
}
return 0;
}

protobuf + zmq

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment