Skip to content

Instantly share code, notes, and snippets.

@OlegJakushkin
Last active April 23, 2016 09:32
Show Gist options
  • Save OlegJakushkin/504c3861631d8f2808e66c102a315b05 to your computer and use it in GitHub Desktop.
Save OlegJakushkin/504c3861631d8f2808e66c102a315b05 to your computer and use it in GitHub Desktop.
// Install-Package boost -Version 1.55.0.16
// Install-Package boost_system-vc110 -Version 1.55.0.16
#include <sstream>
#include <iostream>
#include <thread>
#include <map>
#include <mutex>
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/map.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <zmq.hpp>
using namespace std;
// Wire-level chat message exchanged between client and server.
// Serialized member-by-member, in declaration order, through a
// Boost.Serialization-style archive (anything that provides operator&).
struct Message {
    std::string from;     // sender name
    std::string to;       // recipient name
    std::string message;  // text payload
    std::map<std::string, std::string> attributes;  // free-form key/value metadata

    // Feeds every field to `ar`; the same routine is used for saving and loading.
    // The class version number is accepted for the archive interface but not used.
    template<class Archive>
    void serialize(Archive & ar, const unsigned int /*version*/)
    {
        ar & from & to & message & attributes;
    }
};
void send(zmq::socket_t & socket,
Message message) {
stringstream stream;
boost::archive::text_oarchive serializer(stream);
serializer << message;
string serialized = "";
if(message.attributes["topic"] != "") {
serialized = message.attributes["topic"] + "|";
}
serialized += stream.str();
zmq::message_t request (serialized.size());
memcpy (request.data (),
serialized.data(), serialized.size());
socket.send (request);
}
// Non-blocking receive of one Message from `socket`.
//
// Returns false when no frame is currently available (ZMQ_DONTWAIT); otherwise
// decodes the frame into `requestMessage` and returns true. Decoding errors
// surface as exceptions thrown by the Boost archive.
//
// send() in this file may prepend "<topic>|" to the archive text. That prefix
// is skipped here; handing it to the text archive would make every message
// that carries a topic fail to decode.
bool receive(zmq::socket_t & socket,
             Message & requestMessage) {
    zmq::message_t frame;
    if(!socket.recv (&frame, ZMQ_DONTWAIT)) {
        return false;
    }
    const string payload(static_cast<const char*>(frame.data()), frame.size());
    // Boost text archives written with default flags start with this
    // signature (a length-prefixed "serialization::archive" string); it is
    // used to locate where the archive begins inside the frame.
    static const string archiveHeader = "22 serialization::archive";
    string::size_type archiveStart = 0;
    if(payload.compare(0, archiveHeader.size(), archiveHeader) != 0) {
        const string::size_type separator = payload.find("|" + archiveHeader);
        if(separator != string::npos) {
            archiveStart = separator + 1;
        }
        // Otherwise the payload is passed on unchanged and the archive
        // reports the malformed input itself.
    }
    stringstream inputStream(payload.substr(archiveStart));
    boost::archive::text_iarchive deserializer(inputStream);
    deserializer >> requestMessage;
    return true;
}
// Topics queued by the console loop in main() and applied to the SUB socket
// by the thread that owns that socket. All three objects below are shared
// between those two threads; `key` guards both lists.
vector<string> Subscribe;
vector<string> UnSubscribe;
mutex key;

// Applies every queued subscribe / unsubscribe request to `socket`, then
// empties both queues.
//
// The topic is passed to setsockopt as an explicit (pointer, length) pair.
// Passing the std::string object itself selects cppzmq's generic
// setsockopt(option, const T&) overload, which hands the raw bytes of the
// std::string object (sizeof(std::string) of them) to ZeroMQ instead of the
// topic text.
void UpdateSubscriptions(zmq::socket_t & socket) {
    lock_guard<mutex> lock (key);
    for(const auto & topic : Subscribe) {
        socket.setsockopt(ZMQ_SUBSCRIBE, topic.data(), topic.size());
    }
    for(const auto & topic : UnSubscribe) {
        socket.setsockopt(ZMQ_UNSUBSCRIBE, topic.data(), topic.size());
    }
    Subscribe.clear();
    UnSubscribe.clear();
}
// Chat client entry point.
//
// 1. Asks for the server's request/reply address and a user name.
// 2. Binds a PUB socket on a user-supplied address and registers that address
//    with the server over a REQ socket; the reply carries the address of the
//    server's XPUB endpoint.
// 3. Starts a background thread that owns a SUB socket connected to that
//    endpoint, applies queued (un)subscriptions and prints incoming messages.
// 4. Reads "sub" / "unsub" / "pub" commands from stdin until input ends.
//
// Returns 0 after stdin is exhausted in the command loop, 1 if stdin ends
// while the publisher address is still being asked for.
int main () {
    string serverPubUrl = "tcp://localhost:5555";
    string serverRespUrl = "tcp://localhost:5555";
    cout << "Please input server address [tcp://127.0.0.1:5555]" << endl;
    cin >> serverRespUrl;
    zmq::context_t context (5);
    // Set (under `key`) once the command loop ends so the subscriber thread
    // can leave its loop and be joined.
    bool stopRequested = false;
    auto pooler = [&](){
        try {
            zmq::socket_t socket (context, ZMQ_SUB);
            socket.connect (serverPubUrl);
            Message requestMessage;
            while(true) {
                {
                    lock_guard<mutex> lock (key);
                    if(stopRequested) {
                        break;
                    }
                }
                UpdateSubscriptions(socket);
                try {
                    if(receive(socket, requestMessage)) {
                        cout << requestMessage.from << ":"
                             << requestMessage.attributes["topic"]
                             << endl
                             << requestMessage.message << endl;
                    }
                } catch(std::exception &e) {
                    // One undecodable frame must not take the whole client down.
                    cout << "failed to receive message: "
                         << e.what()
                         << endl;
                }
                this_thread::sleep_for(chrono::milliseconds(50));
            }
        } catch(std::exception &e) {
            // A thread entry point must not let an exception escape.
            cout << "subscriber thread stopped: "
                 << e.what()
                 << endl;
        }
    };
    Message m;
    cout << "name: ";
    cin >> m.from;
    auto correctData = false;
    zmq::socket_t pubSocket (context, ZMQ_PUB);
    while(!correctData) {
        try {
            zmq::socket_t requestSocket (context, ZMQ_REQ);
            requestSocket.connect (serverRespUrl);
            cout << "our address [tcp://127.0.0.1:6665]: ";
            if(!(cin >> m.attributes["PublisherUrl"])) {
                // stdin is exhausted: retrying would only loop forever.
                return 1;
            }
            pubSocket.bind(m.attributes["PublisherUrl"]);
            send(requestSocket, m);
            Message result;
            // receive() is non-blocking; pause between polls instead of
            // spinning at full CPU while waiting for the reply.
            while(!receive(requestSocket, result)) {
                this_thread::sleep_for(chrono::milliseconds(10));
            }
            serverPubUrl = result.attributes["ServerXPubUrl"];
            correctData = true;
        } catch(std::exception &e) {
            cout << "wrong data, please try again: "
                 << e.what()
                 << endl;
        }
    }
    // serverPubUrl is final at this point; the thread reads it by reference.
    thread t(pooler);
    Message p;
    p.from = m.from;
    string command;
    string topic;
    while(true) {
        cout << "command [sub] || [unsub] || [pub]";
        if(!(cin >> command)) {
            break;
        }
        cout << "Input topic" << endl;
        if(!(cin >> topic)) {
            break;
        }
        if(command == "sub") {
            lock_guard<mutex> lock(key);
            Subscribe.push_back(topic);
        } else if(command == "unsub") {
            lock_guard<mutex> lock(key);
            UnSubscribe.push_back(topic);
        } else if(command == "pub") {
            string msg;
            cout << "Input word" << endl;
            if(!(cin >> msg)) {
                break;
            }
            p.message = msg;
            p.attributes["topic"] = topic;
            // Publish the message that was just composed (`p`), not the
            // registration message `m`, which has neither text nor topic.
            send(pubSocket, p);
        }
    }
    {
        lock_guard<mutex> lock(key);
        stopRequested = true;
    }
    // Join before the sockets and the context go out of scope.
    t.join();
    return 0;
}
// Install-Package boost -Version 1.55.0.16
// Install-Package boost_system-vc110 -Version 1.55.0.16
#include <sstream>
#include <thread>
#include <map>
#include <mutex>
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/map.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <zmq.hpp>
using namespace std;
// Chat message as it travels between client and server.
// All four members are handed, in declaration order, to a
// Boost.Serialization-style archive (any type offering operator&).
struct Message {
    std::string from;     // sender name
    std::string to;       // recipient name
    std::string message;  // text payload
    std::map<std::string, std::string> attributes;  // free-form key/value metadata

    // Single routine for both saving and loading. The class version number is
    // part of the archive interface but is not consulted.
    template<class Archive>
    void serialize(Archive & ar, const unsigned int /*version*/)
    {
        ar & from & to & message & attributes;
    }
};
void send(zmq::socket_t & socket,
Message message) {
stringstream stream;
boost::archive::text_oarchive serializer(stream);
serializer << message;
string serialized = "";
if(message.attributes["topic"] != "") {
serialized = message.attributes["topic"] + "|";
}
serialized += stream.str();
zmq::message_t request (serialized.size());
memcpy (request.data (), serialized.data(), serialized.size());
socket.send (request);
}
// Non-blocking receive of one Message from `socket`.
//
// Returns false when no frame is currently available (ZMQ_DONTWAIT); otherwise
// decodes the frame into `requestMessage` and returns true. Decoding errors
// surface as exceptions thrown by the Boost archive.
//
// send() in this file may prepend "<topic>|" to the archive text. That prefix
// is skipped here; handing it to the text archive would make every message
// that carries a topic fail to decode.
bool receive(zmq::socket_t & socket,
             Message & requestMessage) {
    zmq::message_t frame;
    if(!socket.recv (&frame, ZMQ_DONTWAIT)) {
        return false;
    }
    const string payload(static_cast<const char*>(frame.data()), frame.size());
    // Boost text archives written with default flags start with this
    // signature (a length-prefixed "serialization::archive" string); it is
    // used to locate where the archive begins inside the frame.
    static const string archiveHeader = "22 serialization::archive";
    string::size_type archiveStart = 0;
    if(payload.compare(0, archiveHeader.size(), archiveHeader) != 0) {
        const string::size_type separator = payload.find("|" + archiveHeader);
        if(separator != string::npos) {
            archiveStart = separator + 1;
        }
        // Otherwise the payload is passed on unchanged and the archive
        // reports the malformed input itself.
    }
    stringstream inputStream(payload.substr(archiveStart));
    boost::archive::text_iarchive deserializer(inputStream);
    deserializer >> requestMessage;
    return true;
}
// Work queue shared between the server threads and guarded by `key`.
// In main() the entries pushed here are publisher URLs that the forwarding
// thread connects to.
vector<string> Subscribe;
mutex key;

// Registers every queued entry as a ZMQ_SUBSCRIBE filter on `socket`, then
// empties the queue.
// NOTE(review): nothing in the visible server code calls this function, and
// main() stores URLs (not topics) in `Subscribe` — confirm whether it is
// still needed here.
//
// The entry is passed to setsockopt as an explicit (pointer, length) pair.
// Passing the std::string object itself selects cppzmq's generic
// setsockopt(option, const T&) overload, which hands the raw bytes of the
// std::string object to ZeroMQ instead of its text.
void UpdateSubscriptions(zmq::socket_t & socket) {
    lock_guard<mutex> lock (key);
    for(const auto & entry : Subscribe) {
        socket.setsockopt(ZMQ_SUBSCRIBE, entry.data(), entry.size());
    }
    Subscribe.clear();
}
int main () {
auto serverRep = "tcp://*:5555";
string serverXPub = "tcp://*:5556" ;
string serverPublicXPub = "tcp://127.0.0.1:5556" ;
cout << "please input public XPUB adress [tcp://127.0.0.1:5556]" << endl;
cin >> serverPublicXPub;
zmq::context_t context (4);
auto pooler = [&](){
zmq::socket_t publisher (context, ZMQ_XPUB);
publisher.bind(serverXPub);
zmq::socket_t subscriber (context, ZMQ_XSUB);
Message clientMessage;
while(true) {
auto res = receive(subscriber, clientMessage);
if(res) {
send(publisher, clientMessage);
}
this_thread::sleep_for(chrono::milliseconds(50));
lock_guard<mutex> lock (key);
for(auto e : Subscribe) {
subscriber.connect(e);
}
Subscribe.clear();
}
};
thread t2(pooler);
auto responder = [&](){
zmq::socket_t socket (context, ZMQ_REP);
socket.bind (serverRep);
Message requestMessage;
Message responseMessage;
responseMessage.from = "Server";
responseMessage.attributes["ServerXPubUrl"] = serverPublicXPub;
while(true) {
auto hasMsg = receive(socket, requestMessage);
if(hasMsg) {
lock_guard<mutex> lock (key);
Subscribe.push_back(requestMessage.attributes["PublisherUrl"]);
send(socket, responseMessage);
}
this_thread::sleep_for(chrono::milliseconds(50));
}
};
thread t(responder);
cout << "server is up and running..." << endl;
while(true) {
this_thread::sleep_for(chrono::milliseconds(50));
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment