Created
February 18, 2017 22:43
-
-
Save Gelio/711c0bc5a2a316565b6f9feffab80a50 to your computer and use it in GitHub Desktop.
SOP2 L1 - inter-process communication using POSIX pipes, ring architecture with the main process creating subprocesses
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <errno.h> | |
#include <unistd.h> | |
#include <signal.h> | |
#include <sys/wait.h> | |
#define DELTA_MIN -10 | |
#define DELTA_MAX 15 | |
#define SUBPROCESS_COUNT 3 | |
#define TOTAL_PROCESS_COUNT (SUBPROCESS_COUNT + 1) | |
#define INITIAL_VALUE 3 | |
#define MAX_VALUE 999 | |
#define ERR(source) ( fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), \ | |
perror(source), kill(0, SIGKILL), \ | |
exit(EXIT_FAILURE) ) | |
void preparePipes(int **pipes) { | |
for (int i=0; i < TOTAL_PROCESS_COUNT; i++) { | |
pipes[i] = calloc(2, sizeof(int)); | |
if (pipes[i] == NULL) | |
ERR("calloc"); | |
if (pipe(pipes[i]) < 0) | |
ERR("pipe"); | |
} | |
} | |
void freePipes(int **pipes) { | |
for (int i=0; i < TOTAL_PROCESS_COUNT; i++) | |
free(pipes[i]); | |
} | |
void closePipe(int fd) { | |
if (close(fd) < 0 && errno != EBADF) | |
ERR("close"); | |
} | |
void closePipesExceptTwo(int **pipes, int fd1, int fd2) { | |
for (int i=0; i < TOTAL_PROCESS_COUNT; i++) { | |
if (pipes[i][0] != fd1 && pipes[i][0] != fd2) | |
closePipe(pipes[i][0]); | |
if (pipes[i][1] != fd1 && pipes[i][1] != fd2) | |
closePipe(pipes[i][1]); | |
} | |
} | |
void listenAndForward(int readFd, int writeFd, int *pipesToClose) { | |
pid_t pid = getpid(); | |
srand(pid); | |
int number, messagesReceived = 0; | |
ssize_t count; | |
do { | |
count = read(readFd, (char*)&number, sizeof(int)); | |
if (count < 0) | |
ERR("read"); | |
if (count > 0) { | |
messagesReceived++; | |
if (number > MAX_VALUE) { | |
printf("[%d] received %d, greater than %d, closing pipes\n", pid, number, MAX_VALUE); | |
closePipe(pipesToClose[0]); | |
closePipe(pipesToClose[1]); | |
printf("[%d] received %d messages in total\n", getpid(), messagesReceived); | |
return; | |
} | |
int forwardedNumber = number + (DELTA_MIN + rand() % (DELTA_MAX - DELTA_MIN + 1)); | |
printf("[%d] received %d, forwarding %d\n", pid, number, forwardedNumber); | |
ssize_t writeCount = write(writeFd, (char*)&forwardedNumber, sizeof(int)); | |
if (writeCount < 0) | |
ERR("write"); | |
} | |
} while (count > 0); | |
printf("[%d] received %d messages in total\n", getpid(), messagesReceived); | |
} | |
void childWorker(int **pipes, int childId) { | |
int inputFd = pipes[childId][0]; // read fd | |
int outputFd = pipes[(childId+1) % TOTAL_PROCESS_COUNT][1]; // write fd | |
closePipesExceptTwo(pipes, inputFd, outputFd); | |
listenAndForward(inputFd, outputFd, pipes[childId]); | |
closePipesExceptTwo(pipes, -1, -1); | |
freePipes(pipes); | |
printf("[%d] exiting\n", getpid()); | |
} | |
int parentWorker(int **pipes) { | |
int outputFd = pipes[1 % TOTAL_PROCESS_COUNT][1]; | |
int initialValue = INITIAL_VALUE; | |
printf("[PARENT] children created, starting communication with %d\n", initialValue); | |
if (write(outputFd, (char*)&initialValue, sizeof(int)) < 0) | |
ERR("write"); | |
childWorker(pipes, 0); | |
printf("[PARENT] waiting for children\n"); | |
closePipesExceptTwo(pipes, -1, -1); | |
while(wait(NULL) >= 0); | |
printf("[PARENT] exiting\n"); | |
return EXIT_SUCCESS; | |
} | |
int main() { | |
int* pipes[TOTAL_PROCESS_COUNT]; | |
preparePipes(pipes); | |
printf("[PARENT] pid = %d\n", getpid()); | |
for (int i=1; i <= SUBPROCESS_COUNT; i++) { | |
pid_t pid = fork(); | |
if (pid < 0) | |
ERR("fork"); | |
else if (pid == 0) { | |
printf("[CHILD] pid = %d\n", getpid()); | |
childWorker(pipes, i); | |
return EXIT_SUCCESS; | |
} | |
} | |
return parentWorker(pipes); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment