Last active
April 23, 2016 09:32
-
-
Save OlegJakushkin/504c3861631d8f2808e66c102a315b05 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Install-Package boost -Version 1.55.0.16 | |
// Install-Package boost_system-vc110 -Version 1.55.0.16 | |
#include <sstream> | |
#include <iostream> | |
#include <thread> | |
#include <map> | |
#include <mutex> | |
#include <boost/serialization/serialization.hpp> | |
#include <boost/serialization/map.hpp> | |
#include <boost/archive/text_oarchive.hpp> | |
#include <boost/archive/text_iarchive.hpp> | |
#include <zmq.hpp> | |
using namespace std; | |
// Chat message passed between the client and the server.
//
// `from` is the sender's name, `message` the text payload, and `attributes`
// carries free-form key/value metadata (this program uses the keys "topic",
// "PublisherUrl" and "ServerXPubUrl"). `to` is serialized but never filled in
// by the code in this file.
struct Message {
    string from;
    string to;
    string message;
    map<string, string> attributes;

    // Boost.Serialization hook, used for both saving and loading.
    // Fields are visited in the fixed order from, to, message, attributes;
    // the class version number is not consulted.
    template<class Archive>
    void serialize(Archive & ar, const unsigned int /*version*/)
    {
        ar & from & to & message & attributes;
    }
};
void send(zmq::socket_t & socket, | |
Message message) { | |
stringstream stream; | |
boost::archive::text_oarchive serializer(stream); | |
serializer << message; | |
string serialized = ""; | |
if(message.attributes["topic"] != "") { | |
serialized = message.attributes["topic"] + "|"; | |
} | |
serialized += stream.str(); | |
zmq::message_t request (serialized.size()); | |
memcpy (request.data (), | |
serialized.data(), serialized.size()); | |
socket.send (request); | |
} | |
// Polls `socket` once without blocking and, if a frame is available,
// deserializes it into `requestMessage`.
//
// Returns true when a frame was received and decoded, false when nothing was
// waiting. Deserialization errors propagate as exceptions from the Boost
// archive.
//
// Frames produced by send() for messages with a topic look like
// "<topic>|<archive text>". The archive reader must start exactly at the
// archive header, so the topic prefix is removed first; feeding the raw frame
// to text_iarchive makes every topic-tagged message fail to load.
bool receive(zmq::socket_t & socket,
             Message & requestMessage) {
    zmq::message_t zmqRequestMessage;
    const bool hasMsg = socket.recv (&zmqRequestMessage, ZMQ_DONTWAIT);
    if(!hasMsg) {
        return false;
    }

    string data( static_cast<char*>(zmqRequestMessage.data()), zmqRequestMessage.size());

    // A Boost text archive always begins with its signature string written in
    // "<length> <chars>" form. Searching for '|' followed by that header
    // (instead of the first '|') keeps topics that contain '|' working.
    const string archiveHeader = "22 serialization::archive";
    if(data.compare(0, archiveHeader.size(), archiveHeader) != 0) {
        const auto separator = data.find("|" + archiveHeader);
        if(separator != string::npos) {
            data.erase(0, separator + 1);
        }
    }

    stringstream inputStream(data);
    boost::archive::text_iarchive deserializer(inputStream);
    deserializer >> requestMessage;
    return true;
}
vector<string> Subscribe; | |
vector<string> UnSubscribe; | |
mutex key; | |
void UpdateSubscriptions(zmq::socket_t & socket) { | |
lock_guard<mutex> lock (key); | |
for(auto & e : Subscribe) { | |
socket.setsockopt(ZMQ_SUBSCRIBE, e); | |
} | |
for(auto & e : UnSubscribe) { | |
socket.setsockopt(ZMQ_UNSUBSCRIBE, e); | |
} | |
Subscribe.clear(); | |
UnSubscribe.clear(); | |
} | |
int main () { | |
string serverPubUrl = "tcp://localhost:5555"; | |
string serverRespUrl = "tcp://localhost:5555"; | |
string clientUrl = "tcp://*:6555"; | |
cout << "Please input server address [tcp://127.0.0.1:5555]" << endl; | |
cin >> serverRespUrl; | |
zmq::context_t context (5); | |
auto pooler = [&](){ | |
zmq::socket_t socket (context, ZMQ_SUB); | |
socket.connect (serverPubUrl); | |
Message requestMessage; | |
while(true) { | |
UpdateSubscriptions(socket); | |
auto hasMsg = receive(socket, requestMessage); | |
if(hasMsg) { | |
cout << requestMessage.from << ":" | |
<< requestMessage.attributes["topic"] | |
<< endl | |
<< requestMessage.message << endl; | |
} | |
this_thread::sleep_for(chrono::milliseconds(50)); | |
} | |
}; | |
Message m; | |
cout << "name: "; | |
cin >> m.from; | |
auto correctData = false; | |
zmq::socket_t pubSocket (context, ZMQ_PUB); | |
while(!correctData) { | |
try { | |
zmq::socket_t requestSocket (context, ZMQ_REQ); | |
requestSocket.connect (serverRespUrl); | |
cout << "our address [tcp://127.0.0.1:6665]: "; | |
cin >> m.attributes["PublisherUrl"]; | |
pubSocket.bind(m.attributes["PublisherUrl"]); | |
send(requestSocket, m); | |
Message result; | |
auto hasMsg = false; | |
while(!hasMsg) { | |
hasMsg = receive(requestSocket, result); | |
} | |
serverPubUrl = result.attributes["ServerXPubUrl"]; | |
correctData = true; | |
} catch(std::exception &e) { | |
cout << "wrong data, please try again: " | |
<< e.what() | |
<< endl; | |
} | |
} | |
thread t(pooler); | |
Message p; | |
p.from = m.from; | |
while(true) { | |
string command; | |
cout << "command [sub] || [unsub] || [pub]"; | |
cin >> command; | |
string topic; | |
cout << "Input topic" << endl; | |
cin >> topic; | |
if(command == "sub") { | |
lock_guard<mutex> lock(key); | |
Subscribe.push_back(topic); | |
} else if(command == "unsub") { | |
lock_guard<mutex> lock(key); | |
UnSubscribe.push_back(topic); | |
} else if(command == "pub") { | |
string msg; | |
cout << "Input word" << endl; | |
cin >> msg; | |
p.message = msg; | |
p.attributes["topic"] = topic; | |
send(pubSocket, m); | |
} | |
} | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Install-Package boost -Version 1.55.0.16 | |
// Install-Package boost_system-vc110 -Version 1.55.0.16 | |
#include <sstream> | |
#include <thread> | |
#include <map> | |
#include <mutex> | |
#include <boost/serialization/serialization.hpp> | |
#include <boost/serialization/map.hpp> | |
#include <boost/archive/text_oarchive.hpp> | |
#include <boost/archive/text_iarchive.hpp> | |
#include <zmq.hpp> | |
using namespace std; | |
// Chat message passed between the clients and the server.
//
// `from` is the sender's name, `message` the text payload, and `attributes`
// carries free-form key/value metadata (this program uses the keys "topic",
// "PublisherUrl" and "ServerXPubUrl"). `to` is serialized but never filled in
// by the code in this file.
struct Message {
    string from;
    string to;
    string message;
    map<string, string> attributes;

    // Boost.Serialization hook, used for both saving and loading.
    // Fields are visited in the fixed order from, to, message, attributes;
    // the class version number is not consulted.
    template<class Archive>
    void serialize(Archive & ar, const unsigned int /*version*/)
    {
        ar & from & to & message & attributes;
    }
};
void send(zmq::socket_t & socket, | |
Message message) { | |
stringstream stream; | |
boost::archive::text_oarchive serializer(stream); | |
serializer << message; | |
string serialized = ""; | |
if(message.attributes["topic"] != "") { | |
serialized = message.attributes["topic"] + "|"; | |
} | |
serialized += stream.str(); | |
zmq::message_t request (serialized.size()); | |
memcpy (request.data (), serialized.data(), serialized.size()); | |
socket.send (request); | |
} | |
// Polls `socket` once without blocking and, if a frame is available,
// deserializes it into `requestMessage`.
//
// Returns true when a frame was received and decoded, false when nothing was
// waiting. Deserialization errors propagate as exceptions from the Boost
// archive.
//
// Frames produced by send() for messages with a topic look like
// "<topic>|<archive text>". The archive reader must start exactly at the
// archive header, so the topic prefix is removed first; feeding the raw frame
// to text_iarchive makes every topic-tagged message fail to load.
bool receive(zmq::socket_t & socket,
             Message & requestMessage) {
    zmq::message_t zmqRequestMessage;
    const bool hasMsg = socket.recv (&zmqRequestMessage, ZMQ_DONTWAIT);
    if(!hasMsg) {
        return false;
    }

    string data( static_cast<char*>(zmqRequestMessage.data()), zmqRequestMessage.size());

    // A Boost text archive always begins with its signature string written in
    // "<length> <chars>" form. Searching for '|' followed by that header
    // (instead of the first '|') keeps topics that contain '|' working.
    const string archiveHeader = "22 serialization::archive";
    if(data.compare(0, archiveHeader.size(), archiveHeader) != 0) {
        const auto separator = data.find("|" + archiveHeader);
        if(separator != string::npos) {
            data.erase(0, separator + 1);
        }
    }

    stringstream inputStream(data);
    boost::archive::text_iarchive deserializer(inputStream);
    deserializer >> requestMessage;
    return true;
}
vector<string> Subscribe; | |
mutex key; | |
void UpdateSubscriptions(zmq::socket_t & socket) { | |
lock_guard<mutex> lock (key); | |
for(auto & e : Subscribe) { | |
socket.setsockopt(ZMQ_SUBSCRIBE, e); | |
} | |
Subscribe.clear(); | |
} | |
int main () { | |
auto serverRep = "tcp://*:5555"; | |
string serverXPub = "tcp://*:5556" ; | |
string serverPublicXPub = "tcp://127.0.0.1:5556" ; | |
cout << "please input public XPUB adress [tcp://127.0.0.1:5556]" << endl; | |
cin >> serverPublicXPub; | |
zmq::context_t context (4); | |
auto pooler = [&](){ | |
zmq::socket_t publisher (context, ZMQ_XPUB); | |
publisher.bind(serverXPub); | |
zmq::socket_t subscriber (context, ZMQ_XSUB); | |
Message clientMessage; | |
while(true) { | |
auto res = receive(subscriber, clientMessage); | |
if(res) { | |
send(publisher, clientMessage); | |
} | |
this_thread::sleep_for(chrono::milliseconds(50)); | |
lock_guard<mutex> lock (key); | |
for(auto e : Subscribe) { | |
subscriber.connect(e); | |
} | |
Subscribe.clear(); | |
} | |
}; | |
thread t2(pooler); | |
auto responder = [&](){ | |
zmq::socket_t socket (context, ZMQ_REP); | |
socket.bind (serverRep); | |
Message requestMessage; | |
Message responseMessage; | |
responseMessage.from = "Server"; | |
responseMessage.attributes["ServerXPubUrl"] = serverPublicXPub; | |
while(true) { | |
auto hasMsg = receive(socket, requestMessage); | |
if(hasMsg) { | |
lock_guard<mutex> lock (key); | |
Subscribe.push_back(requestMessage.attributes["PublisherUrl"]); | |
send(socket, responseMessage); | |
} | |
this_thread::sleep_for(chrono::milliseconds(50)); | |
} | |
}; | |
thread t(responder); | |
cout << "server is up and running..." << endl; | |
while(true) { | |
this_thread::sleep_for(chrono::milliseconds(50)); | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment