Created
February 23, 2012 20:15
-
-
Save bboe/1894848 to your computer and use it in GitHub Desktop.
Process-based Job Management in C
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h> | |
#include <signal.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <sys/select.h> | |
#include <sys/wait.h> | |
#include "job_manager.h" | |
/* Scheduling flag: set to 1 by manager_run_jobs while jobs are being handed
   out, and cleared by the SIGINT handler below to request that no further
   jobs be scheduled. */
volatile sig_atomic_t multiprocess_running = 0;

/* SIGINT handler installed by manager_run_jobs.  Clears the scheduling flag
   and re-installs itself, for platforms where signal() has one-shot
   semantics (the disposition is reset to SIG_DFL before the handler runs).
   errno is saved and restored so that the interrupted code -- e.g. the EINTR
   check that follows select() -- never sees a value changed by the handler. */
void multiprocess_stop(int sig) {
  int saved_errno = errno;

  multiprocess_running = 0;
  signal(sig, multiprocess_stop);
  errno = saved_errno;
}
void manager_init(struct manager *manager, int num_workers, | |
void (*input_callback)(char *), | |
void(*output_callback)(char *), | |
void (*work_function)(char *)) { | |
manager->num_workers = num_workers; | |
manager->workers = NULL; | |
manager->input_callback = input_callback; | |
manager->output_callback = output_callback; | |
manager->work_function = work_function; | |
} | |
int manager_run_jobs(struct manager *manager, int num_jobs) { | |
char buf[MAX_MESSAGE]; | |
fd_set read_set; | |
int i, completed=0, sent=0; | |
void *prev_sigint; | |
struct worker *cur; | |
/* Lessen the number of workers if they aren't needed */ | |
if (num_jobs < manager->num_workers) { | |
manager->num_workers = num_jobs; | |
manager->max_fd = 4 + 2 * manager->num_workers; | |
} | |
/* Sanity check on the number of workers */ | |
if (manager->num_workers < 1) | |
return -1; | |
/* Perform additional setup */ | |
manager->max_fd = 4 + 2 * manager->num_workers; | |
manager->workers = malloc(sizeof(struct worker*) * manager->num_workers); | |
memset(manager->workers, 0, sizeof(struct worker*) * manager->num_workers); | |
manager->fd_to_file = malloc(sizeof(FILE *) * manager->max_fd); | |
manager->fd_to_worker = malloc(sizeof(int) * manager->max_fd); | |
/* Start the workers */ | |
for(i = 0; i < manager->num_workers; ++i) { | |
cur = worker_create(manager); | |
manager->fd_to_file[cur->input[1]] = fdopen(cur->input[1], "w"); | |
manager->fd_to_file[cur->output[0]] = fdopen(cur->output[0], "r"); | |
manager->fd_to_worker[cur->output[0]] = i; | |
manager->workers[i] = cur; | |
} | |
multiprocess_running = 1; | |
prev_sigint = signal(SIGINT, multiprocess_stop); | |
while (multiprocess_running && completed != num_jobs || | |
!multiprocess_running && completed != sent) { | |
/* Build file descriptor set and send jobs to non-busy workers */ | |
FD_ZERO(&read_set); | |
for(i = 0; i < manager->num_workers; ++i) { | |
if (multiprocess_running && !manager->workers[i]->busy && | |
sent < num_jobs) { | |
manager->input_callback(buf); | |
fprintf(manager->fd_to_file[manager->workers[i]->input[1]], "%s", buf); | |
fflush(manager->fd_to_file[manager->workers[i]->input[1]]); | |
manager->workers[i]->busy = 1; | |
++sent; | |
} | |
if (manager->workers[i]->busy) | |
FD_SET(manager->workers[i]->output[0], &read_set); | |
} | |
/* Wait until there are results */ | |
if ((i = select(manager->max_fd + 1, &read_set, NULL, NULL, NULL)) < 0) { | |
if (errno == EINTR) | |
continue; | |
perror("Select error"); exit(1); | |
} | |
/* Fetch results */ | |
for(i = 0; i < manager->max_fd; ++i) { | |
if (FD_ISSET(i, &read_set)) { | |
fgets(buf, MAX_MESSAGE, manager->fd_to_file[i]); | |
manager->output_callback(buf); | |
manager->workers[manager->fd_to_worker[i]]->busy = 0; | |
++completed; | |
} | |
} | |
} | |
/* Wait on all the workers */ | |
for(i = 0; i < manager->num_workers; ++i) { | |
fclose(manager->fd_to_file[manager->workers[i]->input[1]]); | |
fclose(manager->fd_to_file[manager->workers[i]->output[0]]); | |
waitpid(manager->workers[i]->pid, NULL, 0); | |
free(manager->workers[i]); | |
manager->workers[i] = NULL; | |
} | |
signal(SIGINT, prev_sigint); | |
return completed; | |
} | |
void manager_destruct(struct manager *manager) { | |
int i; | |
if (manager->workers == NULL) /* manager_run_jobs was never called */ | |
return; | |
for (i = 0; i < manager->num_workers; ++i) | |
if (manager->workers[i] != NULL) { | |
fclose(manager->fd_to_file[manager->workers[i]->input[1]]); | |
fclose(manager->fd_to_file[manager->workers[i]->output[0]]); | |
free(manager->workers[i]); | |
} | |
free(manager->workers); | |
free(manager->fd_to_file); | |
free(manager->fd_to_worker); | |
} | |
struct worker *worker_create(struct manager *manager) { | |
struct worker *worker = malloc(sizeof(struct worker)); | |
void (*func)(char *) = manager->work_function; | |
worker->busy = 0; | |
if (pipe(worker->input)) { | |
perror("Worker input pipe creation error"); exit(1); | |
} | |
if (pipe(worker->output)) { | |
perror("Worker output pipe creation error"); exit(1); | |
} | |
if ((worker->pid = fork()) == 0) { | |
signal(SIGINT, SIG_IGN); | |
manager_destruct(manager); | |
if (dup2(worker->input[0], STDIN_FILENO) != STDIN_FILENO) { | |
perror("Error moving input descriptor"); exit(1); | |
} | |
if (dup2(worker->output[1], 3) != 3) { | |
perror("Error moving output descriptor"); exit(1); | |
} | |
if (worker->input[0] != 3) | |
close(worker->input[0]); | |
close(worker->input[1]); | |
close(worker->output[0]); | |
close(worker->output[1]); | |
free(worker); | |
worker_run_loop(func, STDIN_FILENO, 3); | |
exit(0); | |
} | |
close(worker->input[0]); | |
close(worker->output[1]); | |
return worker; | |
} | |
void worker_run_loop(void (*func)(char *), int in_fd, int out_fd) { | |
char buf[MAX_MESSAGE]; | |
FILE *input = fdopen(in_fd, "r"); | |
FILE *output = fdopen(out_fd, "w"); | |
while (fgets(buf, MAX_MESSAGE, input) != NULL) { | |
func(buf); | |
fprintf(output, "%s", buf); | |
fflush(output); | |
} | |
fclose(input); | |
fclose(output); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef JOB_MANAGER | |
#define JOB_MANAGER | |
#include <stdio.h> | |
#include <unistd.h> | |
#define MAX_MESSAGE 1024 | |
struct worker { | |
int input[2]; | |
int output[2]; | |
int busy; | |
pid_t pid; | |
}; | |
struct manager { | |
FILE **fd_to_file; | |
int *fd_to_worker; | |
int max_fd; | |
int num_workers; | |
struct worker **workers; | |
void (*input_callback)(char *); | |
void (*output_callback)(char *); | |
void (*work_function)(char *); | |
}; | |
/* Initialize the manager structure. | |
Parameters: | |
manager: The address of an already allocated manager structure | |
num_workers: The number of workers that this job_manager will spawn. | |
input_callback: This function is called just before sending a job to a | |
worker. Its single parameter is a buffer whose contents will be | |
communicated to the worker process [see I/O buffer]. | |
output_callback: This function is called after a worker has completed a | |
job. Its single parameter is a buffer whose contents directly represent | |
the results from the work_function output [see I/O buffer.] | |
work_function: This function is what is called by each worker when a job | |
is ready for processing. Its single parameter is a buffer that is used | |
both for input and output. On invocation, the buffer contains the output | |
from one of the input_callback functions. Before exit the work_function | |
must overwrite this buffer with the contents it wants to communicate | |
back to the master process [see I/O buffer.] | |
I/O buffer: | |
The buffers used for communication have some restrictions as to what can be | |
placed in them. The buffers can be no larger than MAX_MESSAGE and the | |
contents must fit on only one line that is ended by a single '\n' character | |
and follwed by the '\0' character. */ | |
void manager_init(struct manager *manager, int num_workers, | |
void (*input_callback)(char *), | |
void (*output_callback)(char *), | |
void (*work_function)(char *)); | |
/* Spawns the number of workers as indicated by manager->num_workers, runs | |
the jobs, and then stops the workers. | |
Returns the number of jobs that were completed or -1 in the event that no | |
jobs were run. | |
This function returns when the workers have stopped which happens in one of | |
two cases: | |
1) All the jobs have completed. | |
2) SIGINT was sent while the jobs were running. In this case, the workers | |
that have jobs will finish those jobs and then stop. | |
Note that while this function is running, SIGINT will not terminate the | |
program, rather it sets the flag to indicate no further scheduling of jobs. | |
*/ | |
int manager_run_jobs(struct manager *manager, int num_jobs); | |
/* Frees the manager object and restores the previous signal handler for | |
SIGINT. */ | |
void manager_destruct(struct manager *manager); | |
/* Used internally */ | |
/* Forks the workers, and performs cleanup to give the workers the appearance | |
that they were started independently. */ | |
struct worker *worker_create(struct manager *manager); | |
/* The simple worker run loop fetches data from the pipe, calls the | |
work_function, and then writes data back to the pipe. */ | |
void worker_run_loop(void (*func)(char *), int fd_in, int fd_out); | |
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "job_manager.h"
/* Number assigned to the next job handed out by prepare_job.  File-private:
   only this program's callbacks use it. */
static int job_num = 0;
/* This function is a worker function that does the work. On invocation, buffer | |
contains a single line containing the input for the job. When complete, | |
buffer should store the result of the job. The contents of the result must | |
fit on a single line that ends with a single '\n'. */ | |
void do_work(char *buffer) { | |
int job = atoi(buffer); | |
printf("%d started on job %d\n", getpid(), job); | |
sleep(3 + getpid() % 3); | |
snprintf(buffer, MAX_MESSAGE, "%d Success\n", job); | |
} | |
/* The message to send to the worker must be stored in the provided buffer of | |
size MAX_MESSAGE. The contents must fit on a single line that ends with a | |
single '\n' */ | |
void prepare_job(char *buffer) { | |
snprintf(buffer, MAX_MESSAGE, "%d\n", job_num++); | |
} | |
/* Output callback: echoes one single-line worker result to standard output
   unchanged. */
void process_result(char *buffer) {
  fputs(buffer, stdout);
}
int main(int argc, char *argv[]) { | |
int i, num_jobs; | |
struct manager manager; | |
if (argc != 3) { | |
printf("Usage: %s workers jobs\n", argv[0]); | |
return 1; | |
} | |
num_jobs = atoi(argv[2]); | |
manager_init(&manager, atoi(argv[1]), prepare_job, process_result, do_work); | |
i = manager_run_jobs(&manager, num_jobs); | |
manager_destruct(&manager); | |
printf("%d of %d jobs completed\n", i, num_jobs); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment