Created
October 19, 2010 18:25
-
-
Save drbobbeaty/634738 to your computer and use it in GitHub Desktop.
Simple ZeroMQ Receiver for Quotes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// System Headers | |
#include <iostream> | |
#include <stdio.h> | |
#include <string> | |
#include <stdint.h> | |
#include <sys/time.h> | |
// Third-Party Headers | |
#include <zmq.hpp> | |
// Other Headers | |
// Forward Declarations | |
// Public Constants | |
// Public DataTypes | |
// Public Data Constants | |
/** | |
* This function simply returns the number of microseconds since epoch. | |
* We'll use it to time the messages and bytes per second we receive. | |
*/ | |
uint64_t usecSinceEpoch() { | |
struct timeval tv; | |
gettimeofday(&tv, NULL); | |
return (tv.tv_sec * 1000000 + tv.tv_usec); | |
} | |
/** | |
* This is the main test frame -- open up a ZMQ socket to the right | |
* URLs and listen to what's being transmitted. Simple. | |
*/ | |
int main(int argc, char *argv[]) { | |
bool error = false; | |
// make the ZMQ Context and Socket for what we need to "hear" | |
zmq::context_t *mContext = NULL; | |
if (!error) { | |
mContext = new zmq::context_t(1); | |
if (mContext == NULL) { | |
error = true; | |
std::cout << "could not create the ZMQ context" << std::endl; | |
} | |
} | |
zmq::socket_t *mSocket = NULL; | |
if (!error) { | |
mSocket = new zmq::socket_t(*mContext, ZMQ_SUB); | |
if (mSocket == NULL) { | |
error = true; | |
std::cout << "could not create the ZMQ socket" << std::endl; | |
} else { | |
// make sure we subscribe to all messages we get | |
mSocket->setsockopt(ZMQ_SUBSCRIBE, "", 0); | |
} | |
} | |
// verify it's all initialized OK | |
if (!error) { | |
std::cout << "Initialization complete." << std::endl; | |
} | |
// now let's connect to the URLs we need to listen to | |
if (!error) { | |
mSocket->connect("epgm://eth0;239.22.3.1:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.2:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.3:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.4:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.5:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.6:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.7:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.8:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.9:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.10:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.11:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.12:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.13:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.14:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.15:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.16:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.17:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.18:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.19:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.20:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.21:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.22:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.23:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.24:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.25:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.26:11111"); | |
mSocket->connect("epgm://eth0;239.22.3.27:11111"); | |
} | |
// now let's listen for a while and just record the stats | |
if (!error) { | |
uint64_t msgCnt = 0; | |
uint64_t byteCnt = 0; | |
uint64_t usec = usecSinceEpoch(); | |
while (true) { | |
// get a message from one of the connections | |
zmq::message_t msg; | |
mSocket->recv(&msg); | |
// count it and it's contents | |
++msgCnt; | |
byteCnt += msg.size(); | |
// see if we need to report this all | |
if (msgCnt % 20000 == 0) { | |
// get the snapshot time and log what we have | |
uint64_t now = usecSinceEpoch(); | |
printf("%ld kmsgs = %.1f kb in %.3f msec: %.1f kmsgs/sec... %.1f kb/sec\n", | |
msgCnt/1000, byteCnt/1024.0, ((now - usec)/1000.0), | |
(msgCnt/1000.0)/((now - usec) * 1.0e-6), | |
(byteCnt/1024.0)/((now - usec) * 1.0e-6)); | |
// now reset the counters | |
msgCnt = 0; | |
byteCnt = 0; | |
usec = now; | |
} | |
} | |
} | |
// clean it all up | |
if (mSocket != NULL) { | |
delete mSocket; | |
mSocket = NULL; | |
} | |
if (mContext != NULL) { | |
delete mContext; | |
mContext = NULL; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment