Created
March 25, 2017 17:43
-
-
Save Gelio/d0162c23d9cc4219dd7fc34d9b383f27 to your computer and use it in GitHub Desktop.
Peer-to-peer communicator written in C for the Operating Systems class. Utilizes POSIX message queues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <mqueue.h> | |
#include <unistd.h> | |
#include <signal.h> | |
#include <string.h> | |
#include <errno.h> | |
#define MAX_QUEUE_NAME_LENGTH 100 | |
#define MAX_MESSAGES 10 | |
#define MESSAGE_LENGTH 100 | |
#define MAX_PEERS 5 | |
#define MAX_TTL 5 | |
#define PRIORITY_QUIT 3 | |
#define PRIORITY_MESSAGE 2 | |
#define PRIORITY_REGISTER 1 | |
#define ERR(source) ( fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), \ | |
perror(source), exit(EXIT_FAILURE) ) | |
struct peer_struct { | |
pid_t pid; | |
mqd_t queueDes; | |
}; | |
typedef struct peer_struct peer_t; | |
void usage(char *fileName) | |
{ | |
fprintf(stderr, "Usage: %s [pid]\n", fileName); | |
exit(EXIT_FAILURE); | |
} | |
void getQueueName(char buffer[MAX_QUEUE_NAME_LENGTH], pid_t pid) | |
{ | |
if (snprintf(buffer, MAX_QUEUE_NAME_LENGTH, "/%d_queue", pid) < 0) | |
ERR("snprintf"); | |
} | |
void addPeerToList(peer_t peers[MAX_PEERS], int *peerCount, pid_t pid) | |
{ | |
if (*peerCount >= MAX_PEERS) | |
return; | |
char queueName[MAX_QUEUE_NAME_LENGTH]; | |
getQueueName(queueName, pid); | |
peers[*peerCount].pid = pid; | |
peers[*peerCount].queueDes = mq_open(queueName, O_WRONLY); | |
if (peers[*peerCount].queueDes < 0) | |
ERR("mq_open"); | |
*peerCount = *peerCount + 1; | |
} | |
void disconnectFromPeers(peer_t *peers, int *peerCount) | |
{ | |
char message[MESSAGE_LENGTH]; | |
*(pid_t*)message = getpid(); | |
for (int i=0; i < *peerCount; i++) | |
{ | |
if (mq_send(peers[i].queueDes, message, MESSAGE_LENGTH, PRIORITY_QUIT) < 0) | |
ERR("mq_send"); | |
if (mq_close(peers[i].queueDes) < 0) | |
ERR("mq_close"); | |
} | |
*peerCount = 0; | |
} | |
void sendGreetMessage(peer_t peer) | |
{ | |
char message[MESSAGE_LENGTH]; | |
*(pid_t*)message = getpid(); | |
if (mq_send(peer.queueDes, message, MESSAGE_LENGTH, PRIORITY_REGISTER) < 0) | |
ERR("mq_send"); | |
} | |
void printPeerList(peer_t *peers, int peerCount) | |
{ | |
printf("%d peers:\n", peerCount); | |
for (int i=0; i < peerCount; i++) | |
printf(" pid = %d\n", peers[i].pid); | |
} | |
void setHandler(int signal, void (*handler)(int)) | |
{ | |
struct sigaction action; | |
memset(&action, 0, sizeof(struct sigaction)); | |
action.sa_handler = handler; | |
if (sigaction(signal, &action, NULL) < 0) | |
ERR("sigaction"); | |
} | |
volatile sig_atomic_t lastSignal = 0; | |
void handleSignal(int signal) | |
{ | |
lastSignal = signal; | |
} | |
peer_t peers[MAX_PEERS]; | |
int peerCount = 0; | |
char ownQueueName[MAX_QUEUE_NAME_LENGTH]; | |
mqd_t ownQueue; | |
void disconnectAndQuit() | |
{ | |
printf("Disconnecting from peers\n"); | |
disconnectFromPeers(peers, &peerCount); | |
if (mq_close(ownQueue) < 0) | |
ERR("mq_close"); | |
if (mq_unlink(ownQueueName) < 0) | |
ERR("mq_unlink"); | |
exit(EXIT_SUCCESS); | |
} | |
void sendMessage(char message[MESSAGE_LENGTH], pid_t receiverPid) | |
{ | |
for (int i=0; i < peerCount; i++) | |
{ | |
if (peers[i].pid == receiverPid) | |
{ | |
if (mq_send(peers[i].queueDes, message, MESSAGE_LENGTH, PRIORITY_MESSAGE) < 0) | |
ERR("mq_send"); | |
return; | |
} | |
} | |
printf("peer not found"); | |
int *messageTTL = (int*)(message + 2 * sizeof(pid_t)); | |
if (*messageTTL <= 0) | |
{ | |
printf(", but ttl is 0 and cannot resend\n"); | |
return; | |
} | |
printf(", decrementing ttl and resending to all peers\n"); | |
*messageTTL = *messageTTL - 1; | |
for (int i=0; i < peerCount; i++) | |
{ | |
if (mq_send(peers[i].queueDes, message, MESSAGE_LENGTH, PRIORITY_MESSAGE) < 0) | |
ERR("mq_send"); | |
} | |
} | |
void registerNotifications(mqd_t queueDes); | |
void handleMessages() | |
{ | |
registerNotifications(ownQueue); | |
while (1) | |
{ | |
char message[MESSAGE_LENGTH]; | |
unsigned int messagePriority; | |
if (mq_receive(ownQueue, message, MESSAGE_LENGTH, &messagePriority) < 0) | |
{ | |
if (errno == EINTR) | |
{ | |
// check signals | |
if (lastSignal == SIGINT) | |
disconnectAndQuit(); | |
} | |
else if (errno == EAGAIN) | |
break; | |
else | |
ERR("mq_receive"); | |
} | |
pid_t senderPid = *(pid_t*)message; | |
printf("Received message from %d with priority %d\n", senderPid, messagePriority); | |
switch (messagePriority) | |
{ | |
case PRIORITY_REGISTER: | |
// rejestracja | |
printf("Received greeting from %d\n", senderPid); | |
addPeerToList(peers, &peerCount, senderPid); | |
printPeerList(peers, peerCount); | |
break; | |
case PRIORITY_MESSAGE: | |
{ | |
// wiadomość | |
pid_t receiverPid = *(pid_t*)(message + sizeof(pid_t)); | |
if (receiverPid == getpid()) | |
{ | |
char *messageContents = message + 2 * sizeof(pid_t); | |
printf("Message from %d: %s\n", senderPid, messageContents); | |
} | |
else | |
{ | |
printf("Got a message from %d not to me, resending\n", senderPid); | |
sendMessage(message, receiverPid); | |
} | |
break; | |
} | |
case PRIORITY_QUIT: | |
// zakończenie działania | |
disconnectAndQuit(); | |
break; | |
default: | |
{ | |
// nieznana wiadomość | |
char *messageContents = message + 2 * sizeof(pid_t); | |
printf("Unknown message received from %d: %s\n", senderPid, messageContents); | |
break; | |
} | |
} | |
} | |
} | |
void registerNotifications(mqd_t queueDes) | |
{ | |
struct sigevent notification; | |
memset(¬ification, 0, sizeof(struct sigevent)); | |
notification.sigev_notify = SIGEV_THREAD; | |
notification.sigev_notify_function = handleMessages; | |
if (mq_notify(queueDes, ¬ification) < 0) | |
ERR("mq_notify"); | |
} | |
int main(int argc, char **argv) | |
{ | |
if (argc > 2) | |
usage(argv[0]); | |
setHandler(SIGINT, handleSignal); | |
if (argc == 2) | |
{ | |
pid_t peerPid = atoi(argv[1]); | |
if (peerPid <= 0) | |
usage(argv[0]); | |
addPeerToList(peers, &peerCount, peerPid); | |
sendGreetMessage(peers[0]); | |
} | |
getQueueName(ownQueueName, getpid()); | |
struct mq_attr queueAttributes; | |
queueAttributes.mq_maxmsg = MAX_MESSAGES; | |
queueAttributes.mq_msgsize = MESSAGE_LENGTH; | |
ownQueue = mq_open(ownQueueName, O_RDONLY | O_CREAT | O_NONBLOCK, 0600, &queueAttributes); | |
if (ownQueue < 0) | |
ERR("mq_open"); | |
registerNotifications(ownQueue); | |
printf("Peer number = %d\n", getpid()); | |
if (peerCount == 0) | |
printf("waiting for connections\n"); | |
else | |
printPeerList(peers, peerCount); | |
while (1) | |
{ | |
pid_t receiverPid; | |
char messageFromInput[MESSAGE_LENGTH]; | |
if (fscanf(stdin, "%d %s", &receiverPid, messageFromInput) < 0) | |
{ | |
if (errno == EINTR) | |
{ | |
if (lastSignal == SIGINT) | |
disconnectAndQuit(); | |
} | |
ERR("fscanf"); | |
} | |
char messageToSend[MESSAGE_LENGTH]; | |
*(pid_t*)messageToSend = getpid(); | |
*(pid_t*)(messageToSend + sizeof(pid_t)) = receiverPid; | |
*(int*)(messageToSend + 2 * sizeof(pid_t)) = MAX_TTL; | |
strncpy(messageToSend + 2 * sizeof(pid_t) + sizeof(int), messageFromInput, MESSAGE_LENGTH - 2 * sizeof(pid_t) - sizeof(int)); | |
sendMessage(messageToSend, receiverPid); | |
printf("Message sent\n"); | |
} | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment