Skip to content

Instantly share code, notes, and snippets.

@icedraco
Created November 16, 2021 23:33
Show Gist options
  • Save icedraco/965e2f94ae28cc8cadfe0fef586ebacd to your computer and use it in GitHub Desktop.
Save icedraco/965e2f94ae28cc8cadfe0fef586ebacd to your computer and use it in GitHub Desktop.
POSIX Message Queues Example
#include <string>
#include <sstream>
#include <unistd.h>
#include "mq.h"
#define MQ_NAME "/ice.test"
// Writes the one-line usage banner to stderr.
//
// exe - program name to show in the banner (main() passes argv[0]).
void printSyntax(const char* exe)
{
    std::cerr << "Syntax: ";
    std::cerr << exe;
    std::cerr << " {server|client}";
    std::cerr << std::endl;
}
int doServer()
{
MqServer server(MQ_NAME);
if (!server.isOpen())
{
std::cerr << "SERVER COULD NOT OPEN" << std::endl;
return 2;
}
for (int i = 0; i < 15; ++i)
{
std::stringstream ss;
ss << "Message #" << i;
server.send(ss.str());
sleep(3);
}
std::cout << "OK" << std::endl;
return 0;
}
// Consumer side of the demo: opens the queue as an MqClient and keeps
// receiving messages (MqClient::recv() echoes each one to stdout).
//
// Returns 2 when the queue could not be opened, or 3 as soon as a receive
// fails; recv() has already reported the reason through perror() by then.
int doClient()
{
    const uint16_t bufferLen = MqServer::kDefaultMessageSize;

    // One spare byte beyond bufferLen: MqClient::recv() writes a NUL at
    // buffer[bytesRead], and bytesRead may be as large as bufferLen.
    char buffer[bufferLen + 1];

    MqClient client(MQ_NAME);
    if (!client.isOpen())
    {
        std::cerr << "CLIENT FAILED TO OPEN!" << std::endl;
        return 2;
    }

    ssize_t received = 0;
    do
    {
        uint32_t msgPrio = 0;
        received = client.recv(buffer, bufferLen, &msgPrio);
    } while (received != -1);

    // Stop instead of spinning forever on a failure that will not go away.
    return 3;
}
// Entry point. The first character of the first command-line argument picks
// the role: 's' runs the server, 'c' runs the client. Anything else (or no
// argument at all) prints the usage banner and exits with status 1.
int main(int argc, char** argv)
{
    const char mode = (argc >= 2) ? argv[1][0] : 'x';

    if (mode == 's')
    {
        return doServer();
    }
    if (mode == 'c')
    {
        return doClient();
    }

    printSyntax(argv[0]);
    return 1;
}
#include "mq.h"
#include <cstdio>
//=============================================================================
// SERVER
//=============================================================================
// Opens the named queue write-only, asking for it to be created with the
// given permissions and limits. On success the entity is flagged as open and
// an "[OPEN]" line is printed; on failure the error goes to perror() and the
// entity stays closed.
MqServer::MqServer(
    const std::string& mqName,
    int permissions,
    uint16_t maxMessages,
    uint16_t maxMessageSize
) : MqEntity(mqName)
{
    struct mq_attr queueAttr;
    queueAttr.mq_maxmsg = maxMessages;
    queueAttr.mq_msgsize = maxMessageSize;
    queueAttr.mq_flags = 0;   // ignored for mq_open()
    queueAttr.mq_curmsgs = 0; // ignored for mq_open()

    const int openFlags = O_WRONLY | O_CREAT;
    myQueueDescriptor = mq_open(mqName.c_str(), openFlags, permissions, &queueAttr);

    if (myQueueDescriptor != -1)
    {
        std::cout << "[OPEN] " << getName()
                  << " (maxMsg=" << maxMessages
                  << ", maxMsgSize=" << maxMessageSize << ")"
                  << std::endl;
        myIsOpen = true;
        return;
    }

    perror("mq_open()");
}
// Tears down an open server: closes the descriptor, unlinks the queue name,
// and prints a "[DOWN]" line. Failures of either call are reported through
// perror() and do not stop the remaining steps. Does nothing if the queue
// was never opened.
MqServer::~MqServer()
{
    if (!isOpen())
    {
        return;
    }

    if (mq_close(myQueueDescriptor) == -1)
    {
        perror("mq_close()");
    }

    if (mq_unlink(getName().c_str()) == -1)
    {
        perror("mq_unlink()");
    }

    std::cout << "[DOWN] " << getName() << std::endl;
}
// Posts msg (its msg.size() bytes, without a trailing NUL) to the queue with
// the given priority and logs it to stdout on success.
//
// Returns the mq_send() result: -1 on failure (after reporting via perror()).
int MqServer::send(const std::string& msg, uint32_t msgPriority)
{
    const int status = mq_send(myQueueDescriptor, msg.c_str(), msg.size(), msgPriority);

    if (status != -1)
    {
        std::cout << "[" << getName() << "] <-- " << msg << std::endl;
    }
    else
    {
        perror("mq_send()");
    }

    return status;
}
//=============================================================================
// CLIENT
//=============================================================================
// Opens the named queue read-only. On success the entity is flagged as open
// and an "[OPEN]" line is printed; on failure the error goes to perror() and
// the entity stays closed.
MqClient::MqClient(const std::string& mqName) : MqEntity(mqName)
{
    myQueueDescriptor = mq_open(mqName.c_str(), O_RDONLY);

    if (myQueueDescriptor != -1)
    {
        std::cout << "[OPEN] " << getName() << " (client)" << std::endl;
        myIsOpen = true;
        return;
    }

    perror("mq_open()");
}
// Closes the descriptor of an open client and prints a "[DOWN]" line. A
// failing mq_close() is reported through perror(). Does nothing if the queue
// was never opened.
MqClient::~MqClient()
{
    if (!isOpen())
    {
        return;
    }

    if (mq_close(myQueueDescriptor) == -1)
    {
        perror("mq_close()");
    }

    std::cout << "[DOWN] " << getName() << std::endl;
}
// Receives one message from the queue into msgPtr and echoes it to stdout.
//
// msgPtr     - destination buffer handed to mq_receive().
// msgLen     - buffer length handed to mq_receive().
// outMsgPrio - forwarded to mq_receive() to receive the message priority.
//
// Returns the mq_receive() result: the number of bytes received, or -1 on
// failure (after reporting via perror()).
//
// The payload is NUL-terminated in place only when it is shorter than msgLen.
// A message that fills the buffer completely is left unterminated rather than
// writing one byte past the end, so callers that need a C string for
// full-size messages should use the returned length.
ssize_t MqClient::recv(char* msgPtr, size_t msgLen, uint32_t* outMsgPrio)
{
    const ssize_t bytesRead = mq_receive(myQueueDescriptor, msgPtr, msgLen, outMsgPrio);
    if (bytesRead == -1)
    {
        perror("mq_receive()");
        return bytesRead;
    }

    if (static_cast<size_t>(bytesRead) < msgLen)
    {
        msgPtr[bytesRead] = '\0';
    }

    // Print exactly the received bytes; this does not rely on a terminator.
    std::cout << "[" << getName() << "] --> ";
    std::cout.write(msgPtr, bytesRead);
    std::cout << std::endl;

    return bytesRead;
}
#pragma once
#include <iostream>
#include <string>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
// Common state for one end of a message queue: the queue name, the queue
// descriptor, and a flag telling whether the descriptor was opened
// successfully. Derived classes are expected to fill in the descriptor and
// the flag.
//
// Objects are deliberately not copyable: the derived destructors close the
// descriptor, so a copy would make the same descriptor get closed twice.
class MqEntity
{
public:
    // Starts in the "not open" state with an invalid (-1) descriptor.
    MqEntity(const std::string& mqName)
        : myQueueDescriptor(static_cast<mqd_t>(-1)),
          myIsOpen(false),
          myQueueName(mqName)
    {}

    ~MqEntity() {}

    // Name the entity was constructed with.
    inline const std::string& getName() const { return myQueueName; }

    // True once a derived class has flagged the descriptor as open.
    inline bool isOpen() const { return myIsOpen; }

protected:
    mqd_t myQueueDescriptor;
    bool myIsOpen;

private:
    // Declared but never defined, so any attempt to copy fails to build.
    MqEntity(const MqEntity&);
    MqEntity& operator=(const MqEntity&);

    std::string myQueueName;
};
// Sending end of a message queue. The constructor opens (creating if needed)
// the queue write-only; the destructor closes it and unlinks its name.
class MqServer : public MqEntity
{
public:
    // Defaults used when the caller does not override them.
    static const int      kDefaultPermissions     = 0600;
    static const uint16_t kDefaultMaxMessages     = 5;
    static const uint16_t kDefaultMessageSize     = 256;
    static const uint32_t kDefaultMessagePriority = 3;

    // Opens the queue; check isOpen() afterwards to see whether it worked.
    MqServer(const std::string& mqName,
             int permissions = kDefaultPermissions,
             uint16_t maxMessages = kDefaultMaxMessages,
             uint16_t maxMessageSize = kDefaultMessageSize);

    ~MqServer();

    // Sends msg with the given priority; returns the mq_send() result.
    int send(const std::string& msg,
             uint32_t msgPriority = kDefaultMessagePriority);
};
// Receiving end of a message queue. The constructor opens the queue
// read-only; the destructor closes it.
class MqClient : public MqEntity
{
public:
    // Opens the queue; check isOpen() afterwards to see whether it worked.
    MqClient(const std::string& mqName);

    ~MqClient();

    // Receives one message into msgPtr (msgLen is the buffer length handed to
    // mq_receive()); returns the mq_receive() result. outMsgPrio may be
    // omitted.
    ssize_t recv(char* msgPtr,
                 size_t msgLen,
                 uint32_t* outMsgPrio = NULL);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment