Skip to content

Instantly share code, notes, and snippets.

@OlegJakushkin
Last active May 6, 2017 05:59
Show Gist options
  • Save OlegJakushkin/abf38a7936a7257ca7f382b66b77828a to your computer and use it in GitHub Desktop.
Save OlegJakushkin/abf38a7936a7257ca7f382b66b77828a to your computer and use it in GitHub Desktop.
ZMQ Chat Server and Client
// Install-Package boost -Version 1.55.0.16
// Install-Package boost_system-vc110 -Version 1.55.0.16
#include <zmq.hpp>
#include <atomic>
#include <chrono>
#include <cstring>
#include <exception>
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <thread>
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
using namespace std;
/**
 * One chat message as it travels between client and server.
 *
 * The struct is serialized field by field through Boost.Serialization;
 * the field order inside serialize() defines the wire layout and must
 * match on both ends.
 */
struct Message {
    string from;                     // name of the sender
    string to;                       // name of the recipient ("Server" addresses the server itself)
    string message;                  // text payload
    map<string, string> attributes;  // additional string key/value pairs

    /**
     * Saves or loads every field through the given archive.
     *
     * @param ar      archive the fields are passed to via operator&
     * @param version class version supplied by the archive; not used
     */
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        (void)version;
        ar & from & to & message & attributes;
    }
};
void send(zmq::socket_t & socket, Message message) {
stringstream stream;
boost::archive::text_oarchive serializer(stream);
serializer << message;
auto serialized = stream.str();
zmq::message_t request (serialized.size());
memcpy (request.data (), serialized.data(), serialized.size());
socket.send (request);
}
/**
 * Receives one ZeroMQ frame from the socket and decodes it into a Message.
 *
 * The frame bytes are treated as a Boost text archive, the counterpart of
 * what send() produces.
 *
 * @param socket socket to read the frame from
 * @return the decoded message
 */
Message receive(zmq::socket_t & socket) {
    zmq::message_t frame;
    socket.recv (&frame);

    // Copy the raw frame bytes into a string so they can be read as a stream.
    const char * bytes = static_cast<const char*>(frame.data());
    const string payload(bytes, frame.size());
    stringstream archiveText(payload);

    Message decoded;
    boost::archive::text_iarchive reader(archiveText);
    reader >> decoded;
    return decoded;
}
/**
 * Chat client entry point.
 *
 * Asks for a user name, then runs two REQ sockets against the server:
 *  - a background thread polls the server every 100 ms for messages
 *    addressed to this user and prints any that did not come from "Server";
 *  - the main thread reads "[to] [word]" pairs from stdin and sends them.
 *
 * The client stops when stdin is exhausted or unreadable, or when sending
 * fails. The polling thread is then asked to stop and joined before the
 * sockets and context are destroyed. Joining waits for the poll request
 * that is currently in flight to be answered.
 *
 * @return 0 on a clean end of input, 1 if no name could be read or an
 *         error interrupted the send loop
 */
int main ()
{
    auto serverName = "tcp://localhost:5555";
    zmq::context_t context (1);
    // Cleared by the main thread once it is done, so the poller can finish.
    atomic<bool> running(true);

    auto pooler = [&](string userName){
        // A thread entry point must not leak exceptions (that would call
        // std::terminate), so failures are reported and the thread ends.
        try {
            Message poolRequest;
            poolRequest.from = userName;
            poolRequest.to = "Server";
            zmq::socket_t socket (context, ZMQ_REQ);
            socket.connect (serverName);
            while(running) {
                this_thread::sleep_for(
                    chrono::milliseconds(100));
                send(socket, poolRequest);
                auto result = receive(socket);
                // Replies originating from the server itself are plain acknowledgements.
                if(result.from != "Server") {
                    cout << result.from << ":" << endl
                         << result.message << endl;
                }
            }
        } catch (const exception & e) {
            cerr << "polling stopped: " << e.what() << endl;
        }
    };

    zmq::socket_t socket (context, ZMQ_REQ);
    cout << "Connecting to hello world server…" << endl;
    socket.connect (serverName);
    Message m;
    m.attributes["bla-bla"] = "bla";
    cout << "name: ";
    if(!(cin >> m.from)) {
        // No name could be read; nothing has been started yet.
        return 1;
    }

    thread t(pooler, m.from);
    int exitCode = 0;
    try {
        while(true) {
            cout << "[to] [word]";
            // Stop on EOF or unreadable input instead of re-sending the
            // previous message in a tight loop.
            if(!(cin >> m.to >> m.message)) {
                break;
            }
            send(socket, m);
            receive(socket);  // the acknowledgement is read and discarded
        }
    } catch (const exception & e) {
        cerr << e.what() << endl;
        exitCode = 1;
    }

    // The poller uses `context`, `serverName` and `running` by reference,
    // so it has to finish before any of them goes out of scope.
    running = false;
    t.join();
    return exitCode;
}
#include <iostream>
#include <sstream>
#include <map>
#include <vector>
#include <algorithm>
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <zmq.hpp>
#include <boost/archive/text_oarchive.hpp>
using namespace std;
/**
 * Chat message exchanged with clients.
 *
 * Serialized member by member with Boost.Serialization; the order used in
 * serialize() is the order on the wire.
 */
struct Message {
    string from;                     // sender name ("Server" for replies built by the server)
    string to;                       // recipient name ("Server" marks a poll request)
    string message;                  // message text
    map<string, string> attributes;  // extra string key/value pairs

    /**
     * Passes every member through the archive, for both saving and loading.
     *
     * @param ar      archive that receives the members via operator&
     * @param version class version supplied by the archive; not used
     */
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        (void)version;
        ar & from & to & message & attributes;
    }
};
vector<Message> queue;
int main() {
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
try {
Message requestMessage;
zmq::message_t zmqRequestMessage;
socket.recv (&zmqRequestMessage);
string data( static_cast<char*>(zmqRequestMessage.data()), zmqRequestMessage.size());
stringstream inputStream(data);
boost::archive::text_iarchive deserializer(inputStream);
deserializer >> requestMessage;
Message responseMessage;
responseMessage.from = "Server";
responseMessage.to = requestMessage.from;
responseMessage.message="OK";
if(requestMessage.to == "Server") {
auto it = find_if(queue.begin(), queue.end(),
[&](Message & m){
return m.to == requestMessage.from;
});
if(it != queue.end()) {
responseMessage = *it;
queue.erase(it);
}
} else {
cout << "Received Hello "
<< requestMessage.from << " -> "
<< requestMessage.to << ": "<< requestMessage.message << endl
<< requestMessage.attributes["apmath"] << endl;
queue.push_back(requestMessage);
}
stringstream outputStream;
boost::archive::text_oarchive serializer(outputStream);
serializer << responseMessage;
auto serialized = outputStream.str();
zmq::message_t response (serialized.size());
memcpy (response.data (), serialized.data(), serialized.size());
socket.send (response);
} catch (exception &e) {
cout << e.what() << endl << endl;
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment