Skip to content

Instantly share code, notes, and snippets.

@StephanOpfer
Last active December 29, 2018 05:40
Show Gist options
  • Save StephanOpfer/98e32c13c822c33e06d56bc82956c7c2 to your computer and use it in GitHub Desktop.
Save StephanOpfer/98e32c13c822c33e06d56bc82956c7c2 to your computer and use it in GitHub Desktop.
Sending Cap'n Proto over ZeroMQ
#include <discovery_msgs/beacon.capnp.h>
#include <zmq.h>
#include <capnp/message.h>
#include <capnp/serialize-packed.h>
#include <assert.h>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
/**
 * Checks the return code and reports an error if present.
 * If abortIfError is set to true, it also aborts the process
 * (through assert, so only while assertions are enabled, i.e. NDEBUG is not defined).
 *
 * @param returnCode   Result of the call being checked; 0 means success.
 * @param methodName   Name of the checked call, used as prefix of the error report.
 * @param abortIfError Whether a non-zero returnCode should terminate the process.
 */
void check(int returnCode, std::string methodName, bool abortIfError)
{
    if (returnCode == 0)
    {
        return;
    }

    // Snapshot errno before touching the stream: the insertions below are free to overwrite it.
    const int errorNumber = errno;
    std::cerr << methodName << " returned: " << errorNumber << " - " << zmq_strerror(errorNumber) << std::endl;

    if (abortIfError)
    {
        // Asserting on returnCode itself would never fire here, because it is known to be
        // non-zero (truthy) in this branch. Assert the success condition instead.
        assert(returnCode == 0);
    }
}
int main(int argc, char **argv)
{
// zmq stuff
auto ctx = zmq_ctx_new();
assert(ctx);
// create socket
auto socket = zmq_socket(ctx, ZMQ_DISH);
assert(socket);
// bind socket
check(zmq_bind(socket, "udp://224.0.0.1:5555"), "zmq_bind", true);
check(zmq_join(socket, "TestMCGroup"), "zmq_join", true);
zmq_msg_t msg;
check(zmq_msg_init(&msg), "zmq_msg_init", true);
int nbytes = zmq_msg_recv(&msg, socket, 0);
std::cout << "Received " << nbytes << " bytes!" << std::endl;
// Received message must contain an integral number of words.
assert(zmq_msg_size(&msg) % sizeof(capnp::word) == 0);
auto num_words = zmq_msg_size(&msg) / sizeof(capnp::word);
std::cout << "The zmq_msg_size(&msg) is " << zmq_msg_size(&msg) << ", which are " << num_words << " words."<< std::endl;
if (reinterpret_cast<uintptr_t>(zmq_msg_data(&msg)) % sizeof(capnp::word) == 0)
{
std::cout << "The message is memory aligned." << std::endl;
}
else
{
std::cout << "The message is not memory aligned." << std::endl;
}
/* uncomment for printing the bytes in hex */
// auto msgByteArray = reinterpret_cast<char *>(zmq_msg_data(&msg));
// for (int i = 0; i < zmq_msg_size(&msg); i++)
// {
// printf("%02X:", msgByteArray[i]);
// }
// printf("\n");
auto wordArray = kj::ArrayPtr<capnp::word const>(reinterpret_cast<capnp::word const *>(zmq_msg_data(&msg)), num_words);
::capnp::FlatArrayMessageReader msgReader = ::capnp::FlatArrayMessageReader(wordArray);
auto beacon = msgReader.getRoot<discovery_msgs::Beacon>();
std::cout << "Received Message: " << beacon.toString().flatten().cStr() << std::endl;
check(zmq_msg_close(&msg), "zmq_msg_close", false);
}
#include <discovery_msgs/beacon.capnp.h>
#include <zmq.h>
#include <uuid/uuid.h>
#include <assert.h>
#include <capnp/message.h>
#include <capnp/serialize-packed.h>
#include <iostream>
#include <unistd.h>
/**
 * Checks the return code and reports an error if present.
 * If abortIfError is set to true, it also aborts the process
 * (through assert, so only while assertions are enabled, i.e. NDEBUG is not defined).
 *
 * @param returnCode   Result of the call being checked; 0 means success.
 * @param methodName   Name of the checked call, used as prefix of the error report.
 * @param abortIfError Whether a non-zero returnCode should terminate the process.
 */
void check(int returnCode, std::string methodName, bool abortIfError)
{
    if (returnCode == 0)
    {
        return;
    }

    // Snapshot errno before touching the stream: the insertions below are free to overwrite it.
    const int errorNumber = errno;
    std::cerr << methodName << " returned: " << errorNumber << " - " << zmq_strerror(errorNumber) << std::endl;

    if (abortIfError)
    {
        // Asserting on returnCode itself would never fire here, because it is known to be
        // non-zero (truthy) in this branch. Assert the success condition instead.
        assert(returnCode == 0);
    }
}
int main(int argc, char **argv)
{
auto ctx = zmq_ctx_new();
assert(ctx);
auto socket = zmq_socket(ctx, ZMQ_RADIO);
check(zmq_connect(socket, "udp://224.0.0.1:5555"), "zmq_connect", true);
::capnp::MallocMessageBuilder msgBuilder;
discovery_msgs::Beacon::Builder beaconMsgBuilder = msgBuilder.initRoot<discovery_msgs::Beacon>();
// set content
uuid_t uuid;
uuid_generate(uuid);
beaconMsgBuilder.setIp("192.186.0.1");
beaconMsgBuilder.setPort(6666);
beaconMsgBuilder.setUuid(kj::arrayPtr(uuid, sizeof(uuid)));
zmq_msg_t msg;
auto wordArray = capnp::messageToFlatArray(msgBuilder);
std::cout << "Send Message: " << beaconMsgBuilder.toString().flatten().cStr() << std::endl;
check(zmq_msg_init_data(&msg, wordArray.begin(), wordArray.size() * sizeof(capnp::word), NULL, NULL),
"zmq_msg_init_data", false);
/* uncomment for printing bytes */
// auto msgByteArray = reinterpret_cast<char *>(zmq_msg_data(&msg));
// for (int i = 0; i < zmq_msg_size(&msg); i++)
// {
// std::cout << std::hex << msgByteArray[i] << ":";
//
// }
// std::cout << std::endl;
// set group
check(zmq_msg_set_group(&msg, "TestMCGroup"), "zmq_msg_set_group", false);
// send
int numBytesSend = zmq_msg_send(&msg, socket, 0);
if (numBytesSend == -1)
{
std::cerr << "zmq_msg_send was unsuccessfull: " << errno << " - " << zmq_strerror(errno) << std::endl;
check(zmq_msg_close(&msg), "zmq_msg_close", false);
}
else
{
std::cout << numBytesSend << " bytes sent." << std::endl;
}
sleep(1); // <-- fixes the problem
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment