Last active
December 25, 2015 11:49
-
-
Save JosephLaurino/6971996 to your computer and use it in GitHub Desktop.
simple ZeroMQ sample, CLIENT sends char input from console to SERVER, while SERVER transmits any new char input to all subscribed CLIENT(s)
this sample uses c++11 for portable std::thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////// | |
// client_transmit_input.cpp FILE | |
//////////////////////////////////////////////////////////////////// | |
#include "client_transmit_input.h" | |
// ------------------------------------------------------------------- | |
void ClientRequest_SendCharHandler(zmq::context_t* context, std::string threadName) | |
{ | |
zmq::socket_t socket (*context, ZMQ_REQ); | |
std::cout << std::endl << "Connecting to relay char server..." ; | |
socket.connect ("tcp://localhost:5555"); | |
while(1) | |
{ | |
zmq::message_t request (2); | |
char input; | |
std::cin >> input; | |
memcpy ((void *) request.data (), &input, 2); | |
// std::cout << threadName << " Sending " << input << std::endl; | |
socket.send (request); | |
// Get the reply. | |
zmq::message_t reply; // empty reply | |
socket.recv (&reply); | |
} | |
} | |
// ------------------------------------------------------------------- | |
void ClientSubscription_InputHandler(zmq::context_t* pContext) | |
{ | |
// Socket to talk to server | |
std::cout << std::endl << "Subscribing to input updates from server"; | |
zmq::socket_t subscriber (*pContext, ZMQ_SUB); | |
subscriber.connect("tcp://localhost:5556"); | |
// ? -- what if we want to subscribe to everything??? | |
const char *filter = "0"; | |
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); | |
while( 1 ) | |
{ | |
zmq::message_t update; | |
subscriber.recv(&update); | |
std::string filter; | |
std::string input; | |
std::istringstream iss(static_cast<char*>(update.data())); | |
iss >> filter >> input; | |
std::cout << input; | |
} | |
} | |
// ------------------------------------------------------------------- | |
void test_client_transmit_input(zmq::context_t *pContext) | |
{ | |
std::string threadName("thread_"); | |
std::thread clientRequestSendCharThread(ClientRequest_SendCharHandler, pContext, threadName); | |
std::thread clientSubscriptionThread(ClientSubscription_InputHandler, pContext); | |
// also launch a thread for subscriber... join so that main does not exit | |
clientSubscriptionThread.join(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
///////////////////////////////////////////////////// | |
// client_transmit_input.h FILE | |
#ifndef CLIENT_TRANSMIT_INPUT_H | |
#define CLIENT_TRANSMIT_INPUT_H | |
#include <zmq.hpp> | |
#include <string> | |
#include <iostream> | |
#include <thread> | |
#include <iostream> | |
#include <sstream> | |
#include <vector> | |
void ClientRequest_SendCharHandler(zmq::context_t* context, std::string threadName); | |
void ClientSubscription_InputHandler(zmq::context_t* pContext); | |
void test_client_transmit_input(zmq::context_t *pContext); | |
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/////////////////////////////////// | |
// main.cpp FILE for client | |
#include "client_transmit_input.h" | |
// ------------------------------------------------------------------- | |
int main () | |
{ | |
zmq::context_t context(1); | |
test_client_transmit_input(&context); | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
////////////////////////////////////// | |
// main.cpp FILE for server | |
#include "server_transmit_input.h" | |
// ------------------------------------------------------------------- | |
int main () | |
{ | |
zmq::context_t context (1); | |
test_transmit_input(&context); | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
////////////////////////////////////////////// | |
// server_transmit_input.cpp FILE | |
////////////////////////////////////////////// | |
#include "server_transmit_input.h" | |
// ? --- learn how to do multi-threaded signaling using the | |
// zeromq way... no shared data is recommended | |
std::mutex g_hasNewInputDataMutex; | |
bool g_hasNewInputData = false; | |
char g_inputData; | |
// ------------------------------------------------------------------- | |
void ServerRespond_SendCharHandler(zmq::context_t* pContext) | |
{ | |
zmq::socket_t socket (*pContext, ZMQ_REP); | |
socket.bind ("tcp://*:5555"); | |
float counter = 0.0f; | |
std::cout << std::endl << "ServerRespond_SendCharHandler listening at " << "tcp://*:5555" ; | |
while (true) | |
{ | |
zmq::message_t request; | |
socket.recv (&request); | |
{ | |
std::lock_guard<std::mutex> lock(g_hasNewInputDataMutex); | |
g_hasNewInputData = true; | |
g_inputData = *((char*)(request.data())); | |
std::cout << g_inputData; | |
} | |
socket.send (request); | |
} | |
} | |
// ------------------------------------------------------------------- | |
void ServerPublisher_InputHandler(zmq::context_t* pContext) | |
{ | |
zmq::socket_t publisher (*pContext, ZMQ_PUB); | |
publisher.bind("tcp://*:5556"); | |
std::cout << std::endl<< "ServerPublisher_InputHandler listening at " << "tcp://*:5556"; | |
while (1) | |
{ | |
std::lock_guard<std::mutex> lock(g_hasNewInputDataMutex); | |
if( g_hasNewInputData == true ) | |
{ | |
// Send message to all subscribers | |
zmq::message_t message(10); | |
sprintf_s ((char *) message.data(), 10, "0 %c", g_inputData); | |
publisher.send(message); | |
g_hasNewInputData = false; | |
} | |
ThreadSleepInMilliseconds(1); | |
} | |
} | |
// ------------------------------------------------------------------- | |
void test_transmit_input(zmq::context_t* pContext) | |
{ | |
std::thread serverRespondThread(ServerRespond_SendCharHandler, pContext); | |
std::thread serverPublisherThread(ServerPublisher_InputHandler, pContext); | |
serverPublisherThread.join(); // wait for this thread to finish | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/////////////////////////////////////// | |
// server_transmit_input.h FILE | |
/////////////////////////////////////// | |
#ifndef SERVER_TRANSMIT_INPUT_H | |
#define SERVER_TRANSMIT_INPUT_H | |
#include <zmq.hpp> | |
#include <string> | |
#include <iostream> | |
#include <thread> | |
#include <chrono> | |
#include <math.h> | |
#include <mutex> | |
#include "shared_code.h" | |
void ServerRespond_SendCharHandler(zmq::context_t* pContext); | |
void ServerPublisher_InputHandler(zmq::context_t* pContext); | |
void test_transmit_input(zmq::context_t* pContext); | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment