Created
November 16, 2021 23:33
-
-
Save icedraco/965e2f94ae28cc8cadfe0fef586ebacd to your computer and use it in GitHub Desktop.
POSIX Message Queues Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <string> | |
#include <sstream> | |
#include <unistd.h> | |
#include "mq.h" | |
#define MQ_NAME "/ice.test" | |
// Writes the one-line usage message to standard error.
//
// exe: program name to show in the message (main() passes argv[0]).
void printSyntax(const char* exe)
{
    std::ostream& err = std::cerr;
    err << "Syntax: ";
    err << exe;
    err << " {server|client}";
    err << std::endl;
}
int doServer() | |
{ | |
MqServer server(MQ_NAME); | |
if (!server.isOpen()) | |
{ | |
std::cerr << "SERVER COULD NOT OPEN" << std::endl; | |
return 2; | |
} | |
for (int i = 0; i < 15; ++i) | |
{ | |
std::stringstream ss; | |
ss << "Message #" << i; | |
server.send(ss.str()); | |
sleep(3); | |
} | |
std::cout << "OK" << std::endl; | |
return 0; | |
} | |
int doClient() | |
{ | |
const uint16_t bufferLen = MqServer::kDefaultMessageSize; | |
char buffer[bufferLen]; | |
MqClient client(MQ_NAME); | |
if (!client.isOpen()) | |
{ | |
std::cerr << "CLIENT FAILED TO OPEN!" << std::endl; | |
return 2; | |
} | |
while (true) | |
{ | |
uint32_t msgPrio = -1; | |
ssize_t size = client.recv(buffer, bufferLen, &msgPrio); | |
} | |
return 0; | |
} | |
int main(int argc, char** argv) | |
{ | |
char mode = 'x'; | |
if (argc >= 2) | |
{ | |
mode = argv[1][0]; | |
} | |
switch (mode) | |
{ | |
case 's': | |
return doServer(); | |
case 'c': | |
return doClient(); | |
default: | |
printSyntax(argv[0]); | |
return 1; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "mq.h" | |
#include <cstdio> | |
//============================================================================= | |
// SERVER | |
//============================================================================= | |
// Creates (O_CREAT) and opens the named queue write-only.
//
// mqName:         queue name handed to mq_open() and stored in MqEntity.
// permissions:    mode argument passed to mq_open().
// maxMessages:    requested mq_maxmsg attribute.
// maxMessageSize: requested mq_msgsize attribute.
//
// On failure the error is reported with perror() and the object stays in the
// "not open" state; on success isOpen() becomes true.
MqServer::MqServer(
    const std::string& mqName,
    int permissions,
    uint16_t maxMessages,
    uint16_t maxMessageSize
) : MqEntity(mqName)
{
    struct mq_attr queueAttrs;
    queueAttrs.mq_maxmsg = maxMessages;
    queueAttrs.mq_msgsize = maxMessageSize;
    queueAttrs.mq_flags = 0;    // ignored for mq_open()
    queueAttrs.mq_curmsgs = 0;  // ignored for mq_open()

    myQueueDescriptor = mq_open(mqName.c_str(), O_WRONLY | O_CREAT, permissions, &queueAttrs);

    if (myQueueDescriptor != -1)
    {
        std::cout << "[OPEN] " << getName()
                  << " (maxMsg=" << maxMessages
                  << ", maxMsgSize=" << maxMessageSize << ")"
                  << std::endl;
        myIsOpen = true;
        return;
    }

    perror("mq_open()");
}
// Closes the descriptor and removes the queue name, but only if the
// constructor managed to open the queue. Failures of either step are reported
// with perror() and do not stop the remaining teardown.
MqServer::~MqServer()
{
    if (!isOpen())
    {
        return;
    }

    if (mq_close(myQueueDescriptor) == -1)
    {
        perror("mq_close()");
    }

    if (mq_unlink(getName().c_str()) == -1)
    {
        perror("mq_unlink()");
    }

    std::cout << "[DOWN] " << getName() << std::endl;
}
int MqServer::send(const std::string& msg, uint32_t msgPriority) | |
{ | |
int ret = mq_send(myQueueDescriptor, msg.c_str(), msg.size(), msgPriority); | |
if (ret == -1) | |
{ | |
perror("mq_send()"); | |
} | |
else | |
{ | |
std::cout << "[" << getName() << "] <-- " << msg << std::endl; | |
} | |
return ret; | |
} | |
//============================================================================= | |
// CLIENT | |
//============================================================================= | |
// Opens an already existing queue read-only (no O_CREAT is passed).
//
// On failure the error is reported with perror() and isOpen() stays false;
// on success isOpen() becomes true.
MqClient::MqClient(const std::string& mqName) : MqEntity(mqName)
{
    myQueueDescriptor = mq_open(mqName.c_str(), O_RDONLY);

    if (myQueueDescriptor != -1)
    {
        std::cout << "[OPEN] " << getName() << " (client)" << std::endl;
        myIsOpen = true;
        return;
    }

    perror("mq_open()");
}
// Closes the descriptor if the constructor managed to open the queue.
// Unlike the server side, the queue name itself is not unlinked here.
MqClient::~MqClient()
{
    if (!isOpen())
    {
        return;
    }

    if (mq_close(myQueueDescriptor) == -1)
    {
        perror("mq_close()");
    }

    std::cout << "[DOWN] " << getName() << std::endl;
}
ssize_t MqClient::recv(char* msgPtr, size_t msgLen, uint32_t* outMsgPrio) | |
{ | |
ssize_t bytesRead = mq_receive(myQueueDescriptor, msgPtr, msgLen, outMsgPrio); | |
if (bytesRead == -1) | |
{ | |
perror("mq_receive()"); | |
} | |
else | |
{ | |
msgPtr[bytesRead] = '\0'; | |
std::cout << "[" << getName() << "] --> " << msgPtr << std::endl; | |
} | |
return bytesRead; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <iostream> | |
#include <string> | |
#include <fcntl.h> | |
#include <sys/stat.h> | |
#include <mqueue.h> | |
// Common base for both ends of a message queue: stores the queue name, the
// queue descriptor and an "is open" flag. Derived classes open the queue in
// their constructors and release it in their destructors.
class MqEntity
{
public:
    // Stores the queue name; the entity starts closed, with the descriptor
    // set to the invalid value (mqd_t)-1 until a derived class opens it.
    MqEntity(const std::string& mqName)
        : myQueueDescriptor(static_cast<mqd_t>(-1)),
          myIsOpen(false),
          myQueueName(mqName)
    {}

    // Virtual so that destroying a derived object through an MqEntity pointer
    // runs the derived destructor that releases the queue.
    virtual ~MqEntity() {}

    // Name the queue was constructed with.
    inline const std::string& getName() const { return myQueueName; }

    // True once a derived class has successfully opened the queue.
    inline bool isOpen() const { return myIsOpen; }

protected:
    mqd_t myQueueDescriptor;  // set by the derived class' mq_open() call
    bool myIsOpen;            // set to true by the derived class on success

private:
    // Not copyable: a copy would share the descriptor and the derived
    // destructors would then close (and unlink) the same queue twice.
    // Declared but intentionally left undefined.
    MqEntity(const MqEntity&);
    MqEntity& operator=(const MqEntity&);

    std::string myQueueName;
};
class MqServer: public MqEntity | |
{ | |
public: | |
static const int kDefaultPermissions = 0600; | |
static const uint16_t kDefaultMaxMessages = 5; | |
static const uint16_t kDefaultMessageSize = 256; | |
static const uint32_t kDefaultMessagePriority = 3; | |
MqServer( | |
const std::string& mqName, | |
int permissions = kDefaultPermissions, | |
uint16_t maxMessages = kDefaultMaxMessages, | |
uint16_t maxMessageSize = kDefaultMessageSize); | |
~MqServer(); | |
int send(const std::string& msg, uint32_t msgPriority=kDefaultMessagePriority); | |
}; | |
class MqClient: public MqEntity | |
{ | |
public: | |
MqClient(const std::string& mqName); | |
~MqClient(); | |
ssize_t recv(char* msgPtr, size_t msgLen, uint32_t* outMsgPrio = NULL); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment