Skip to content

Instantly share code, notes, and snippets.

@bboe
Created February 23, 2012 20:15
Show Gist options
  • Save bboe/1894848 to your computer and use it in GitHub Desktop.
Save bboe/1894848 to your computer and use it in GitHub Desktop.
Process-based Job Management in C
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/wait.h>
#include "job_manager.h"
/* Scheduling flag shared between manager_run_jobs and the signal handler
   below. manager_run_jobs sets it to 1 before its dispatch loop; the handler
   clears it to request that no further jobs be handed out. */
volatile sig_atomic_t multiprocess_running = 0;

/* Signal handler: clears the scheduling flag and then installs itself again
   for `sig`, so that a later delivery of the same signal is handled the same
   way. */
void multiprocess_stop(int sig) {
  multiprocess_running = 0;
  (void)signal(sig, multiprocess_stop);
}
/* Initializes an already allocated manager structure.
   Parameters:
     manager: the structure to fill in.
     num_workers: how many worker processes manager_run_jobs should spawn.
     input_callback: called by manager_run_jobs to fill the message buffer
       just before a job is written to a worker.
     output_callback: called by manager_run_jobs with the line read back from
       a worker.
     work_function: stored for the workers; worker_create hands it to
       worker_run_loop in the child process.
   No memory is allocated here. Every pointer member is set to NULL and
   max_fd to 0 so the structure never carries indeterminate values. */
void manager_init(struct manager *manager, int num_workers,
                  void (*input_callback)(char *),
                  void (*output_callback)(char *),
                  void (*work_function)(char *)) {
  manager->num_workers = num_workers;
  manager->input_callback = input_callback;
  manager->output_callback = output_callback;
  manager->work_function = work_function;

  /* Nothing is allocated yet. manager_destruct keys off workers == NULL;
     the remaining members are cleared so that they are well defined too. */
  manager->workers = NULL;
  manager->fd_to_file = NULL;
  manager->fd_to_worker = NULL;
  manager->max_fd = 0;
}
/* Makes sure both descriptor lookup tables have a slot for `fd`, growing them
   when the descriptor lies beyond the current table size. New slots are set
   to NULL / -1. Exits the process if the descriptor cannot be represented in
   an fd_set or if the tables cannot be grown. */
static void manager_reserve_fd(struct manager *manager, int fd) {
  FILE **files;
  int *owners;
  int j;

  if (fd >= FD_SETSIZE) {
    /* FD_SET on such a descriptor would write outside the fd_set. */
    fprintf(stderr, "Worker descriptor %d cannot be used with select\n", fd);
    exit(1);
  }
  if (fd < manager->max_fd)
    return;

  files = realloc(manager->fd_to_file, sizeof *files * (size_t)(fd + 1));
  if (files == NULL) {
    perror("Descriptor table allocation error"); exit(1);
  }
  manager->fd_to_file = files;

  owners = realloc(manager->fd_to_worker, sizeof *owners * (size_t)(fd + 1));
  if (owners == NULL) {
    perror("Descriptor table allocation error"); exit(1);
  }
  manager->fd_to_worker = owners;

  for (j = manager->max_fd; j <= fd; ++j) {
    files[j] = NULL;
    owners[j] = -1;
  }
  manager->max_fd = fd + 1;
}

/* Spawns the workers, hands out num_jobs jobs and collects their results.

   Each idle worker receives one message produced by manager->input_callback;
   each line read back from a worker is passed to manager->output_callback.
   While the loop runs, SIGINT is routed to multiprocess_stop: after it fires
   no new jobs are sent, but results of jobs already sent are still awaited.
   The previous SIGINT disposition is restored before returning.

   manager->num_workers is lowered to num_jobs when fewer jobs than workers
   were requested.

   Returns the number of results received, or -1 without doing anything when
   fewer than one worker would be used.

   Fatal conditions (allocation failure, stream creation failure, select
   failure, a worker closing its result pipe early) print a message and
   terminate the process with exit(1), like the other fatal paths in this
   file. */
int manager_run_jobs(struct manager *manager, int num_jobs) {
  char buf[MAX_MESSAGE];
  fd_set read_set;
  int i, completed = 0, sent = 0;
  void (*prev_sigint)(int); /* signal() returns a function pointer, not void * */
  struct worker *cur;
  FILE *to_worker;

  /* Lessen the number of workers if they aren't needed */
  if (num_jobs < manager->num_workers)
    manager->num_workers = num_jobs;

  /* Sanity check on the number of workers */
  if (manager->num_workers < 1)
    return -1;

  /* Initial table size: enough when only descriptors 0-2 are open in this
     process. manager_reserve_fd grows the tables if that does not hold. */
  manager->max_fd = 4 + 2 * manager->num_workers;
  manager->workers = calloc((size_t)manager->num_workers,
                            sizeof *manager->workers);
  manager->fd_to_file = calloc((size_t)manager->max_fd,
                               sizeof *manager->fd_to_file);
  manager->fd_to_worker = malloc(sizeof *manager->fd_to_worker *
                                 (size_t)manager->max_fd);
  if (manager->workers == NULL || manager->fd_to_file == NULL ||
      manager->fd_to_worker == NULL) {
    perror("Manager allocation error"); exit(1);
  }
  for (i = 0; i < manager->max_fd; ++i)
    manager->fd_to_worker[i] = -1;

  /* Start the workers */
  for (i = 0; i < manager->num_workers; ++i) {
    cur = worker_create(manager);
    manager_reserve_fd(manager, cur->input[1]);
    manager_reserve_fd(manager, cur->output[0]);
    manager->fd_to_file[cur->input[1]] = fdopen(cur->input[1], "w");
    manager->fd_to_file[cur->output[0]] = fdopen(cur->output[0], "r");
    if (manager->fd_to_file[cur->input[1]] == NULL ||
        manager->fd_to_file[cur->output[0]] == NULL) {
      perror("Worker stream creation error"); exit(1);
    }
    manager->fd_to_worker[cur->output[0]] = i;
    manager->workers[i] = cur;
  }

  multiprocess_running = 1;
  prev_sigint = signal(SIGINT, multiprocess_stop);

  /* Keep going until every job is done, or - once a stop was requested -
     until every job that was already sent has reported back. */
  while ((multiprocess_running && completed != num_jobs) ||
         (!multiprocess_running && completed != sent)) {
    /* Build file descriptor set and send jobs to non-busy workers */
    FD_ZERO(&read_set);
    for (i = 0; i < manager->num_workers; ++i) {
      cur = manager->workers[i];
      if (multiprocess_running && !cur->busy && sent < num_jobs) {
        to_worker = manager->fd_to_file[cur->input[1]];
        manager->input_callback(buf);
        fprintf(to_worker, "%s", buf);
        fflush(to_worker);
        cur->busy = 1;
        ++sent;
      }
      if (cur->busy)
        FD_SET(cur->output[0], &read_set);
    }

    /* Wait until there are results. Every tracked descriptor is below
       max_fd, so max_fd itself is a sufficient nfds argument. */
    if (select(manager->max_fd, &read_set, NULL, NULL, NULL) < 0) {
      if (errno == EINTR)
        continue; /* interrupted by the signal handler: re-check the flag */
      perror("Select error"); exit(1);
    }

    /* Fetch results */
    for (i = 0; i < manager->max_fd; ++i) {
      if (!FD_ISSET(i, &read_set))
        continue;
      if (fgets(buf, MAX_MESSAGE, manager->fd_to_file[i]) == NULL) {
        /* End of file or read error: the worker is gone and its job will
           never report back. Do not pass stale buffer contents on. */
        fprintf(stderr, "Worker %d closed its result pipe unexpectedly\n",
                manager->fd_to_worker[i]);
        exit(1);
      }
      manager->output_callback(buf);
      manager->workers[manager->fd_to_worker[i]]->busy = 0;
      ++completed;
    }
  }

  /* Closing a worker's input stream makes its read loop see end of file;
     then wait for the process and drop its record. */
  for (i = 0; i < manager->num_workers; ++i) {
    cur = manager->workers[i];
    fclose(manager->fd_to_file[cur->input[1]]);
    fclose(manager->fd_to_file[cur->output[0]]);
    waitpid(cur->pid, NULL, 0);
    free(cur);
    manager->workers[i] = NULL;
  }
  signal(SIGINT, prev_sigint);
  return completed;
}
void manager_destruct(struct manager *manager) {
int i;
if (manager->workers == NULL) /* manager_run_jobs was never called */
return;
for (i = 0; i < manager->num_workers; ++i)
if (manager->workers[i] != NULL) {
fclose(manager->fd_to_file[manager->workers[i]->input[1]]);
fclose(manager->fd_to_file[manager->workers[i]->output[0]]);
free(manager->workers[i]);
}
free(manager->workers);
free(manager->fd_to_file);
free(manager->fd_to_worker);
}
/* Creates one worker process connected to the caller by two pipes.

   In the parent: returns a newly allocated worker record whose input[1] is
   the write end for sending jobs and whose output[0] is the read end for
   results; the other two pipe ends are closed.

   In the child: SIGINT is ignored, the manager state inherited from the
   parent is released with manager_destruct, the job pipe is moved to
   standard input and the result pipe to descriptor 3, and
   worker_run_loop runs manager->work_function until end of input. The child
   then exits with status 0 and never returns from this function.

   Allocation, pipe, fork and dup2 failures print a message and terminate
   the calling process with exit(1). */
struct worker *worker_create(struct manager *manager) {
  struct worker *worker = malloc(sizeof *worker);
  void (*func)(char *) = manager->work_function;

  if (worker == NULL) {
    perror("Worker allocation error"); exit(1);
  }
  worker->busy = 0;
  if (pipe(worker->input)) {
    perror("Worker input pipe creation error"); exit(1);
  }
  if (pipe(worker->output)) {
    perror("Worker output pipe creation error"); exit(1);
  }

  /* The child ends with exit(), which flushes stdio buffers. Flush pending
     output now so it is not written a second time by the child. */
  fflush(NULL);

  worker->pid = fork();
  if (worker->pid < 0) {
    perror("Worker fork error"); exit(1);
  }

  if (worker->pid == 0) {
    signal(SIGINT, SIG_IGN);
    manager_destruct(manager);

    /* Drop the parent's ends first so they cannot collide with the target
       descriptors used below. */
    close(worker->input[1]);
    close(worker->output[0]);

    /* Move the job pipe to stdin. When it already is stdin there is nothing
       to move, and closing it would discard the worker's input. */
    if (worker->input[0] != STDIN_FILENO) {
      if (dup2(worker->input[0], STDIN_FILENO) != STDIN_FILENO) {
        perror("Error moving input descriptor"); exit(1);
      }
      close(worker->input[0]);
    }

    /* Move the result pipe to descriptor 3, with the same care. */
    if (worker->output[1] != 3) {
      if (dup2(worker->output[1], 3) != 3) {
        perror("Error moving output descriptor"); exit(1);
      }
      close(worker->output[1]);
    }

    free(worker);
    worker_run_loop(func, STDIN_FILENO, 3);
    exit(0);
  }

  /* Parent: keep only the write end for jobs and the read end for results. */
  close(worker->input[0]);
  close(worker->output[1]);
  return worker;
}
/* Worker main loop: reads one line at a time from in_fd, lets func rewrite
   the buffer in place, and writes the buffer to out_fd, flushing after every
   message so the manager sees the result immediately.
   Parameters:
     func: the work function; receives the line read and must leave the
       reply in the same buffer.
     in_fd: descriptor to read jobs from.
     out_fd: descriptor to write results to.
   Returns when in_fd reaches end of file (or a read fails), or when a result
   cannot be written. Both descriptors are closed before returning.
   Exits the process with status 1 if a stdio stream cannot be attached to
   either descriptor. */
void worker_run_loop(void (*func)(char *), int in_fd, int out_fd) {
  char buf[MAX_MESSAGE];
  FILE *input;
  FILE *output;

  input = fdopen(in_fd, "r");
  if (input == NULL) {
    perror("Worker input stream creation error"); exit(1);
  }
  output = fdopen(out_fd, "w");
  if (output == NULL) {
    perror("Worker output stream creation error"); exit(1);
  }

  while (fgets(buf, sizeof buf, input) != NULL) {
    func(buf);
    /* Stop once the result can no longer be delivered. */
    if (fputs(buf, output) == EOF || fflush(output) == EOF)
      break;
  }
  fclose(input);
  fclose(output);
}
#ifndef JOB_MANAGER
#define JOB_MANAGER
#include <stdio.h>
#include <unistd.h>
#define MAX_MESSAGE 1024
/* Record the manager keeps for one worker process. */
struct worker {
int input[2]; /* pipe for jobs: the manager writes to input[1], the worker reads input[0] */
int output[2]; /* pipe for results: the worker writes output[1], the manager reads output[0] */
int busy; /* set to 1 when a job is sent, back to 0 when its result is read */
pid_t pid; /* return value of fork() for this worker */
};
/* State of a worker pool together with the callbacks that drive it. */
struct manager {
FILE **fd_to_file; /* stdio streams of the worker pipes, indexed by descriptor number */
int *fd_to_worker; /* index into workers, keyed by a worker's output[0] descriptor */
int max_fd; /* number of slots in fd_to_file and fd_to_worker */
int num_workers; /* workers to spawn; manager_run_jobs lowers it to the job count when that is smaller */
struct worker **workers; /* one entry per worker; NULL until manager_run_jobs allocates it */
void (*input_callback)(char *); /* fills the buffer that is sent to a worker */
void (*output_callback)(char *); /* receives the line read back from a worker */
void (*work_function)(char *); /* run inside each worker on every received line */
};
/* Initialize the manager structure.
   Parameters:
     manager: The address of an already allocated manager structure.
     num_workers: The number of workers that this job manager will spawn.
     input_callback: This function is called just before sending a job to a
       worker. Its single parameter is a buffer whose contents will be
       communicated to the worker process [see I/O buffer].
     output_callback: This function is called after a worker has completed a
       job. Its single parameter is a buffer whose contents directly
       represent the results from the work_function output [see I/O buffer].
     work_function: This function is called by each worker when a job is
       ready for processing. Its single parameter is a buffer that is used
       both for input and output. On invocation, the buffer contains the
       output from one of the input_callback invocations. Before returning,
       the work_function must overwrite this buffer with the contents it
       wants to communicate back to the master process [see I/O buffer].
   I/O buffer:
     The buffers used for communication have some restrictions as to what can
     be placed in them. The buffers can be no larger than MAX_MESSAGE and the
     contents must fit on only one line that is ended by a single '\n'
     character and followed by the '\0' character. */
void manager_init(struct manager *manager, int num_workers,
void (*input_callback)(char *),
void (*output_callback)(char *),
void (*work_function)(char *));
/* Spawns the number of workers indicated by manager->num_workers, runs the
   jobs, and then stops the workers. If num_jobs is smaller than
   manager->num_workers, manager->num_workers is first lowered to num_jobs.
   Returns the number of jobs that were completed, or -1 when fewer than one
   worker would be used (in which case nothing is spawned and no jobs run).
   This function returns when the workers have stopped, which happens in one
   of two cases:
     1) All the jobs have completed.
     2) SIGINT was sent while the jobs were running. In this case, the
        workers that have jobs will finish those jobs and then stop.
   Note that while this function is running, SIGINT will not terminate the
   program; rather, it sets a flag to indicate that no further jobs should be
   scheduled. The previously installed SIGINT handler is put back before the
   function returns. */
int manager_run_jobs(struct manager *manager, int num_jobs);
/* Releases the memory owned by the manager: the worker array, the records
   and streams of any workers still listed in it, and the descriptor lookup
   tables. Does nothing if manager_run_jobs never allocated them. The manager
   structure itself is not freed, and the SIGINT handler is not touched here
   (manager_run_jobs restores it on its own before returning). */
void manager_destruct(struct manager *manager);
/* Used internally */
/* Forks one worker and returns its record to the parent. The child releases
   the manager state it inherited, which gives the worker the appearance of
   having been started independently, and then runs worker_run_loop until its
   input ends. The process exits if a pipe cannot be created. */
struct worker *worker_create(struct manager *manager);
/* The simple worker run loop: reads a line from fd_in, calls func on the
   buffer holding it, and writes the (rewritten) buffer to fd_out. This
   repeats until no more input can be read. */
void worker_run_loop(void (*func)(char *), int fd_in, int fd_out);
#endif
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "job_manager.h"
/* Number of the next job to hand out; prepare_job writes it into the message
   and then increments it. */
int job_num = 0;
/* Worker function that does the work. On invocation, buffer contains a
   single line with the input for the job (here: the job number). When
   complete, buffer holds the result of the job. The contents of the result
   must fit on a single line that ends with a single '\n'.
   This example announces the job on standard output, sleeps for three to
   five seconds depending on the process id, and reports success. */
void do_work(char *buffer) {
  const int job = atoi(buffer);
  /* pid_t is not guaranteed to be int; convert once so it matches %d. */
  const int pid = (int)getpid();

  printf("%d started on job %d\n", pid, job);
  sleep(3 + pid % 3);
  snprintf(buffer, MAX_MESSAGE, "%d Success\n", job);
}
/* Writes the message for the next job into the provided buffer of size
   MAX_MESSAGE. The message must fit on a single line that ends with a single
   '\n'; here it is just the job number, which is advanced for the next
   call. */
void prepare_job(char *buffer) {
  const int current = job_num;

  job_num = current + 1;
  snprintf(buffer, MAX_MESSAGE, "%d\n", current);
}
/* Handles the single-line result returned from a worker by copying it,
   unchanged, to standard output. */
void process_result(char *buffer) {
  fputs(buffer, stdout);
}
/* Parses a non-negative decimal integer that fits in an int.
   Returns the value, or -1 when text is empty, has trailing characters, is
   negative, or is out of range. */
static int parse_count(const char *text) {
  char *end;
  long value;

  errno = 0;
  value = strtol(text, &end, 10);
  if (end == text || *end != '\0' || errno == ERANGE ||
      value < 0 || value > INT_MAX)
    return -1;
  return (int)value;
}

/* Example driver: "prog workers jobs" runs `jobs` jobs on `workers` worker
   processes and reports how many completed. Returns 1 with a usage message
   when the arguments are missing or are not non-negative integers. */
int main(int argc, char *argv[]) {
  int completed, num_jobs, num_workers;
  struct manager manager;

  if (argc != 3) {
    printf("Usage: %s workers jobs\n", argv[0]);
    return 1;
  }
  num_workers = parse_count(argv[1]);
  num_jobs = parse_count(argv[2]);
  if (num_workers < 0 || num_jobs < 0) {
    /* atoi would silently have turned malformed input into 0. */
    printf("Usage: %s workers jobs\n", argv[0]);
    return 1;
  }

  manager_init(&manager, num_workers, prepare_job, process_result, do_work);
  completed = manager_run_jobs(&manager, num_jobs);
  manager_destruct(&manager);
  printf("%d of %d jobs completed\n", completed, num_jobs);
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment