Skip to content

Instantly share code, notes, and snippets.

@mfukar
Last active July 25, 2024 13:26
Show Gist options
  • Save mfukar/3b6e2b72eefccbf27d8c4ddbfe2effee to your computer and use it in GitHub Desktop.
Save mfukar/3b6e2b72eefccbf27d8c4ddbfe2effee to your computer and use it in GitHub Desktop.
C11 code for a POSIX multithreaded application whose worker threads signal work completion via a condition variable and are then terminated.
/**
* Demonstration of a way to break out of a multi-threaded program with one "manager" thread and
* many "worker" threads.
* It is assumed that the work being done is long-running enough to guarantee that
* all threads have been created and are running before any one of them completes its assigned work.
* If this is not the case, adjustments to this code have to be made. Better yet, implement and use
* a monitor / barrier.
* The primary use of this is to demonstrate usage of condition variable(s) and cancellation.
*
* IMPORTANT:
* The example showcases the case where ALL threads must stop when ONE completes its work. For
* example, when some kind of search is running and one match is enough, or when an external trigger
* is intended to force stoppage of all threads.
* In other use-cases, this code may not be appropriate.
*
* Build with:
* clang --std=c2x -O0 -o terminate-worker-threads terminate-worker-threads.c -lpthread
*/
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdatomic.h>
#define NUM_THREADS 10
/* Mutex paired with watcher_notification_cv. Workers hold it while setting
 * are_we_finished and broadcasting; the watcher holds it while testing the flag.
 * Initialised at run time in main(). */
pthread_mutex_t watcher_cv_mutex;
/* Condition variable the workers broadcast on when work is complete and on
 * which the watcher waits. Initialised at run time in main(). */
pthread_cond_t watcher_notification_cv;
/* "Work is complete" flag: written by the workers, read by the watcher.
 * Every access in this file is made with watcher_cv_mutex held; the atomic
 * type additionally ensures that an access made without the mutex would
 * still not constitute a data race. */
atomic_bool are_we_finished = false;
/**
 * Worker thread entry point: simulates a long-running job and, once the job
 * is complete, raises `are_we_finished` and wakes the watcher.
 *
 * @param arg  Pointer to this worker's `unsigned int` id. The simulated work
 *             does not need it, so it is not dereferenced.
 * @return     Never returns normally; the thread ends through pthread_exit()
 *             or by being cancelled.
 *
 * Cancellation policy:
 * The "work" part of the loop is cancellable at any instant (asynchronous
 * cancellation), which is what lets the watcher stop a busy worker promptly.
 * However, only pthread_cancel(), pthread_setcancelstate() and
 * pthread_setcanceltype() are async-cancel-safe. The mutex and condition
 * variable calls are not, and a thread cancelled while holding
 * `watcher_cv_mutex` would leave it locked forever. Cancellation is therefore
 * disabled for the duration of the critical section and restored afterwards;
 * a cancellation request that arrives in the meantime stays pending and is
 * acted upon as soon as cancellation is re-enabled.
 */
void * worker (void * arg) {
    /* Number of loop iterations that stands in for "the work is done". */
    enum { WORK_ITERATIONS = 5000000 };

    (void) arg;

    pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    volatile unsigned int loop = 0; /* `volatile` used to avoid loop optimisation;
                                       this is purely for demonstration purposes. */
    while (true) {
        ++loop;
        /* At this point some "work" would be done */
        ;

        /* Enter the critical section with cancellation switched off so the
         * mutex can never be abandoned in the locked state. */
        int previous_state;
        pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &previous_state);
        pthread_mutex_lock (&watcher_cv_mutex);

        const bool work_finished = loop > WORK_ITERATIONS; /* Simulate "work end" */
        if (work_finished) {
            are_we_finished = true;
            pthread_cond_broadcast (&watcher_notification_cv);
        }

        pthread_mutex_unlock (&watcher_cv_mutex);

        if (work_finished) {
            /* Cancellation is deliberately left disabled: this thread is
             * terminating on its own and pthread_exit() is not
             * async-cancel-safe either. */
            pthread_exit (NULL);
        }

        /* Back to being cancellable; a pending request takes effect here. */
        pthread_setcancelstate (previous_state, NULL);
    }
}
/* Table of worker threads, one slot per worker, zero-initialised. */
struct {
    pthread_t handle;   /* thread handle, filled in by pthread_create() in main() */
    unsigned int id;    /* worker number, assigned in main() and used in log messages */
} threads[NUM_THREADS] = {0};
/**
 * Watcher thread entry point: sleeps on the condition variable until a worker
 * reports completion through `are_we_finished`, then requests cancellation of
 * every thread recorded in `threads`.
 *
 * @param t  Unused.
 * @return   Never returns normally; the thread ends through pthread_exit().
 */
void * watcher (void * t) {
    /* The predicate must be examined with the mutex held. */
    pthread_mutex_lock (&watcher_cv_mutex);

    /* pthread_cond_wait() may return spuriously, and in general the condition
     * may change again before the woken thread re-acquires the mutex, so the
     * flag is re-tested after every wake-up. (The second case cannot occur in
     * this example; it could if the roles were inverted and this thread handed
     * work packages to the workers and notified them through a condition
     * variable.) */
    for (;;) {
        if (are_we_finished) {
            break;
        }
        pthread_cond_wait (&watcher_notification_cv, &watcher_cv_mutex);
    }
    printf ("watcher: woken up.\n");
    pthread_mutex_unlock (&watcher_cv_mutex);

    /* Ask every worker to stop and report the outcome for each one. */
    size_t slot = 0;
    while (slot < NUM_THREADS) {
        const int cancel_status = pthread_cancel (threads[slot].handle);
        if (cancel_status != ESRCH) {
            printf ("watcher: cancelled %u\n", threads[slot].id);
        } else {
            printf ("watcher: thread %u does not exist, could not cancel\n", threads[slot].id);
        }
        ++slot;
    }

    pthread_exit (NULL);
}
/**
 * Program entry point: starts the watcher and the worker threads, waits for
 * all of them to terminate and releases the synchronisation objects.
 *
 * @param argc  Unused.
 * @param argv  Unused.
 * @return      EXIT_SUCCESS once every thread has been joined. The process
 *              exits with EXIT_FAILURE if a thread could not be created.
 */
int main (int argc, char * argv[]) {
    (void) argc;
    (void) argv;

    /* Initialize mutex and condition variable objects */
    pthread_mutex_init (&watcher_cv_mutex, NULL);
    pthread_cond_init (&watcher_notification_cv, NULL);

    /* For portability, explicitly create threads in a joinable state */
    pthread_attr_t attr;
    pthread_attr_init (&attr);
    pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);

    pthread_t s;
    int rc = pthread_create (&s, &attr, watcher, NULL);
    if (rc != 0) {
        fprintf (stderr, "main: could not create watcher (error %d)\n", rc);
        exit (EXIT_FAILURE);
    }

    const size_t worker_count = sizeof threads / sizeof *threads;

    /* Hold the mutex while the thread table is being filled in. A worker must
     * acquire this mutex before it can announce completion, and the watcher
     * only reads the table after seeing that announcement, so the watcher can
     * never observe a handle that has not been written yet. */
    pthread_mutex_lock (&watcher_cv_mutex);
    for (size_t i = 0; i < worker_count; ++i) {
        threads[i].id = (unsigned int) i;
        rc = pthread_create (&threads[i].handle, &attr, worker, &threads[i].id);
        if (rc != 0) {
            /* The watcher cancels every slot of the table, so continuing with
             * a partially filled table is not safe; give up instead. exit()
             * terminates the threads that were already started. */
            fprintf (stderr, "main: could not create worker %zu (error %d)\n", i, rc);
            exit (EXIT_FAILURE);
        }
        printf ("worker %u created\n", threads[i].id);
    }
    pthread_mutex_unlock (&watcher_cv_mutex);

    /* Join the watcher first. It calls pthread_cancel() on every worker
     * handle, and a handle must not be used once its thread has been joined;
     * after this join returns no further cancellation requests can be issued. */
    pthread_join (s, NULL);

    /* Wait for all workers to complete. This will not wait forever because
     * the watcher has requested cancellation of every one of them. */
    for (size_t i = 0; i < worker_count; ++i) {
        pthread_join (threads[i].handle, NULL);
        printf ("worker %u done\n", threads[i].id);
    }
    printf ("watcher done\n");

    /* Clean up and exit */
    pthread_attr_destroy (&attr);
    pthread_mutex_destroy (&watcher_cv_mutex);
    pthread_cond_destroy (&watcher_notification_cv);
    return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment