Skip to content

Instantly share code, notes, and snippets.

@Gelio
Created February 18, 2017 22:43
Show Gist options
  • Save Gelio/711c0bc5a2a316565b6f9feffab80a50 to your computer and use it in GitHub Desktop.
Save Gelio/711c0bc5a2a316565b6f9feffab80a50 to your computer and use it in GitHub Desktop.
SOP2 L1 - inter-process communication using POSIX pipes, ring architecture with the main process creating subprocesses
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#define DELTA_MIN -10
#define DELTA_MAX 15
#define SUBPROCESS_COUNT 3
#define TOTAL_PROCESS_COUNT (SUBPROCESS_COUNT + 1)
#define INITIAL_VALUE 3
#define MAX_VALUE 999
#define ERR(source) ( fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), \
perror(source), kill(0, SIGKILL), \
exit(EXIT_FAILURE) )
void preparePipes(int **pipes) {
for (int i=0; i < TOTAL_PROCESS_COUNT; i++) {
pipes[i] = calloc(2, sizeof(int));
if (pipes[i] == NULL)
ERR("calloc");
if (pipe(pipes[i]) < 0)
ERR("pipe");
}
}
void freePipes(int **pipes) {
for (int i=0; i < TOTAL_PROCESS_COUNT; i++)
free(pipes[i]);
}
void closePipe(int fd) {
if (close(fd) < 0 && errno != EBADF)
ERR("close");
}
void closePipesExceptTwo(int **pipes, int fd1, int fd2) {
for (int i=0; i < TOTAL_PROCESS_COUNT; i++) {
if (pipes[i][0] != fd1 && pipes[i][0] != fd2)
closePipe(pipes[i][0]);
if (pipes[i][1] != fd1 && pipes[i][1] != fd2)
closePipe(pipes[i][1]);
}
}
void listenAndForward(int readFd, int writeFd, int *pipesToClose) {
pid_t pid = getpid();
srand(pid);
int number, messagesReceived = 0;
ssize_t count;
do {
count = read(readFd, (char*)&number, sizeof(int));
if (count < 0)
ERR("read");
if (count > 0) {
messagesReceived++;
if (number > MAX_VALUE) {
printf("[%d] received %d, greater than %d, closing pipes\n", pid, number, MAX_VALUE);
closePipe(pipesToClose[0]);
closePipe(pipesToClose[1]);
printf("[%d] received %d messages in total\n", getpid(), messagesReceived);
return;
}
int forwardedNumber = number + (DELTA_MIN + rand() % (DELTA_MAX - DELTA_MIN + 1));
printf("[%d] received %d, forwarding %d\n", pid, number, forwardedNumber);
ssize_t writeCount = write(writeFd, (char*)&forwardedNumber, sizeof(int));
if (writeCount < 0)
ERR("write");
}
} while (count > 0);
printf("[%d] received %d messages in total\n", getpid(), messagesReceived);
}
void childWorker(int **pipes, int childId) {
int inputFd = pipes[childId][0]; // read fd
int outputFd = pipes[(childId+1) % TOTAL_PROCESS_COUNT][1]; // write fd
closePipesExceptTwo(pipes, inputFd, outputFd);
listenAndForward(inputFd, outputFd, pipes[childId]);
closePipesExceptTwo(pipes, -1, -1);
freePipes(pipes);
printf("[%d] exiting\n", getpid());
}
int parentWorker(int **pipes) {
int outputFd = pipes[1 % TOTAL_PROCESS_COUNT][1];
int initialValue = INITIAL_VALUE;
printf("[PARENT] children created, starting communication with %d\n", initialValue);
if (write(outputFd, (char*)&initialValue, sizeof(int)) < 0)
ERR("write");
childWorker(pipes, 0);
printf("[PARENT] waiting for children\n");
closePipesExceptTwo(pipes, -1, -1);
while(wait(NULL) >= 0);
printf("[PARENT] exiting\n");
return EXIT_SUCCESS;
}
int main() {
int* pipes[TOTAL_PROCESS_COUNT];
preparePipes(pipes);
printf("[PARENT] pid = %d\n", getpid());
for (int i=1; i <= SUBPROCESS_COUNT; i++) {
pid_t pid = fork();
if (pid < 0)
ERR("fork");
else if (pid == 0) {
printf("[CHILD] pid = %d\n", getpid());
childWorker(pipes, i);
return EXIT_SUCCESS;
}
}
return parentWorker(pipes);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment