Created
April 1, 2016 14:28
-
-
Save mbroadst/d7bfefb0589a3296ecca55f36af6b3f9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <qpid/messaging/Connection.h> | |
#include <qpid/messaging/Message.h> | |
#include <qpid/messaging/Receiver.h> | |
#include <qpid/messaging/Session.h> | |
#include <jsoncpp/json/reader.h> | |
#include <vector> | |
#include <string> | |
#include <iostream> | |
#include <sstream> | |
using namespace qpid::messaging; | |
using namespace qpid::types; | |
/**
 * Splits @p input on @p delimiter and appends every piece to @p result.
 *
 * Existing elements of @p result are kept; new tokens are added after them.
 * Empty tokens between two adjacent delimiters are preserved, while an empty
 * input (or a single trailing delimiter) contributes no extra token.
 *
 * @param input      text to tokenize
 * @param delimiter  character separating the tokens
 * @param result     container that receives the tokens
 * @return reference to @p result, to allow chaining
 */
std::vector<std::string> &split(const std::string &input, char delimiter,
                                std::vector<std::string> &result)
{
    std::stringstream source(input);
    for (std::string token; std::getline(source, token, delimiter); ) {
        result.push_back(token);
    }
    return result;
}
std::vector<std::string> split(const std::string &input, char delimiter) | |
{ | |
std::vector<std::string> result; | |
split(input, delimiter, result); | |
return result; | |
} | |
Json::Value convertVariantMapToJson(const Variant::Map &map); | |
Json::Value convertVariantListToJson(const Variant::List &list); | |
/**
 * Converts a qpid Variant into a JsonCpp value.
 *
 * Mapping:
 *  - void                      -> null
 *  - bool                      -> boolean
 *  - uint8 / uint16 / uint32   -> unsigned integer
 *  - int8 / int16 / int32      -> signed integer
 *  - uint64 / int64 / uuid / string -> string (64-bit integers are rendered
 *    as text, exactly as before)
 *  - float / double            -> real
 *  - map / list                -> converted recursively
 *
 * @param variant value to convert
 * @return the JSON representation; null for any type not listed above
 */
Json::Value convertVariantToJson(const Variant &variant)
{
    switch (variant.getType()) {
    case VAR_VOID:
        return Json::Value();
    case VAR_BOOL:
        return variant.asBool();
    // Unsigned values must be read through the unsigned accessor: a uint32
    // above INT32_MAX does not fit the signed 32-bit accessor that was used
    // here previously.
    case VAR_UINT8:
    case VAR_UINT16:
    case VAR_UINT32:
        return Json::Value(static_cast<Json::UInt>(variant.asUint32()));
    case VAR_INT8:
    case VAR_INT16:
    case VAR_INT32:
        return Json::Value(static_cast<Json::Int>(variant.asInt32()));
    case VAR_UINT64:
    case VAR_INT64:
    case VAR_UUID:
    case VAR_STRING:
        return variant.asString();
    case VAR_FLOAT:
    case VAR_DOUBLE:
        return variant.asDouble();
    case VAR_MAP:
        return convertVariantMapToJson(variant.asMap());
    case VAR_LIST:
        return convertVariantListToJson(variant.asList());
    }
    // Unknown type tag: fall back to JSON null.
    return Json::Value();
}
/**
 * Converts a qpid Variant list into a JSON array, converting every element
 * with convertVariantToJson().
 *
 * @param list elements to convert
 * @return a JSON array with one entry per list element, in the same order
 */
Json::Value convertVariantListToJson(const Variant::List &list)
{
    // Start from an explicit array so that an empty list yields `[]`; a
    // default-constructed Json::Value is null and only becomes an array on
    // the first append().
    Json::Value result(Json::arrayValue);
    for (const Variant &element : list) {
        result.append(convertVariantToJson(element));
    }
    return result;
}
/**
 * Converts a qpid Variant map into a JSON object, converting every value
 * with convertVariantToJson().
 *
 * @param map entries to convert
 * @return a JSON object holding one member per map entry
 */
Json::Value convertVariantMapToJson(const Variant::Map &map)
{
    // Start from an explicit object so that an empty map yields `{}`; a
    // default-constructed Json::Value is null and only becomes an object on
    // the first member assignment.
    Json::Value result(Json::objectValue);
    // Iterate by const reference to avoid copying each key and Variant.
    for (const Variant::Map::value_type &entry : map) {
        result[entry.first] = convertVariantToJson(entry.second);
    }
    return result;
}
/**
 * Builds a JSON value from the body of a received message.
 *
 * A structured (non-string) content object is converted with
 * convertVariantToJson(). A string body is parsed as JSON text; when parsing
 * fails the error is reported on stderr and the raw text is returned as a
 * JSON string instead.
 *
 * @param message message whose body should be converted
 * @return the JSON representation of the body
 */
Json::Value extractJsonBody(const Message &message)
{
    Variant body = message.getContentObject();
    if (body.getType() != qpid::types::VAR_STRING) {
        return convertVariantToJson(body);
    }

    Json::Value parsed;
    Json::Reader jsonReader;
    std::stringstream raw(message.getContent());
    if (jsonReader.parse(raw, parsed, false)) {
        return parsed;
    }

    // Not valid JSON: report it and hand back the text unchanged.
    std::cerr << "parse error: " << jsonReader.getFormatedErrorMessages();
    return Json::Value(message.getContent());
}
int main(int argc, char** argv) | |
{ | |
if (argc < 3) { | |
std::cout << "usage: " << argv[0] << " <broker> <topics>" << std::endl; | |
return 0; | |
} | |
std::string broker = argv[1]; | |
std::vector<std::string> topics = split(argv[2], ','); | |
try { | |
Connection connection(broker, "{ protocol: amqp1.0 }"); | |
connection.open(); | |
Session session = connection.createSession(); | |
std::vector<Receiver> receivers; | |
std::vector<std::string>::const_iterator it; | |
for (it = topics.begin(); it != topics.end(); ++it) { | |
Receiver receiver = session.createReceiver((*it)); | |
receiver.setCapacity(10); | |
receivers.push_back(receiver); | |
} | |
while (true) { | |
Message message = session.nextReceiver().fetch(); | |
Json::Value test = extractJsonBody(message); | |
std::cout << test.toStyledString() << std::endl; | |
session.acknowledge(); | |
} | |
connection.close(); | |
return 0; | |
} catch(const std::exception& error) { | |
std::cerr << error.what() << std::endl; | |
return 1; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <qpid/messaging/Connection.h> | |
#include <qpid/messaging/Message.h> | |
#include <qpid/messaging/Sender.h> | |
#include <qpid/messaging/Session.h> | |
#include <jsoncpp/json/reader.h> | |
#include <iostream> | |
#include <iterator> | |
#include <sstream> | |
using namespace qpid::messaging; | |
using namespace qpid::types; | |
Variant::List convertJsonArrayToVariant(const Json::Value &array); | |
Variant::Map convertJsonObjectToVariant(const Json::Value &object); | |
/**
 * Converts a JsonCpp value into the corresponding qpid Variant.
 *
 * Objects and arrays are converted recursively, strings are tagged with the
 * "utf8" encoding, numbers and booleans map to their Variant counterparts and
 * null (or any unrecognized type) becomes a void Variant.
 *
 * @param value JSON value to convert
 * @return the equivalent Variant
 */
Variant convertJsonToVariant(const Json::Value &value) {
    switch (value.type()) {
    // Containers: delegate to the recursive helpers.
    case Json::objectValue:
        return convertJsonObjectToVariant(value);
    case Json::arrayValue:
        return convertJsonArrayToVariant(value);

    // Text is marked as UTF-8 explicitly.
    case Json::stringValue: {
        Variant text(value.asString());
        text.setEncoding("utf8");
        return text;
    }

    // Scalars.
    case Json::booleanValue:
        return Variant(value.asBool());
    case Json::realValue:
        return Variant(value.asDouble());
    case Json::uintValue:
        return Variant(value.asUInt());
    case Json::intValue:
        return Variant(value.asInt());

    case Json::nullValue:
        break;
    }

    // Null and unrecognized types map to a void Variant.
    return Variant();
}
/**
 * Converts a JSON array into a qpid Variant list, converting each element
 * with convertJsonToVariant().
 *
 * @param array JSON array to convert
 * @return list holding the converted elements in their original order
 */
Variant::List convertJsonArrayToVariant(const Json::Value &array)
{
    Variant::List converted;
    for (const Json::Value &element : array) {
        converted.push_back(convertJsonToVariant(element));
    }
    return converted;
}
/**
 * Converts a JSON object into a qpid Variant map, converting each member
 * value with convertJsonToVariant().
 *
 * @param object JSON object to convert
 * @return map from member name to converted value
 */
Variant::Map convertJsonObjectToVariant(const Json::Value &object)
{
    Variant::Map converted;
    const Json::Value::Members names = object.getMemberNames();
    for (const std::string &name : names) {
        Variant member = convertJsonToVariant(object[name]);
        converted.insert(std::pair<std::string, Variant>(name, member));
    }
    return converted;
}
/**
 * Builds the message to send from raw input text.
 *
 * When @p input parses as JSON, the message carries the parsed document as a
 * structured content object (with its encoding set to "utf8"). Otherwise the
 * parse error is reported on stderr and the text is sent unchanged as the
 * message content.
 *
 * @param input text read from the user
 * @return the message to send
 */
Message createMessage(const std::string &input) {
    Json::Value document;
    Json::Reader jsonReader;
    std::stringstream source(input);

    if (jsonReader.parse(source, document, false)) {
        Message structured;
        structured.setContentObject(convertJsonToVariant(document));
        structured.getContentObject().setEncoding("utf8");
        return structured;
    }

    // Not valid JSON: report it and fall back to a plain-text message.
    std::cerr << "parse error: " << jsonReader.getFormatedErrorMessages();
    return Message(input);
}
int main(int argc, char** argv) | |
{ | |
if (argc < 3) { | |
std::cout << "usage: " << argv[0] << " <broker> <address>" << std::endl; | |
return 0; | |
} | |
std::string broker = argv[1]; | |
std::string address = argv[2]; | |
std::cin >> std::noskipws; | |
std::istream_iterator<char> it(std::cin); | |
std::istream_iterator<char> end; | |
std::string data(it, end); | |
try { | |
Connection connection(broker, "{ protocol: amqp1.0 }"); | |
connection.open(); | |
Session session = connection.createSession(); | |
Sender sender = session.createSender(address); | |
Message message = createMessage(data); | |
sender.send(message); | |
connection.close(); | |
return 0; | |
} catch(const std::exception& error) { | |
std::cerr << error.what() << std::endl; | |
return 1; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment