Skip to content

Instantly share code, notes, and snippets.

@Gelio
Created March 25, 2017 17:43
Show Gist options
  • Save Gelio/d0162c23d9cc4219dd7fc34d9b383f27 to your computer and use it in GitHub Desktop.
Save Gelio/d0162c23d9cc4219dd7fc34d9b383f27 to your computer and use it in GitHub Desktop.
Peer-to-peer communicator written in C for the Operating Systems class. It uses POSIX message queues.
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
/* Size of every buffer that holds a message queue name (see getQueueName). */
#define MAX_QUEUE_NAME_LENGTH 100
/* Value used for mq_maxmsg when the process creates its own queue. */
#define MAX_MESSAGES 10
/* Size in bytes of every message buffer; also used as mq_msgsize and as the
 * length passed to mq_send/mq_receive. */
#define MESSAGE_LENGTH 100
/* Capacity of the peer list. */
#define MAX_PEERS 5
/* Initial time-to-live value written into messages typed by the user. */
#define MAX_TTL 5
/* Message queue priorities; the receiver switches on the priority to decide
 * how to interpret a message. */
#define PRIORITY_QUIT 3
#define PRIORITY_MESSAGE 2
#define PRIORITY_REGISTER 1
/* Prints the current file and line to stderr, reports errno via
 * perror(source) and terminates the process with EXIT_FAILURE.
 * Expands to a single comma expression. */
#define ERR(source) ( fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), \
perror(source), exit(EXIT_FAILURE) )
/*
 * One known peer: its process id and the descriptor of its message queue.
 * addPeerToList() fills both fields, opening the queue write-only.
 */
typedef struct peer_struct {
    pid_t pid;      /* process id of the peer */
    mqd_t queueDes; /* descriptor of the peer's message queue */
} peer_t;
/*
 * Prints the command-line synopsis to stderr and terminates the process
 * with EXIT_FAILURE. Never returns.
 *
 * fileName: program name shown in the synopsis (callers pass argv[0]).
 */
void usage(char *fileName)
{
    fprintf(stderr, "Usage: %s [pid]\n", fileName);

    exit(EXIT_FAILURE);
}
/*
 * Writes the message queue name for the given process into buffer, in the
 * form "/<pid>_queue".
 *
 * buffer: destination of at least MAX_QUEUE_NAME_LENGTH bytes.
 * pid:    process id the queue belongs to.
 *
 * Terminates the process through ERR if snprintf reports an error.
 */
void getQueueName(char buffer[MAX_QUEUE_NAME_LENGTH], pid_t pid)
{
    const int written = snprintf(buffer, MAX_QUEUE_NAME_LENGTH, "/%d_queue", pid);

    if (written < 0)
    {
        ERR("snprintf");
    }
}
/*
 * Opens the message queue of process `pid` for writing and appends the peer
 * to the list.
 *
 * peers:     peer array with room for MAX_PEERS entries.
 * peerCount: in/out number of valid entries; incremented on success.
 * pid:       process id of the peer to add.
 *
 * If the list is already full the peer is not added and a warning is written
 * to stderr, so the dropped registration is visible to the user.
 * Terminates the process through ERR if the peer's queue cannot be opened.
 */
void addPeerToList(peer_t peers[MAX_PEERS], int *peerCount, pid_t pid)
{
    if (*peerCount >= MAX_PEERS)
    {
        fprintf(stderr, "Peer list is full (%d peers), ignoring peer %d\n",
                MAX_PEERS, (int)pid);
        return;
    }

    char queueName[MAX_QUEUE_NAME_LENGTH];
    getQueueName(queueName, pid);

    /* mq_open reports failure as (mqd_t)-1; mqd_t need not be a signed
     * integer, so compare against that value instead of using "< 0". */
    mqd_t queueDes = mq_open(queueName, O_WRONLY);
    if (queueDes == (mqd_t)-1)
        ERR("mq_open");

    /* Only touch the list once the queue is known to be usable. */
    peers[*peerCount].pid = pid;
    peers[*peerCount].queueDes = queueDes;
    *peerCount = *peerCount + 1;
}
/*
 * Sends a PRIORITY_QUIT message carrying this process's pid to every peer,
 * closes each peer's queue descriptor and empties the list.
 *
 * peers:     array of known peers.
 * peerCount: in/out number of valid entries; set to 0 on return.
 *
 * Terminates the process through ERR if sending or closing fails.
 */
void disconnectFromPeers(peer_t *peers, int *peerCount)
{
    /* Zero the whole payload: all MESSAGE_LENGTH bytes are sent, so leaving
     * the tail uninitialised would hand stack contents to other processes. */
    char message[MESSAGE_LENGTH] = {0};

    /* Copy the pid in with memcpy; storing through a pid_t* cast of a char
     * buffer breaks the aliasing rules and may be misaligned. */
    const pid_t ownPid = getpid();
    memcpy(message, &ownPid, sizeof ownPid);

    for (int i = 0; i < *peerCount; i++)
    {
        if (mq_send(peers[i].queueDes, message, MESSAGE_LENGTH, PRIORITY_QUIT) < 0)
            ERR("mq_send");
        if (mq_close(peers[i].queueDes) < 0)
            ERR("mq_close");
    }

    *peerCount = 0;
}
/*
 * Sends a PRIORITY_REGISTER message carrying this process's pid to the
 * given peer.
 *
 * peer: peer whose queue descriptor is already open for writing.
 *
 * Terminates the process through ERR if mq_send fails.
 */
void sendGreetMessage(peer_t peer)
{
    /* Zero the whole payload: all MESSAGE_LENGTH bytes are sent, so the
     * bytes after the pid must not be uninitialised stack memory. */
    char message[MESSAGE_LENGTH] = {0};

    /* memcpy avoids writing through a pid_t* cast of a char buffer, which
     * breaks the aliasing rules and may be misaligned. */
    const pid_t ownPid = getpid();
    memcpy(message, &ownPid, sizeof ownPid);

    if (mq_send(peer.queueDes, message, MESSAGE_LENGTH, PRIORITY_REGISTER) < 0)
        ERR("mq_send");
}
/*
 * Prints the number of known peers followed by one line per peer with its
 * pid, all to stdout.
 *
 * peers:     array of known peers.
 * peerCount: number of valid entries in peers.
 */
void printPeerList(peer_t *peers, int peerCount)
{
    printf("%d peers:\n", peerCount);

    int index = 0;
    while (index < peerCount)
    {
        printf(" pid = %d\n", peers[index].pid);
        index++;
    }
}
/*
 * Installs `handler` as the disposition of `signal` via sigaction.
 *
 * The sigaction structure is zero-filled before sa_handler is set, so no
 * flags are requested.
 *
 * Terminates the process through ERR if sigaction fails.
 */
void setHandler(int signal, void (*handler)(int))
{
    struct sigaction action;

    memset(&action, 0, sizeof action);
    action.sa_handler = handler;

    if (sigaction(signal, &action, NULL) < 0)
    {
        ERR("sigaction");
    }
}
/* Number of the most recently handled signal; 0 until a signal arrives. */
volatile sig_atomic_t lastSignal = 0;

/*
 * Signal handler: only records which signal was delivered. The rest of the
 * program inspects lastSignal after an interrupted call.
 */
void handleSignal(int signal)
{
    lastSignal = (sig_atomic_t)signal;
}
/*
 * Program-wide state.
 * NOTE(review): these are used both by main() and by handleMessages(), which
 * is registered as a SIGEV_THREAD notification function; no synchronisation
 * is visible in this file - confirm whether concurrent access is possible.
 */
/* Known peers; only the first peerCount entries are valid. */
peer_t peers[MAX_PEERS];
/* Number of valid entries in peers. */
int peerCount = 0;
/* Name of this process's own queue, filled in by main() via getQueueName. */
char ownQueueName[MAX_QUEUE_NAME_LENGTH];
/* Descriptor of this process's own queue, opened in main(). */
mqd_t ownQueue;
/*
 * Orderly shutdown: notifies and drops every peer, closes and unlinks this
 * process's own queue, then exits with EXIT_SUCCESS. Never returns.
 *
 * Terminates through ERR (failure status) if closing or unlinking the own
 * queue fails.
 */
void disconnectAndQuit()
{
    printf("Disconnecting from peers\n");
    disconnectFromPeers(peers, &peerCount);

    /* Release the descriptor first, then remove the queue name. */
    if (mq_close(ownQueue) < 0)
    {
        ERR("mq_close");
    }
    if (mq_unlink(ownQueueName) < 0)
    {
        ERR("mq_unlink");
    }

    exit(EXIT_SUCCESS);
}
/*
 * Delivers a PRIORITY_MESSAGE message towards receiverPid.
 *
 * message:     MESSAGE_LENGTH-byte buffer laid out as sender pid, receiver
 *              pid, int TTL, text. The TTL field may be decremented in place.
 * receiverPid: pid of the intended recipient.
 *
 * If receiverPid is a known peer, the message goes to that peer only.
 * Otherwise, while the TTL is still positive, it is decremented and the
 * message is sent to every known peer; with a TTL of 0 or less the message
 * is dropped.
 *
 * Terminates the process through ERR if mq_send fails.
 */
void sendMessage(char message[MESSAGE_LENGTH], pid_t receiverPid)
{
    for (int i = 0; i < peerCount; i++)
    {
        if (peers[i].pid != receiverPid)
            continue;

        if (mq_send(peers[i].queueDes, message, MESSAGE_LENGTH, PRIORITY_MESSAGE) < 0)
            ERR("mq_send");
        return;
    }

    printf("peer not found");

    /* Read and write the TTL with memcpy: the field sits at a byte offset in
     * a char buffer, so accessing it through an int* cast would break the
     * aliasing rules and may be misaligned. */
    char *ttlField = message + 2 * sizeof(pid_t);
    int messageTTL;
    memcpy(&messageTTL, ttlField, sizeof messageTTL);

    if (messageTTL <= 0)
    {
        printf(", but ttl is 0 and cannot resend\n");
        return;
    }

    printf(", decrementing ttl and resending to all peers\n");
    messageTTL = messageTTL - 1;
    memcpy(ttlField, &messageTTL, sizeof messageTTL);

    for (int i = 0; i < peerCount; i++)
    {
        if (mq_send(peers[i].queueDes, message, MESSAGE_LENGTH, PRIORITY_MESSAGE) < 0)
            ERR("mq_send");
    }
}
void registerNotifications(mqd_t queueDes);

/*
 * Notification callback for the process's own queue (installed by
 * registerNotifications as the SIGEV_THREAD function).
 *
 * Re-registers the notification, then receives messages from ownQueue until
 * mq_receive reports EAGAIN (queue empty), dispatching on the priority:
 *   PRIORITY_REGISTER - add the sender to the peer list and print the list;
 *   PRIORITY_MESSAGE  - print the text if addressed to this process,
 *                       otherwise pass it on through sendMessage;
 *   PRIORITY_QUIT     - disconnect from all peers and exit;
 *   anything else     - print it as an unknown message.
 *
 * notificationValue: value supplied with the notification; unused. The
 *                    parameter exists because sigev_notify_function has the
 *                    type void (*)(union sigval).
 *
 * Terminates the process through ERR on an unexpected mq_receive error.
 */
void handleMessages(union sigval notificationValue)
{
    (void)notificationValue;

    /* Message layout written by main(): sender pid, receiver pid, int TTL,
     * then the text. */
    const size_t receiverOffset = sizeof(pid_t);
    const size_t textOffset = 2 * sizeof(pid_t) + sizeof(int);
    /* Upper bound for printing the text, in case a peer sent it without a
     * terminating NUL. */
    const int maxTextLength = (int)(MESSAGE_LENGTH - textOffset);

    registerNotifications(ownQueue);
    while (1)
    {
        char message[MESSAGE_LENGTH];
        unsigned int messagePriority;

        /* Start from zeros so a message shorter than MESSAGE_LENGTH never
         * exposes bytes left over from the previous iteration. */
        memset(message, 0, sizeof message);

        if (mq_receive(ownQueue, message, MESSAGE_LENGTH, &messagePriority) < 0)
        {
            if (errno == EAGAIN)
                break;
            if (errno != EINTR)
                ERR("mq_receive");
            // interrupted by a signal: quit on SIGINT, otherwise retry
            if (lastSignal == SIGINT)
                disconnectAndQuit();
            // nothing was received, so there is nothing to dispatch
            continue;
        }

        pid_t senderPid;
        memcpy(&senderPid, message, sizeof senderPid);
        printf("Received message from %d with priority %u\n", (int)senderPid, messagePriority);

        switch (messagePriority)
        {
        case PRIORITY_REGISTER:
            // registration
            printf("Received greeting from %d\n", (int)senderPid);
            addPeerToList(peers, &peerCount, senderPid);
            printPeerList(peers, peerCount);
            break;
        case PRIORITY_MESSAGE:
        {
            // text message
            pid_t receiverPid;
            memcpy(&receiverPid, message + receiverOffset, sizeof receiverPid);
            if (receiverPid == getpid())
            {
                // the text starts after the TTL field, not directly after the pids
                const char *messageContents = message + textOffset;
                printf("Message from %d: %.*s\n", (int)senderPid, maxTextLength, messageContents);
            }
            else
            {
                printf("Got a message from %d not to me, resending\n", (int)senderPid);
                sendMessage(message, receiverPid);
            }
            break;
        }
        case PRIORITY_QUIT:
            // a peer is shutting down
            // NOTE(review): this makes the process quit as soon as any peer
            // quits, and disconnectAndQuit notifies all peers in turn -
            // confirm the cascading shutdown is intended.
            disconnectAndQuit();
            break;
        default:
        {
            // unknown message type
            const char *messageContents = message + textOffset;
            printf("Unknown message received from %d: %.*s\n", (int)senderPid, maxTextLength, messageContents);
            break;
        }
        }
    }
}
/*
 * Asks for handleMessages to be invoked, using SIGEV_THREAD delivery, when
 * a message notification fires for the given queue.
 *
 * queueDes: descriptor of the queue to watch.
 *
 * Every sigevent field other than the notify method and function is left
 * zeroed. Terminates the process through ERR if mq_notify fails.
 */
void registerNotifications(mqd_t queueDes)
{
    struct sigevent event;

    memset(&event, 0, sizeof event);
    event.sigev_notify_function = handleMessages;
    event.sigev_notify = SIGEV_THREAD;

    if (mq_notify(queueDes, &event) < 0)
    {
        ERR("mq_notify");
    }
}
/* Results of readCommand. */
enum { COMMAND_OK, COMMAND_INVALID, COMMAND_END };

/*
 * Reads one line from stdin and parses it as "<pid> <word>".
 *
 * receiverPid: receives the parsed pid (positive) on COMMAND_OK.
 * text:        receives the first word after the pid, NUL-terminated and
 *              truncated to textSize - 1 characters, on COMMAND_OK.
 * textSize:    size of text in bytes; must be at least 1.
 *
 * Returns COMMAND_OK on success, COMMAND_INVALID if the line does not have
 * the expected form, and COMMAND_END if no line could be read (end of input
 * or read error; errno is left as set by fgets).
 */
static int readCommand(pid_t *receiverPid, char *text, size_t textSize)
{
    char line[2 * MESSAGE_LENGTH];

    if (fgets(line, sizeof line, stdin) == NULL)
        return COMMAND_END;

    /* Over-long line: discard its tail so it is not parsed as a new command. */
    if (strchr(line, '\n') == NULL)
    {
        int c;
        while ((c = getc(stdin)) != EOF && c != '\n')
            ;
    }

    char *cursor = NULL;
    errno = 0;
    long parsedPid = strtol(line, &cursor, 10);
    if (cursor == line || errno == ERANGE || parsedPid <= 0 ||
        (long)(pid_t)parsedPid != parsedPid)
        return COMMAND_INVALID;

    cursor += strspn(cursor, " \t\r\n");
    size_t wordLength = strcspn(cursor, " \t\r\n");
    if (wordLength == 0)
        return COMMAND_INVALID;
    if (wordLength >= textSize)
        wordLength = textSize - 1;

    memcpy(text, cursor, wordLength);
    text[wordLength] = '\0';
    *receiverPid = (pid_t)parsedPid;
    return COMMAND_OK;
}

/*
 * Entry point. Usage: program [pid]
 *
 * With a pid argument the process connects to that peer and greets it;
 * without one it waits for other peers to connect. It then reads
 * "<pid> <word>" lines from stdin and sends each word to the given pid
 * through sendMessage. SIGINT or end of input triggers an orderly shutdown
 * through disconnectAndQuit.
 */
int main(int argc, char **argv)
{
    if (argc > 2)
        usage(argv[0]);

    /* Validate the optional peer pid before creating any resources. */
    pid_t peerPid = 0;
    if (argc == 2)
    {
        char *end = NULL;
        errno = 0;
        long parsed = strtol(argv[1], &end, 10);
        if (end == argv[1] || *end != '\0' || errno == ERANGE || parsed <= 0 ||
            (long)(pid_t)parsed != parsed)
            usage(argv[0]);
        peerPid = (pid_t)parsed;
    }

    setHandler(SIGINT, handleSignal);

    /* Open the peer's queue first: if the peer does not exist we exit here,
     * before our own queue has been created. */
    if (argc == 2)
        addPeerToList(peers, &peerCount, peerPid);

    getQueueName(ownQueueName, getpid());
    struct mq_attr queueAttributes;
    memset(&queueAttributes, 0, sizeof queueAttributes);
    queueAttributes.mq_maxmsg = MAX_MESSAGES;
    queueAttributes.mq_msgsize = MESSAGE_LENGTH;
    ownQueue = mq_open(ownQueueName, O_RDONLY | O_CREAT | O_NONBLOCK, 0600, &queueAttributes);
    if (ownQueue == (mqd_t)-1)
        ERR("mq_open");
    registerNotifications(ownQueue);

    /* Greet only now that our own queue exists: the greeted peer reacts by
     * opening our queue, which fails if the queue has not been created yet. */
    if (argc == 2)
        sendGreetMessage(peers[0]);

    printf("Peer number = %d\n", getpid());
    if (peerCount == 0)
        printf("waiting for connections\n");
    else
        printPeerList(peers, peerCount);

    /* Message layout: sender pid, receiver pid, int TTL, then the text. */
    const size_t textOffset = 2 * sizeof(pid_t) + sizeof(int);
    const size_t textCapacity = MESSAGE_LENGTH - textOffset;

    while (1)
    {
        pid_t receiverPid;
        char messageFromInput[MESSAGE_LENGTH];

        int status = readCommand(&receiverPid, messageFromInput, textCapacity);
        if (lastSignal == SIGINT)
            disconnectAndQuit();
        if (status == COMMAND_END)
        {
            /* End of input: shut down cleanly so the queue gets unlinked. */
            if (feof(stdin))
                disconnectAndQuit();
            /* Interrupted by a signal other than SIGINT: try again. */
            if (errno == EINTR)
            {
                clearerr(stdin);
                continue;
            }
            ERR("fgets");
        }
        if (status == COMMAND_INVALID)
        {
            fprintf(stderr, "Expected input: <pid> <message>\n");
            continue;
        }

        /* Build the message in a zeroed buffer with memcpy; readCommand keeps
         * the text shorter than textCapacity, so it stays NUL-terminated. */
        char messageToSend[MESSAGE_LENGTH];
        memset(messageToSend, 0, sizeof messageToSend);
        const pid_t ownPid = getpid();
        const int ttl = MAX_TTL;
        memcpy(messageToSend, &ownPid, sizeof ownPid);
        memcpy(messageToSend + sizeof(pid_t), &receiverPid, sizeof receiverPid);
        memcpy(messageToSend + 2 * sizeof(pid_t), &ttl, sizeof ttl);
        memcpy(messageToSend + textOffset, messageFromInput, strlen(messageFromInput));

        sendMessage(messageToSend, receiverPid);
        printf("Message sent\n");
    }
    return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment