Created
June 29, 2019 13:45
-
-
Save x42/408cb5468914c7c3345f7071055d7502 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// gcc `pkg-config --cflags --libs glib-2.0` -pthread semproc.c -o semproc | |
/* Example to parallel progress a directed acyclic graph as described in | |
* Chapter 17.3 of http://www.theses.fr/2017PA080116, page 131-135 | |
* https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf | |
* | |
* (C) 2017, 2019 Robin Gareus <[email protected]> | |
* | |
* This program is free software; you can redistribute it and/or | |
* modify it under the terms of the GNU General Public License | |
* as published by the Free Software Foundation; either version 2 | |
* of the License, or (at your option) any later version. | |
* | |
* This program is distributed in the hope that it will be useful, | |
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
* GNU General Public License for more details. | |
* | |
* You should have received a copy of the GNU General Public License | |
* along with this program; if not, write to the Free Software | |
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |
*/ | |
#define N_WORKERS (3) | |
#include <assert.h> | |
#include <glib.h> | |
#include <pthread.h> | |
#include <semaphore.h> | |
#include <stdbool.h> | |
#include <stdint.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <time.h> | |
struct _Graph; | |
/** A Process Graph Node to process */ | |
typedef struct _GraphNode { | |
struct _Graph* graph; ///< parent Graph | |
char* name; ///< node name | |
/** outgoing edges | |
* downstream nodes to activate when this node has completed processed | |
*/ | |
struct _GraphNode** childnodes; | |
size_t n_childnodes; | |
/* upstream nodes reference count */ | |
gint init_refcount; ///< number of incoming edges | |
volatile gint refcount; ///< count-down (unprocessed upstream) | |
} GraphNode; | |
/** Graph -- manage threads, schedule work and synchronize entry/exit */ | |
typedef struct _Graph { | |
/** List of all graph nodes (only used for memory management) */ | |
GraphNode** graph_nodes; | |
size_t n_graph_nodes; | |
/** Nodes without incoming edges. | |
* These run concurrently at the start of each cycle to kick off processing */ | |
GraphNode** init_trigger_list; | |
size_t n_init_triggers; | |
/* terminal node reference count */ | |
gint terminal_node_count; ///< number of graph nodes without an outgoing edge | |
volatile gint terminal_refcnt; ///< remaining unprocessed terminal nodes in this cycle | |
/* Synchronization with main process callback */ | |
sem_t callback_start; | |
sem_t callback_done; | |
/* wake up graph node process threads */ | |
sem_t trigger; | |
volatile bool terminate; ///< flag to exit, terminate all process-threads | |
/* these following are protected by _trigger_mutex */ | |
pthread_mutex_t trigger_mutex; | |
volatile int idle_thread_cnt; ///< number of threads waiting for work | |
GraphNode** trigger_queue; ///< nodes that can be processed | |
size_t n_trigger_queue; | |
size_t trigger_queue_size; ///< max size (pre-allocated array) | |
} Graph; | |
/* prototypes */ | |
void Graph_trigger (Graph* self, GraphNode* n); | |
void Graph_reached_terminal_node (Graph* self); | |
/* ************************** | |
* GraphNode Implementation * | |
* *************************/ | |
GraphNode* | |
GraphNode_new (Graph* g, char const* name) | |
{ | |
GraphNode* self = (GraphNode*)calloc (1, sizeof (GraphNode)); | |
self->graph = g; | |
self->init_refcount = 0; | |
self->refcount = 0; | |
self->childnodes = NULL; | |
self->n_childnodes = 0; | |
self->name = strdup (name); | |
return self; | |
} | |
void | |
GraphNode_free (GraphNode* self) | |
{ | |
free (self->childnodes); | |
free (self->name); | |
free (self); | |
} | |
/* called by an upstream node, when it has completed processing */ | |
void | |
GraphNode_trigger (GraphNode* self) | |
{ | |
/* check if we can run */ | |
if (g_atomic_int_dec_and_test (&self->refcount)) { | |
// printf ("* schedule '%s'\n", self->name); | |
/* reset reference count for next cycle */ | |
g_atomic_int_set (&self->refcount, self->init_refcount); | |
/* all nodes that feed this node have completed, so this node be processed now. */ | |
Graph_trigger (self->graph, self); | |
} | |
} | |
/* completed processing this node, notify outgoing edges */ | |
void | |
GraphNode_finish (GraphNode* self) | |
{ | |
bool feeds = false; | |
/* notify downstream nodes that depend on this node */ | |
for (size_t i = 0; i < self->n_childnodes; ++i) { | |
// printf ("- %s done activates %s\n", self->name, self->childnodes[i]->name); | |
GraphNode_trigger (self->childnodes[i]); | |
feeds = true; | |
} | |
/* if there are no outgoing edges, this is a terminal node */ | |
if (!feeds) { | |
/* notify parent graph */ | |
Graph_reached_terminal_node (self->graph); | |
} | |
} | |
/* fake "Process Node" - sleep some random time 1..10ms */ | |
void | |
GraphNode_process (GraphNode const* self) | |
{ | |
int dly = 1 + rand () % 10; | |
printf ("Thread 0x%zx: process '%s' (%dms)\n", pthread_self (), self->name, dly); | |
g_usleep (1000 * dly); | |
} | |
void | |
GraphNode_run (GraphNode* self) | |
{ | |
GraphNode_process (self); | |
GraphNode_finish (self); | |
} | |
/* setup method to add outgoing graph edges. | |
* Must be called during initialization only */ | |
void | |
GraphNode_add_feeds (GraphNode* self, GraphNode* g) | |
{ | |
for (size_t i = 0; i < self->n_childnodes; ++i) { | |
if (self->childnodes[i] == g) { | |
return; | |
} | |
} | |
self->childnodes = (GraphNode**)realloc (self->childnodes, (1 + self->n_childnodes) * sizeof (GraphNode*)); | |
self->childnodes[self->n_childnodes++] = g; | |
} | |
/* setup method, to add incoming graph edges | |
* Must be called during initialization only */ | |
void | |
GraphNode_add_depends (GraphNode* self) | |
{ | |
++self->init_refcount; | |
self->refcount = self->init_refcount; | |
} | |
/* ********************** | |
* Graph Implementation * | |
* *********************/ | |
void | |
Graph_init (Graph* self) | |
{ | |
sem_init (&self->callback_start, 0, 0); | |
sem_init (&self->callback_done, 0, 0); | |
sem_init (&self->trigger, 0, 0); | |
pthread_mutex_init (&self->trigger_mutex, NULL); | |
self->terminate = false; | |
self->idle_thread_cnt = 0; | |
self->terminal_refcnt = 0; | |
self->terminal_node_count = 0; | |
self->graph_nodes = NULL; | |
self->n_graph_nodes = 0; | |
self->init_trigger_list = NULL; | |
self->n_init_triggers = 0; | |
self->n_trigger_queue = 0; | |
self->trigger_queue_size = 0; | |
self->trigger_queue = NULL; | |
} | |
void | |
Graph_free (Graph* self) | |
{ | |
for (size_t n = 0; n < self->n_graph_nodes; ++n) { | |
GraphNode_free (self->graph_nodes[n]); | |
} | |
free (self->graph_nodes); | |
pthread_mutex_destroy (&self->trigger_mutex); | |
sem_destroy (&self->callback_start); | |
sem_destroy (&self->callback_done); | |
sem_destroy (&self->trigger); | |
} | |
#ifndef MIN | |
#define MIN(A, B) (((A) < (B)) ? (A) : (B)) | |
#endif | |
void* | |
Graph_worker_thread (void* g) | |
{ | |
Graph* self = (Graph*)g; | |
do { | |
GraphNode* to_run = NULL; | |
pthread_mutex_lock (&self->trigger_mutex); | |
if (self->terminate) { | |
pthread_mutex_unlock (&self->trigger_mutex); | |
return 0; | |
} | |
if (self->n_trigger_queue > 0) { | |
to_run = self->trigger_queue[--self->n_trigger_queue]; | |
} | |
int wakeup = MIN (self->idle_thread_cnt, self->n_trigger_queue); | |
/* wake up threads */ | |
self->idle_thread_cnt -= wakeup; | |
for (int i = 0; i < wakeup; i++) { | |
sem_post (&self->trigger); | |
} | |
while (!to_run) { | |
/* wait for work, fall asleep */ | |
self->idle_thread_cnt += 1; | |
assert (self->idle_thread_cnt <= N_WORKERS); | |
pthread_mutex_unlock (&self->trigger_mutex); | |
sem_wait (&self->trigger); | |
if (self->terminate) { | |
return 0; | |
} | |
/* try to find some work to do */ | |
pthread_mutex_lock (&self->trigger_mutex); | |
if (self->n_trigger_queue > 0) { | |
to_run = self->trigger_queue[--self->n_trigger_queue]; | |
} | |
} | |
pthread_mutex_unlock (&self->trigger_mutex); | |
/* process graph-node */ | |
GraphNode_run (to_run); | |
} while (!self->terminate); | |
return 0; | |
} | |
void* | |
Graph_main_thread (void* g) | |
{ | |
Graph* self = (Graph*)g; | |
/* wait for initial process callback */ | |
sem_wait (&self->callback_start); | |
pthread_mutex_lock (&self->trigger_mutex); | |
/* Can't run without a graph */ | |
assert (self->n_graph_nodes > 0); | |
assert (self->n_init_triggers > 0); | |
assert (self->terminal_node_count > 0); | |
/* bootstrap trigger-list. | |
* (later this is done by Graph_reached_terminal_node)*/ | |
for (size_t i = 0; i < self->n_init_triggers; ++i) { | |
assert (self->n_trigger_queue < self->trigger_queue_size); | |
self->trigger_queue[self->n_trigger_queue++] = self->init_trigger_list[i]; | |
} | |
pthread_mutex_unlock (&self->trigger_mutex); | |
/* after setup, the main-thread just becomes a normal worker */ | |
return Graph_worker_thread (g); | |
} | |
/* called from a node when all its incoming edges have | |
* completed processing and the node can run. | |
* It is added to the "work queue" */ | |
void | |
Graph_trigger (Graph* self, GraphNode* n) | |
{ | |
pthread_mutex_lock (&self->trigger_mutex); | |
assert (self->n_trigger_queue < self->trigger_queue_size); | |
self->trigger_queue[self->n_trigger_queue++] = n; | |
pthread_mutex_unlock (&self->trigger_mutex); | |
} | |
/* called from a terminal node (from the Graph worked-thread) | |
* to indicate it has completed processing. | |
* | |
* The thread of the last terminal node that reaches here | |
* will inform the main-thread, wait, and kick off the next process cycle. | |
*/ | |
void | |
Graph_reached_terminal_node (Graph* self) | |
{ | |
if (g_atomic_int_dec_and_test (&self->terminal_refcnt)) { | |
/* all terminal nodes have completed, | |
* we're done with this cycle. | |
*/ | |
sem_post (&self->callback_done); | |
/* now wait for the next cycle to begin */ | |
sem_wait (&self->callback_start); | |
if (self->terminate) { | |
return; | |
} | |
/* reset terminal reference count */ | |
g_atomic_int_set (&self->terminal_refcnt, self->terminal_node_count); | |
/* and start the initial nodes */ | |
pthread_mutex_lock (&self->trigger_mutex); | |
for (size_t i = 0; i < self->n_init_triggers; ++i) { | |
assert (self->n_trigger_queue < self->trigger_queue_size); | |
self->trigger_queue[self->n_trigger_queue++] = self->init_trigger_list[i]; | |
} | |
pthread_mutex_unlock (&self->trigger_mutex); | |
/* continue in worker-thread */ | |
} | |
} | |
/* tell all threads to terminate */ | |
void | |
Graph_terminate (Graph* self) | |
{ | |
pthread_mutex_lock (&self->trigger_mutex); | |
self->terminate = true; | |
self->init_trigger_list = NULL; | |
self->n_init_triggers = 0; | |
self->trigger_queue = NULL; | |
self->n_trigger_queue = 0; | |
free (self->init_trigger_list); | |
free (self->trigger_queue); | |
int tc = self->idle_thread_cnt; | |
assert (tc == N_WORKERS); | |
pthread_mutex_unlock (&self->trigger_mutex); | |
/* wake-up sleeping threads */ | |
for (int i = 0; i < tc; ++i) { | |
sem_post (&self->trigger); | |
} | |
/* and the main thread */ | |
pthread_mutex_lock (&self->trigger_mutex); | |
sem_post (&self->callback_start); | |
pthread_mutex_unlock (&self->trigger_mutex); | |
} | |
/* the actual entry-point to start processing all nodes | |
* and wait for them to complete */ | |
void | |
Graph_process_graph (Graph* self) | |
{ | |
printf (" -- START PROCESS --\n"); | |
const int64_t start = g_get_monotonic_time (); | |
sem_post (&self->callback_start); | |
sem_wait (&self->callback_done); | |
const int64_t end = g_get_monotonic_time (); | |
printf (" -- END PROCESS - ELAPSED: %.1fms\n", (end - start) / 1e3f); | |
} | |
/* ************************************************ | |
* setup, create and connect nodes | |
* These must only be called during initialization | |
* ***********************************************/ | |
GraphNode* | |
Graph_add_node (Graph* self, const char* name) | |
{ | |
free (self->trigger_queue); | |
self->trigger_queue = (GraphNode**)malloc (++self->trigger_queue_size * sizeof (GraphNode*)); | |
self->graph_nodes = (GraphNode**)realloc (self->graph_nodes, (1 + self->n_graph_nodes) * sizeof (GraphNode*)); | |
self->graph_nodes[self->n_graph_nodes] = GraphNode_new (self, name); | |
return self->graph_nodes[self->n_graph_nodes++]; | |
} | |
GraphNode* | |
Graph_add_terminal_node (Graph* self, const char* name) | |
{ | |
++self->terminal_node_count; | |
self->terminal_refcnt = self->terminal_node_count; | |
return Graph_add_node (self, name); | |
} | |
GraphNode* | |
Graph_add_initial_node (Graph* self, const char* name) | |
{ | |
self->init_trigger_list = (GraphNode**)realloc (self->init_trigger_list, (1 + self->n_init_triggers) * sizeof (GraphNode*)); | |
self->init_trigger_list[self->n_init_triggers] = Graph_add_node (self, name); | |
return self->init_trigger_list[self->n_init_triggers++]; | |
} | |
static void | |
connect (GraphNode* from, GraphNode* to) | |
{ | |
GraphNode_add_feeds (from, to); | |
GraphNode_add_depends (to); | |
} | |
/* ****************************************************************************/ | |
static void | |
setup_graph (Graph* g) | |
{ | |
#if 1 | |
/* Create some example graph | |
* | |
* [master-out] | |
* ^ ^ | |
* [bus-1] [bus-2] | |
* ^ ^ ^ | |
* [T1] [T2] [T3] | |
* | |
* https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf | |
* Fig. 31, Table 7; page 134, 135 | |
*/ | |
GraphNode* master = Graph_add_terminal_node (g, "master"); | |
GraphNode* bus2 = Graph_add_node (g, "bus2"); | |
GraphNode* bus1 = Graph_add_node (g, "bus1"); | |
GraphNode* track3 = Graph_add_initial_node (g, "track3"); | |
GraphNode* track2 = Graph_add_initial_node (g, "track2"); | |
GraphNode* track1 = Graph_add_initial_node (g, "track1"); | |
connect (bus1, master); | |
connect (bus2, master); | |
connect (track1, bus1); | |
connect (track2, bus2); | |
connect (track3, bus2); | |
#else | |
/* some more complex graph | |
* [out-1] [out-2] | |
* / | \ / \ | |
* [L1-1] [L1-2] [L1-3] [L1-4] [L1-5] | |
* / \ | \ | / \ / | |
* [L2-1] [L2-2] [L2-3 ] [L2-4] | |
* | | \ | / \ | |
* [L3-1] [L3-2] [L3-3] [L3-4] [L3-5] | |
* | |
*/ | |
GraphNode* out1 = Graph_add_terminal_node (g, "out-1"); | |
GraphNode* out2 = Graph_add_terminal_node (g, "out-2"); | |
GraphNode* l1_1 = Graph_add_node (g, "L1-1"); | |
GraphNode* l1_2 = Graph_add_node (g, "L1-2"); | |
GraphNode* l1_3 = Graph_add_node (g, "L1-3"); | |
GraphNode* l1_4 = Graph_add_node (g, "L1-4"); | |
GraphNode* l1_5 = Graph_add_node (g, "L1-5"); | |
GraphNode* l2_1 = Graph_add_node (g, "L2-1"); | |
GraphNode* l2_2 = Graph_add_node (g, "L2-2"); | |
GraphNode* l2_3 = Graph_add_node (g, "L2-3"); | |
GraphNode* l2_4 = Graph_add_node (g, "L2-4"); | |
GraphNode* l3_1 = Graph_add_initial_node (g, "L3-1"); | |
GraphNode* l3_2 = Graph_add_initial_node (g, "L3-2"); | |
GraphNode* l3_3 = Graph_add_initial_node (g, "L3-3"); | |
GraphNode* l3_4 = Graph_add_initial_node (g, "L3-4"); | |
GraphNode* l3_5 = Graph_add_initial_node (g, "L3-5"); | |
connect (l1_1, out1); | |
connect (l1_2, out1); | |
connect (l1_3, out1); | |
connect (l1_4, out2); | |
connect (l1_5, out2); | |
connect (l2_1, l1_1); | |
connect (l2_2, l1_2); | |
connect (l2_3, l1_2); | |
connect (l2_3, l1_3); | |
connect (l2_3, l1_4); | |
connect (l2_4, l1_4); | |
connect (l2_4, l1_5); | |
connect (l3_1, l2_1); | |
connect (l3_2, l2_2); | |
connect (l3_3, l2_2); | |
connect (l3_4, l2_4); | |
connect (l3_5, l2_4); | |
connect (l3_3, l2_3); | |
#endif | |
} | |
int | |
main (int argc, char** argv) | |
{ | |
/* Number of worker threads to start (in addition to main thread). Can be 0) */ | |
srand (time (NULL)); | |
Graph g; | |
pthread_t main_thread; | |
pthread_t workers[N_WORKERS]; | |
/* initialize graph, and add some nodes to process */ | |
Graph_init (&g); | |
setup_graph (&g); | |
/* create worker threads */ | |
for (int i = 0; i < N_WORKERS; ++i) { | |
pthread_create (&workers[i], NULL, &Graph_worker_thread, &g); | |
} | |
/* and the main thread */ | |
pthread_create (&main_thread, NULL, &Graph_main_thread, &g); | |
/* breathe */ | |
sched_yield (); | |
/* process a few times */ | |
for (int i = 0; i < 5; ++i) { | |
Graph_process_graph (&g); | |
} | |
/* cleanup and quit */ | |
Graph_terminate (&g); | |
for (int i = 0; i < N_WORKERS; ++i) { | |
pthread_join (workers[i], NULL); | |
} | |
pthread_join (main_thread, NULL); | |
Graph_free (&g); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment