-
-
Save frankosterfeld/528fcbc85e1146ad6b36cbc142404304 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 463d4ac5dd00523906bd335765111013725de53a Mon Sep 17 00:00:00 2001 | |
From: Frank Osterfeld <[email protected]> | |
Date: Fri, 5 Nov 2021 14:25:01 +0100 | |
Subject: [PATCH] MdpMessage: Add template parameter for 8 vs. 9 frame | |
distinction | |
--- | |
.../include/majordomo/BasicMdpWorker.hpp | 19 +--- | |
src/majordomo/include/majordomo/Message.hpp | 78 +++++++------ | |
src/majordomo/include/majordomo/broker.hpp | 68 ++++++------ | |
src/majordomo/include/majordomo/client.hpp | 14 +-- | |
src/majordomo/test/majordomo_tests.cpp | 105 ++++++++++++------ | |
src/yaxxzmq/include/yaz/kill/Message.hpp | 4 - | |
6 files changed, 157 insertions(+), 131 deletions(-) | |
diff --git a/src/majordomo/include/majordomo/BasicMdpWorker.hpp b/src/majordomo/include/majordomo/BasicMdpWorker.hpp | |
index c51a3ae..3cd3a70 100644 | |
--- a/src/majordomo/include/majordomo/BasicMdpWorker.hpp | |
+++ b/src/majordomo/include/majordomo/BasicMdpWorker.hpp | |
@@ -19,13 +19,6 @@ class BasicMdpWorker { | |
std::atomic<bool> _shutdown_requested = false; | |
yaz::Socket<MdpMessage, BasicMdpWorker *> _socket; | |
- void send(MdpMessage &&message) { | |
- auto &frames = message.parts_ref(); | |
- assert(frames.size() == 9); | |
- auto span = std::span(frames); | |
- _socket.send_parts(span.subspan(1, span.size() - 1)); | |
- } | |
- | |
protected: | |
MdpMessage create_message(MdpMessage::WorkerCommand command) { | |
auto message = MdpMessage::createWorkerMessage(command); | |
@@ -58,10 +51,6 @@ public: | |
} | |
void handle_message(MdpMessage &&message) { | |
- // we receive 8 frames here, add first empty frame for MdpMessage | |
- auto &frames = message.parts_ref(); | |
- frames.emplace(frames.begin(), yaz::MessagePart{}); | |
- | |
if (!message.isValid()) { | |
debug() << "invalid MdpMessage received\n"; | |
return; | |
@@ -71,12 +60,12 @@ public: | |
switch (message.workerCommand()) { | |
case MdpMessage::WorkerCommand::Get: | |
if (auto reply = handle_get(std::move(message))) { | |
- send(std::move(*reply)); | |
+ _socket.send(std::move(*reply)); | |
} | |
return; | |
case MdpMessage::WorkerCommand::Set: | |
if (auto reply = handle_set(std::move(message))) { | |
- send(std::move(*reply)); | |
+ _socket.send(std::move(*reply)); | |
} | |
return; | |
case MdpMessage::WorkerCommand::Heartbeat: | |
@@ -100,14 +89,14 @@ public: | |
auto ready = create_message(MdpMessage::WorkerCommand::Ready); | |
ready.setBody(_service_description, yaz::MessagePart::dynamic_bytes_tag{}); | |
- send(std::move(ready)); | |
+ _socket.send(std::move(ready)); | |
return true; | |
} | |
bool disconnect() { | |
auto msg = create_message(MdpMessage::WorkerCommand::Disconnect); | |
- send(std::move(msg)); | |
+ _socket.send(std::move(msg)); | |
return _socket.disconnect(); | |
} | |
diff --git a/src/majordomo/include/majordomo/Message.hpp b/src/majordomo/include/majordomo/Message.hpp | |
index 6978eea..6e1cecc 100644 | |
--- a/src/majordomo/include/majordomo/Message.hpp | |
+++ b/src/majordomo/include/majordomo/Message.hpp | |
@@ -12,25 +12,35 @@ namespace Majordomo::OpenCMW { | |
using yaz::Bytes; | |
using yaz::MessagePart; | |
-class MdpMessage : public yaz::Message { | |
+enum class MessageFormat { | |
+ WithSourceId, ///< 9-frame format, contains the source ID as frame 0, used with ROUTER sockets (broker) | |
+ WithoutSourceId ///< 8-frame format, does not contain the source ID frame | |
+}; | |
+ | |
+template<MessageFormat Format> | |
+class BasicMdpMessage; | |
+ | |
+using MdpMessage = BasicMdpMessage<MessageFormat::WithoutSourceId>; | |
+ | |
+template<MessageFormat Format> | |
+class BasicMdpMessage : public yaz::Message { | |
private: | |
static constexpr auto clientProtocol = "MDPC03"; | |
static constexpr auto workerProtocol = "MDPW03"; | |
- static constexpr auto numFrames = 9; | |
- // std::vector<Bytes> _frames{ numFrames }; | |
+ static constexpr auto FrameCount = Format == MessageFormat::WithSourceId ? 9 : 8; | |
enum class Frame : std::size_t { | |
- SourceId = 0, | |
- Protocol = 1, | |
- Command = 2, | |
- ServiceName = 3, | |
- ClientSourceId = ServiceName, | |
- ClientRequestId = 4, | |
- Topic = 5, | |
- Body = 6, | |
- Error = 7, | |
- RBAC = 8 | |
+ SourceId = 0, | |
+ Protocol = Format == MessageFormat::WithSourceId ? 1 : 0, | |
+ Command, | |
+ ServiceName, | |
+ ClientSourceId = ServiceName, | |
+ ClientRequestId, | |
+ Topic, | |
+ Body, | |
+ Error, | |
+ RBAC | |
}; | |
template<typename T> | |
@@ -48,12 +58,12 @@ private: | |
return operator[](index(value)); | |
} | |
- MdpMessage() { | |
- resize(numFrames); | |
+ BasicMdpMessage() { | |
+ resize(FrameCount); | |
} | |
- explicit MdpMessage(char command) { | |
- resize(numFrames); | |
+ explicit BasicMdpMessage(char command) { | |
+ resize(FrameCount); | |
setCommand(command); | |
assert(this->command() == command); | |
} | |
@@ -62,8 +72,8 @@ private: | |
setFrameData(Frame::Command, new std::string(1, command), MessagePart::dynamic_bytes_tag{}); | |
} | |
- MdpMessage(const MdpMessage &) = default; | |
- MdpMessage &operator=(const MdpMessage &) = default; | |
+ BasicMdpMessage(const BasicMdpMessage &) = default; | |
+ BasicMdpMessage &operator=(const BasicMdpMessage &) = default; | |
[[nodiscard]] char command() const { | |
assert(frameAt(Frame::Command).data().length() == 1); | |
@@ -74,7 +84,7 @@ private: | |
template<typename Message> | |
static bool isMessageValid(const Message &ymsg) { | |
// TODO better error reporting | |
- if (ymsg.parts_count() != numFrames) { | |
+ if (ymsg.parts_count() != FrameCount) { | |
return false; | |
} | |
@@ -128,30 +138,31 @@ public: | |
Worker | |
}; | |
- explicit MdpMessage(std::vector<yaz::MessagePart> &&parts) | |
+ explicit BasicMdpMessage(std::vector<yaz::MessagePart> &&parts) | |
: yaz::Message(std::move(parts)) { | |
} | |
- ~MdpMessage() = default; | |
- MdpMessage(MdpMessage &&other) = default; | |
- MdpMessage &operator=(MdpMessage &&other) = default; | |
+ ~BasicMdpMessage() = default; | |
+ BasicMdpMessage(BasicMdpMessage &&other) = default; | |
+ BasicMdpMessage &operator=(BasicMdpMessage &&other) = default; | |
- static MdpMessage createClientMessage(ClientCommand cmd) { | |
- MdpMessage msg{ static_cast<char>(cmd) }; | |
+ static BasicMdpMessage createClientMessage(ClientCommand cmd) { | |
+ BasicMdpMessage msg{ static_cast<char>(cmd) }; | |
msg.setFrameData(Frame::Protocol, clientProtocol, MessagePart::static_bytes_tag{}); | |
return msg; | |
} | |
- static MdpMessage createWorkerMessage(WorkerCommand cmd) { | |
- MdpMessage msg{ static_cast<char>(cmd) }; | |
+ static BasicMdpMessage createWorkerMessage(WorkerCommand cmd) { | |
+ BasicMdpMessage msg{ static_cast<char>(cmd) }; | |
msg.setFrameData(Frame::Protocol, workerProtocol, MessagePart::static_bytes_tag{}); | |
return msg; | |
} | |
- MdpMessage clone() const { | |
+ BasicMdpMessage clone() const { | |
// TODO make this nicer... | |
- MdpMessage tmp; | |
- for (size_t i = 0; i < parts_count(); ++i) | |
+ BasicMdpMessage tmp; | |
+ assert(parts_count() == FrameCount); | |
+ for (size_t i = 0; i < FrameCount; ++i) | |
tmp.add_part(std::make_unique<std::string>((*this)[i].data())); | |
return tmp; | |
} | |
@@ -205,10 +216,12 @@ public: | |
template<typename T, typename Tag> | |
void setSourceId(T &&sourceId, Tag tag) { | |
+ static_assert(Format == MessageFormat::WithSourceId, "not available for WithoutSourceId format"); | |
setFrameData(Frame::SourceId, YAZ_FWD(sourceId), tag); | |
} | |
[[nodiscard]] std::string_view sourceId() const { | |
+ static_assert(Format == MessageFormat::WithSourceId, "not available for WithoutSourceId format"); | |
return frameAt(Frame::SourceId).data(); | |
} | |
@@ -276,7 +289,8 @@ public: | |
} | |
}; | |
-static_assert(std::is_nothrow_move_constructible<MdpMessage>::value, "MdpMessage should be noexcept MoveConstructible"); | |
+static_assert(std::is_nothrow_move_constructible<BasicMdpMessage<MessageFormat::WithSourceId>>::value, "MdpMessage should be noexcept MoveConstructible"); | |
+static_assert(std::is_nothrow_move_constructible<BasicMdpMessage<MessageFormat::WithoutSourceId>>::value, "MdpMessage should be noexcept MoveConstructible"); | |
} // namespace Majordomo::OpenCMW | |
#endif | |
diff --git a/src/majordomo/include/majordomo/broker.hpp b/src/majordomo/include/majordomo/broker.hpp | |
index 9f50dd5..7500402 100644 | |
--- a/src/majordomo/include/majordomo/broker.hpp | |
+++ b/src/majordomo/include/majordomo/broker.hpp | |
@@ -38,7 +38,7 @@ constexpr int HEARTBEAT_LIVENESS = 3; | |
constexpr int HEARTBEAT_INTERVAL = 1000; | |
constexpr auto CLIENT_TIMEOUT = std::chrono::seconds(10); // TODO | |
-using Majordomo::OpenCMW::MdpMessage; | |
+using BrokerMessage = BasicMdpMessage<MessageFormat::WithSourceId>; | |
template<typename Message, typename Handler> | |
class BaseSocket : public yaz::Socket<Message, Handler> { | |
@@ -62,28 +62,28 @@ public: | |
}; | |
template<typename Handler> | |
-class RouterSocket : public BaseSocket<MdpMessage, Handler> { | |
+class RouterSocket : public BaseSocket<BrokerMessage, Handler> { | |
public: | |
explicit RouterSocket(yaz::Context &context, Handler &&handler) | |
- : BaseSocket<MdpMessage, Handler>(context, ZMQ_ROUTER, std::move(handler)) { | |
+ : BaseSocket<BrokerMessage, Handler>(context, ZMQ_ROUTER, std::move(handler)) { | |
this->bind(INTERNAL_ADDRESS_BROKER); | |
} | |
}; | |
template<typename Handler> | |
-class SubSocket : public BaseSocket<MdpMessage, Handler> { | |
+class SubSocket : public BaseSocket<BrokerMessage, Handler> { | |
public: | |
explicit SubSocket(yaz::Context &context, Handler &&handler) | |
- : BaseSocket<MdpMessage, Handler>(context, ZMQ_XSUB, std::move(handler)) { | |
+ : BaseSocket<BrokerMessage, Handler>(context, ZMQ_XSUB, std::move(handler)) { | |
this->bind(INTERNAL_ADDRESS_SUBSCRIBE); | |
} | |
}; | |
template<typename Handler> | |
-class DnsSocket : public BaseSocket<MdpMessage, Handler> { | |
+class DnsSocket : public BaseSocket<BrokerMessage, Handler> { | |
public: | |
explicit DnsSocket(yaz::Context &context, Handler &&handler) | |
- : BaseSocket<MdpMessage, Handler>(context, ZMQ_DEALER, std::move(handler)) { | |
+ : BaseSocket<BrokerMessage, Handler>(context, ZMQ_DEALER, std::move(handler)) { | |
} | |
}; | |
@@ -107,12 +107,12 @@ private: | |
using Timestamp = std::chrono::time_point<std::chrono::steady_clock>; | |
using SocketGroup = yaz::SocketGroup<Broker *, RouterSocket, PubSocket, SubSocket, DnsSocket>; | |
- using SocketType = yaz::Socket<MdpMessage, Broker::SocketGroup *>; | |
+ using SocketType = yaz::Socket<BrokerMessage, Broker::SocketGroup *>; | |
struct Client { | |
SocketType *socket; | |
const std::string id; | |
- std::deque<MdpMessage> requests; | |
+ std::deque<BrokerMessage> requests; | |
explicit Client(SocketType *s, std::string id_) | |
: socket(s) | |
@@ -148,19 +148,19 @@ private: | |
std::string name; | |
std::string description; | |
std::deque<Worker *> waiting; | |
- std::deque<MdpMessage> requests; | |
+ std::deque<BrokerMessage> requests; | |
explicit Service(std::string name_, std::string description_) | |
: name(std::move(name_)) | |
, description(std::move(description_)) { | |
} | |
- void put_message(MdpMessage &&message) { | |
+ void put_message(BrokerMessage &&message) { | |
// TODO prioritise by RBAC role | |
requests.emplace_back(std::move(message)); | |
} | |
- MdpMessage take_next_message() { | |
+ BrokerMessage take_next_message() { | |
assert(!requests.empty()); | |
auto msg = std::move(requests.front()); | |
requests.pop_front(); | |
@@ -248,13 +248,13 @@ private: | |
assert(worker); | |
message.setClientSourceId(message.sourceId(), MessagePart::dynamic_bytes_tag{}); | |
message.setSourceId(worker->id, MessagePart::dynamic_bytes_tag{}); | |
- message.setProtocol(MdpMessage::Protocol::Worker); | |
+ message.setProtocol(BrokerMessage::Protocol::Worker); | |
// TODO assert that command exists in both protocols? | |
worker->socket->send(std::move(message)); | |
} | |
} | |
- void send_with_source_id(MdpMessage &&message, std::string_view source_id) { | |
+ void send_with_source_id(BrokerMessage &&message, std::string_view source_id) { | |
message.setSourceId(source_id, MessagePart::dynamic_bytes_tag{}); | |
_sockets.get<RouterSocket>().send(std::move(message)); | |
} | |
@@ -264,7 +264,7 @@ private: | |
return subscription_topic.starts_with(topic); | |
} | |
- void dispatch_message_to_matching_subscribers(MdpMessage &&message) { | |
+ void dispatch_message_to_matching_subscribers(BrokerMessage &&message) { | |
const auto it = _subscribed_clients_by_topic.find(std::string(message.topic())); | |
const auto has_router_subscriptions = it != _subscribed_clients_by_topic.end(); | |
@@ -325,7 +325,7 @@ private: | |
// not implemented -- reply according to Majordomo Management Interface (MMI) as defined in http://rfc.zeromq.org/spec:8 | |
- auto reply = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final); | |
+ auto reply = BrokerMessage::createClientMessage(BrokerMessage::ClientCommand::Final); | |
constexpr auto tag = yaz::MessagePart::dynamic_bytes_tag{}; | |
reply.setSourceId(client_message.sourceId(), tag); | |
reply.setClientSourceId(client_message.clientSourceId(), tag); | |
@@ -362,7 +362,7 @@ private: | |
} | |
template<typename Socket> | |
- void process_worker(Socket &socket, MdpMessage &&message) { | |
+ void process_worker(Socket &socket, BrokerMessage &&message) { | |
assert(message.isWorkerMessage()); | |
const auto service_name = std::string(message.serviceName()); | |
@@ -372,13 +372,13 @@ private: | |
worker.update_expiry(); | |
switch (message.workerCommand()) { | |
- case MdpMessage::WorkerCommand::Ready: { | |
+ case BrokerMessage::WorkerCommand::Ready: { | |
debug() << "log new local/external worker for service " << service_name << " - " << message << std::endl; | |
std::ignore = require_service(service_name, std::string(message.body())); | |
worker_waiting(worker); | |
// notify potential listeners | |
- auto notify = MdpMessage::createWorkerMessage(MdpMessage::WorkerCommand::Notify); | |
+ auto notify = BrokerMessage::createWorkerMessage(BrokerMessage::WorkerCommand::Notify); | |
const auto dynamic_tag = yaz::MessagePart::dynamic_bytes_tag{}; | |
notify.setServiceName(INTERNAL_SERVICE_NAMES, dynamic_tag); | |
notify.setTopic(INTERNAL_SERVICE_NAMES, dynamic_tag); | |
@@ -387,11 +387,11 @@ private: | |
pub_socket().send(std::move(notify)); | |
break; | |
} | |
- case MdpMessage::WorkerCommand::Disconnect: | |
+ case BrokerMessage::WorkerCommand::Disconnect: | |
// delete_worker(worker); // TODO handle? also commented out in java impl | |
break; | |
- case MdpMessage::WorkerCommand::Partial: | |
- case MdpMessage::WorkerCommand::Final: { | |
+ case BrokerMessage::WorkerCommand::Partial: | |
+ case BrokerMessage::WorkerCommand::Final: { | |
if (known_worker) { | |
const auto client_id = message.clientSourceId(); | |
auto client = _clients.find(std::string(client_id)); | |
@@ -403,17 +403,17 @@ private: | |
message.setServiceName(worker.service_name, yaz::MessagePart::dynamic_bytes_tag{}); | |
const auto client_command = [](auto worker_cmd) { | |
switch (worker_cmd) { | |
- case MdpMessage::WorkerCommand::Partial: | |
- return MdpMessage::ClientCommand::Partial; | |
- case MdpMessage::WorkerCommand::Final: | |
- return MdpMessage::ClientCommand::Final; | |
+ case BrokerMessage::WorkerCommand::Partial: | |
+ return BrokerMessage::ClientCommand::Partial; | |
+ case BrokerMessage::WorkerCommand::Final: | |
+ return BrokerMessage::ClientCommand::Final; | |
default: | |
assert(!"unexpected command"); | |
- return MdpMessage::ClientCommand::Final; | |
+ return BrokerMessage::ClientCommand::Final; | |
} | |
}(message.workerCommand()); | |
- message.setProtocol(MdpMessage::Protocol::Client); | |
+ message.setProtocol(BrokerMessage::Protocol::Client); | |
message.setClientCommand(client_command); | |
client->second.socket->send(std::move(message)); | |
worker_waiting(worker); | |
@@ -422,9 +422,9 @@ private: | |
} | |
break; | |
} | |
- case MdpMessage::WorkerCommand::Notify: { | |
- message.setProtocol(MdpMessage::Protocol::Client); | |
- message.setClientCommand(MdpMessage::ClientCommand::Final); | |
+ case BrokerMessage::WorkerCommand::Notify: { | |
+ message.setProtocol(BrokerMessage::Protocol::Client); | |
+ message.setClientCommand(BrokerMessage::ClientCommand::Final); | |
message.setSourceId(message.serviceName(), yaz::MessagePart::dynamic_bytes_tag{}); | |
message.setServiceName(worker.service_name, yaz::MessagePart::dynamic_bytes_tag{}); | |
@@ -447,7 +447,7 @@ private: | |
} | |
void disconnect_worker(Worker &worker) { | |
- auto disconnect = MdpMessage::createWorkerMessage(MdpMessage::WorkerCommand::Disconnect); | |
+ auto disconnect = BrokerMessage::createWorkerMessage(BrokerMessage::WorkerCommand::Disconnect); | |
constexpr auto dynamic_tag = yaz::MessagePart::dynamic_bytes_tag{}; | |
disconnect.setSourceId(worker.id, dynamic_tag); | |
disconnect.setServiceName(worker.service_name, dynamic_tag); | |
@@ -575,7 +575,7 @@ public: | |
if (message.isClientMessage()) { | |
switch (message.clientCommand()) { | |
// TODO handle READY (client)? | |
- case MdpMessage::ClientCommand::Subscribe: { | |
+ case BrokerMessage::ClientCommand::Subscribe: { | |
auto it = _subscribed_clients_by_topic.try_emplace(std::string(message.topic()), std::set<std::string>{}); | |
// TODO check for duplicate subscriptions? | |
it.first->second.emplace(message.sourceId()); | |
@@ -585,7 +585,7 @@ public: | |
} | |
return true; | |
} | |
- case MdpMessage::ClientCommand::Unsubscribe: { | |
+ case BrokerMessage::ClientCommand::Unsubscribe: { | |
auto it = _subscribed_clients_by_topic.find(std::string(message.topic())); | |
if (it != _subscribed_clients_by_topic.end()) { | |
it->second.erase(std::string(message.sourceId())); | |
diff --git a/src/majordomo/include/majordomo/client.hpp b/src/majordomo/include/majordomo/client.hpp | |
index a14fd5a..3592685 100644 | |
--- a/src/majordomo/include/majordomo/client.hpp | |
+++ b/src/majordomo/include/majordomo/client.hpp | |
@@ -43,7 +43,7 @@ public: | |
Request get(std::string_view service_name, BodyType request) { | |
auto [handle, message] = create_request_template(MdpMessage::ClientCommand::Get, service_name); | |
message.setBody(YAZ_FWD(request), MessagePart::dynamic_bytes_tag{}); | |
- send(std::move(message)); | |
+ _socket.send(std::move(message)); | |
return handle; | |
} | |
@@ -58,7 +58,7 @@ public: | |
Request set(std::string_view service_name, BodyType request) { | |
auto [handle, message] = create_request_template(MdpMessage::ClientCommand::Set, service_name); | |
message.setBody(YAZ_FWD(request), MessagePart::dynamic_bytes_tag{}); | |
- send(std::move(message)); | |
+ _socket.send(std::move(message)); | |
return handle; | |
} | |
@@ -70,10 +70,6 @@ public: | |
} | |
void handle_message(MdpMessage &&message) { | |
- // we receive 8 frames here, add first empty frame for MdpMessage | |
- auto &frames = message.parts_ref(); | |
- frames.emplace(frames.begin(), yaz::MessagePart{}); | |
- | |
if (!message.isValid()) { | |
debug() << "Received invalid message" << message << std::endl; | |
return; | |
@@ -114,12 +110,6 @@ private: | |
Request make_request_handle() { | |
return Request{ _next_request_id++ }; | |
} | |
- | |
- void send(MdpMessage &&message) { | |
- auto &frames = message.parts_ref(); | |
- auto span = std::span(frames); | |
- _socket.send_parts(span.subspan(1, span.size() - 1)); | |
- } | |
}; | |
} // namespace Majordomo::OpenCMW | |
diff --git a/src/majordomo/test/majordomo_tests.cpp b/src/majordomo/test/majordomo_tests.cpp | |
index 4011442..c453245 100644 | |
--- a/src/majordomo/test/majordomo_tests.cpp | |
+++ b/src/majordomo/test/majordomo_tests.cpp | |
@@ -13,8 +13,6 @@ | |
#include <deque> | |
#include <thread> | |
-using Majordomo::OpenCMW::MdpMessage; | |
- | |
class TestNode { | |
std::deque<yaz::Message> _receivedMessages; | |
@@ -50,39 +48,75 @@ public: | |
}; | |
TEST_CASE("OpenCMW::Message basics", "[Majordomo]") { | |
- using Majordomo::OpenCMW::MdpMessage; | |
+ { | |
+ using MdpMessage = Majordomo::OpenCMW::BasicMdpMessage<Majordomo::OpenCMW::MessageFormat::WithSourceId>; | |
+ auto msg = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final); | |
+ REQUIRE(msg.isClientMessage()); | |
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final); | |
+ | |
+ auto tag = yaz::MessagePart::static_bytes_tag{}; | |
+ msg.setTopic("I'm a topic", tag); | |
+ msg.setServiceName("service://abc", tag); | |
+ msg.setClientRequestId("request 1", tag); | |
+ msg.setBody("test body test body test body test body test body test body test body", tag); | |
+ msg.setError("fail!", tag); | |
+ msg.setRbac("password", tag); | |
+ | |
+ REQUIRE(msg.isClientMessage()); | |
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final); | |
+ REQUIRE(msg.topic() == "I'm a topic"); | |
+ REQUIRE(msg.serviceName() == "service://abc"); | |
+ REQUIRE(msg.clientRequestId() == "request 1"); | |
+ REQUIRE(msg.body() == "test body test body test body test body test body test body test body"); | |
+ REQUIRE(msg.error() == "fail!"); | |
+ REQUIRE(msg.rbac() == "password"); | |
+ | |
+ REQUIRE(msg.parts_count() == 9); | |
+ REQUIRE(msg[0].data() == ""); | |
+ REQUIRE(msg[1].data() == "MDPC03"); | |
+ REQUIRE(msg[2].data() == "\x6"); | |
+ REQUIRE(msg[3].data() == "service://abc"); | |
+ REQUIRE(msg[4].data() == "request 1"); | |
+ REQUIRE(msg[5].data() == "I'm a topic"); | |
+ REQUIRE(msg[6].data() == "test body test body test body test body test body test body test body"); | |
+ REQUIRE(msg[7].data() == "fail!"); | |
+ REQUIRE(msg[8].data() == "password"); | |
+ } | |
+ | |
+ { | |
+ using MdpMessage = Majordomo::OpenCMW::BasicMdpMessage<Majordomo::OpenCMW::MessageFormat::WithoutSourceId>; | |
+ auto msg = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final); | |
+ REQUIRE(msg.isClientMessage()); | |
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final); | |
+ | |
+ auto tag = yaz::MessagePart::static_bytes_tag{}; | |
+ msg.setTopic("I'm a topic", tag); | |
+ msg.setServiceName("service://abc", tag); | |
+ msg.setClientRequestId("request 1", tag); | |
+ msg.setBody("test body test body test body test body test body test body test body", tag); | |
+ msg.setError("fail!", tag); | |
+ msg.setRbac("password", tag); | |
+ | |
+ REQUIRE(msg.isClientMessage()); | |
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final); | |
+ REQUIRE(msg.topic() == "I'm a topic"); | |
+ REQUIRE(msg.serviceName() == "service://abc"); | |
+ REQUIRE(msg.clientRequestId() == "request 1"); | |
+ REQUIRE(msg.body() == "test body test body test body test body test body test body test body"); | |
+ REQUIRE(msg.error() == "fail!"); | |
+ REQUIRE(msg.rbac() == "password"); | |
+ | |
+ REQUIRE(msg.parts_count() == 8); | |
+ REQUIRE(msg[0].data() == "MDPC03"); | |
+ REQUIRE(msg[1].data() == "\x6"); | |
+ REQUIRE(msg[2].data() == "service://abc"); | |
+ REQUIRE(msg[3].data() == "request 1"); | |
+ REQUIRE(msg[4].data() == "I'm a topic"); | |
+ REQUIRE(msg[5].data() == "test body test body test body test body test body test body test body"); | |
+ REQUIRE(msg[6].data() == "fail!"); | |
+ REQUIRE(msg[7].data() == "password"); | |
+ } | |
- auto msg = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final); | |
- REQUIRE(msg.isClientMessage()); | |
- REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final); | |
- | |
- auto tag = yaz::MessagePart::static_bytes_tag{}; | |
- msg.setTopic("I'm a topic", tag); | |
- msg.setServiceName("service://abc", tag); | |
- msg.setClientRequestId("request 1", tag); | |
- msg.setBody("test body test body test body test body test body test body test body", tag); | |
- msg.setError("fail!", tag); | |
- msg.setRbac("password", tag); | |
- | |
- REQUIRE(msg.isClientMessage()); | |
- REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final); | |
- REQUIRE(msg.topic() == "I'm a topic"); | |
- REQUIRE(msg.serviceName() == "service://abc"); | |
- REQUIRE(msg.clientRequestId() == "request 1"); | |
- REQUIRE(msg.body() == "test body test body test body test body test body test body test body"); | |
- REQUIRE(msg.error() == "fail!"); | |
- REQUIRE(msg.rbac() == "password"); | |
- | |
- REQUIRE(msg.parts_count() == 9); | |
- REQUIRE(msg[0].data().empty()); | |
- REQUIRE(msg[1].data() == "MDPC03"); | |
- REQUIRE(msg[2].data() == "\x6"); | |
- REQUIRE(msg[3].data() == "service://abc"); | |
- REQUIRE(msg[4].data() == "request 1"); | |
- REQUIRE(msg[5].data() == "I'm a topic"); | |
- REQUIRE(msg[6].data() == "test body test body test body test body test body test body test body"); | |
- REQUIRE(msg[7].data() == "fail!"); | |
- REQUIRE(msg[8].data() == "password"); | |
} | |
TEST_CASE("Request answered with unknown service", "[Broker]") { | |
@@ -443,9 +477,12 @@ TEST_CASE("pubsub example using router socket", "[Broker]") { | |
} | |
} | |
+using Majordomo::OpenCMW::MdpMessage; | |
+ | |
class TestIntWorker : public Majordomo::OpenCMW::BasicMdpWorker { | |
int _x = 10; | |
public: | |
+ | |
explicit TestIntWorker(yaz::Context &context, std::string service_name, int initial_value) | |
: Majordomo::OpenCMW::BasicMdpWorker(context, service_name) | |
, _x(initial_value) { | |
diff --git a/src/yaxxzmq/include/yaz/kill/Message.hpp b/src/yaxxzmq/include/yaz/kill/Message.hpp | |
index 19c8a33..4fea62c 100644 | |
--- a/src/yaxxzmq/include/yaz/kill/Message.hpp | |
+++ b/src/yaxxzmq/include/yaz/kill/Message.hpp | |
@@ -203,10 +203,6 @@ public: | |
void resize(std::size_t size) { | |
_parts.resize(size); | |
} | |
- | |
- std::vector<MessagePart> &parts_ref() { | |
- return _parts; | |
- } | |
}; | |
} // namespace yaz | |
-- | |
2.32.0 | |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.