Last active
July 25, 2024 13:26
-
-
Save mfukar/3b6e2b72eefccbf27d8c4ddbfe2effee to your computer and use it in GitHub Desktop.
C11 code to have a POSIX multithreaded application which signal work completion via a condition variable, and then terminate.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Demonstration of a way to break out of a multi-threaded program with one "manager" thread and | |
* many "worker" threads. | |
* There is the assumption that the work being done is long-standing enough to guarantee that | |
* all threads have been created and are running prior to one of them completing its assigned work. | |
* If this is not the case, adjustments to this code have to be made. Better yet, implement and use | |
* a monitor / barrier. | |
* The primary use of this is to demonstrate usage of condition variable(s) and cancellation. | |
* | |
* IMPORTANT: | |
* The example showcases the case where ALL threads must stop when ONE completes its work. For | |
* example, when some kind of search is running and one match is enough, or when an external trigger | |
* is intended to force stoppage of all threads. | |
* In other use-cases, this code may not be appropriate. | |
* | |
* Build with: | |
* clang --std=c2x -O0 -o terminate-worker-threads terminate-worker-threads.c -lpthread | |
*/ | |
#include <errno.h> | |
#include <pthread.h> | |
#include <stdbool.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <unistd.h> | |
#include <stdatomic.h> | |
#define NUM_THREADS 10 | |
pthread_mutex_t watcher_cv_mutex; | |
pthread_cond_t watcher_notification_cv; | |
/* Data race, even though will not cause race condition, | |
* so use an atomic type: */ | |
atomic_bool are_we_finished = false; | |
void * worker (void * arg) { | |
/** | |
* Here we set the worker thread to be cancellable under any | |
* conditions. | |
* If instead the work requires certain sections be uninterrupted, | |
* e.g. to avoid inducing deadlocks, | |
* one needs to set cancellation type as PTHREAD_CANCEL_DEFERRED, | |
* and (ideally) explicitly set cancellation points. | |
* At those cancellation points, care must be taken that held mutexes | |
* must have the PTHREAD_MUTEX_ROBUST attribute and calls to | |
* pthread_mutex_lock handle the EOWNERDEAD return value. | |
*/ | |
pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); | |
pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL); | |
unsigned worker_id = *(unsigned int *)arg; | |
volatile unsigned int loop = 0; /* `volatile` used to avoid loop optimisation; | |
this is purely for demonstration purposes. */ | |
while (true) { | |
++loop; | |
/* At this point some "work" would be done */ | |
; | |
pthread_mutex_lock (&watcher_cv_mutex); | |
if (loop > 5000000) { /* Simulate "work end" */ | |
are_we_finished = true; | |
pthread_cond_broadcast (&watcher_notification_cv); | |
pthread_mutex_unlock (&watcher_cv_mutex); | |
pthread_exit (NULL); | |
} | |
pthread_mutex_unlock (&watcher_cv_mutex); | |
} | |
pthread_exit (NULL); | |
} | |
struct { | |
pthread_t handle; | |
unsigned int id; | |
} threads[NUM_THREADS] = {}; | |
void * watcher (void * t) { | |
/* Lock mutex and wait for signal. */ | |
pthread_mutex_lock (&watcher_cv_mutex); | |
/* Loop in case of spurious wake-ups. | |
* Retest in case the condition changes before the awakened thread | |
* re-acquires watcher_cv_mutex and returns from pthread_cond_wait. | |
* | |
* That doesn't happen in _this_ example. It could happen if the | |
* relationship between the threads was inverted, and this thread | |
* for example handed off work packages to the workers and notified | |
* them via a cv. | |
*/ | |
while (!are_we_finished) { | |
pthread_cond_wait (&watcher_notification_cv, &watcher_cv_mutex); | |
} | |
printf ("watcher: woken up.\n"); | |
pthread_mutex_unlock (&watcher_cv_mutex); | |
for (size_t i = 0; i < NUM_THREADS; ++i) { | |
if (pthread_cancel (threads[i].handle) == ESRCH) { | |
printf ("watcher: thread %u does not exist, could not cancel\n", threads[i].id); | |
} else { | |
printf ("watcher: cancelled %u\n", threads[i].id); | |
} | |
} | |
pthread_exit (NULL); | |
} | |
int main (int argc, char * argv[]) { | |
pthread_attr_t attr; | |
/* Initialize mutex and condition variable objects */ | |
pthread_mutex_init (&watcher_cv_mutex, NULL); | |
pthread_cond_init (&watcher_notification_cv, NULL); | |
/* For portability, explicitly create threads in a joinable state */ | |
pthread_attr_init (&attr); | |
pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE); | |
pthread_t s; | |
pthread_create (&s, &attr, watcher, NULL); | |
for (size_t i = 0; i < sizeof threads / sizeof *threads; ++i) { | |
threads[i].id = i; | |
if (!pthread_create (&threads[i].handle, &attr, worker, &threads[i].id)) { | |
printf ("worker %u created\n", threads[i].id); | |
} | |
} | |
/* Wait for all threads to complete. This will not wait forever because the watcher | |
* thread will cancel all of the workers: | |
*/ | |
for (size_t i = 0; i < sizeof threads / sizeof *threads; ++i) { | |
pthread_join (threads[i].handle, NULL); | |
printf ("worker %u done\n", threads[i].id); | |
} | |
pthread_join (s, NULL); | |
printf ("watcher done\n"); | |
/* Clean up and exit */ | |
pthread_attr_destroy (&attr); | |
pthread_mutex_destroy (&watcher_cv_mutex); | |
pthread_cond_destroy (&watcher_notification_cv); | |
pthread_exit (NULL); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment