// g++ -pthread semproc++.cc -o semproc `pkg-config --cflags --libs glib-2.0`
/* Example of processing a directed acyclic graph in parallel, as described in
 * Chapter 17.3 of http://www.theses.fr/2017PA080116, page 131-135
 * https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf
 *
 * (C) 2017, 2019 Robin Gareus <[email protected]>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

/* Number of worker threads to start (in addition to main thread). Can be 0.
 * This should be user configurable and set to "available CPU cores - 1"
 */
#define N_WORKERS (3)
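/* A minimal sketch (not part of this example) of deriving the worker count at
 * run-time instead of hard-coding it. g_get_num_processors() is available
 * since glib 2.36; reserving one core for the main thread is an assumption,
 * not a requirement:
 *
 *   unsigned int n_cpu     = g_get_num_processors ();
 *   unsigned int n_workers = n_cpu > 1 ? n_cpu - 1 : 0;
 *   Graph g (n_workers);
 */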
#include <cassert>
#include <cstdio>
#include <glib.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include <algorithm> // std::min (used in Graph::worker_thread)
#include <list>
#include <set>
#include <string>
#include <vector>
/** Lock free multiple producer, multiple consumer queue
 *
 * inspired by http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
 * Kudos to Dmitry Vyukov
 */
template <typename T>
class MPMCQueue
{
public:
	MPMCQueue (size_t buffer_size = 8)
		: _buffer (0)
		, _buffer_mask (0)
	{
		reserve (buffer_size);
	}

	~MPMCQueue ()
	{
		delete[] _buffer;
	}

	static size_t
	power_of_two_size (size_t sz)
	{
		int32_t power_of_two;
		for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two)
			;
		return 1U << power_of_two;
	}

	void
	reserve (size_t buffer_size)
	{
		buffer_size = power_of_two_size (buffer_size);
		assert ((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
		if (_buffer_mask >= buffer_size - 1) {
			return;
		}
		delete[] _buffer;
		_buffer      = new cell_t[buffer_size];
		_buffer_mask = buffer_size - 1;
		clear ();
	}

	void
	clear ()
	{
		for (size_t i = 0; i <= _buffer_mask; ++i) {
			g_atomic_int_set (&_buffer[i]._sequence, i);
		}
		g_atomic_int_set (&_enqueue_pos, 0);
		g_atomic_int_set (&_dequeue_pos, 0);
	}

	bool
	push_back (T const& data)
	{
		cell_t* cell;
		guint   pos = g_atomic_int_get (&_enqueue_pos);
		for (;;) {
			cell         = &_buffer[pos & _buffer_mask];
			guint    seq = g_atomic_int_get (&cell->_sequence);
			intptr_t dif = (intptr_t)seq - (intptr_t)pos;
			if (dif == 0) {
				if (g_atomic_int_compare_and_exchange (&_enqueue_pos, pos, pos + 1)) {
					break;
				}
			} else if (dif < 0) {
				assert (0);
				return false;
			} else {
				pos = g_atomic_int_get (&_enqueue_pos);
			}
		}
		cell->_data = data;
		g_atomic_int_set (&cell->_sequence, pos + 1);
		return true;
	}

	bool
	dequeue (T& data)
	{
		cell_t* cell;
		guint   pos = g_atomic_int_get (&_dequeue_pos);
		for (;;) {
			cell         = &_buffer[pos & _buffer_mask];
			guint    seq = g_atomic_int_get (&cell->_sequence);
			intptr_t dif = (intptr_t)seq - (intptr_t) (pos + 1);
			if (dif == 0) {
				if (g_atomic_int_compare_and_exchange (&_dequeue_pos, pos, pos + 1)) {
					break;
				}
			} else if (dif < 0) {
				return false;
			} else {
				pos = g_atomic_int_get (&_dequeue_pos);
			}
		}
		data = cell->_data;
		g_atomic_int_set (&cell->_sequence, pos + _buffer_mask + 1);
		return true;
	}

private:
	struct cell_t {
		volatile guint _sequence;
		T              _data;
	};

	cell_t* _buffer;
	size_t  _buffer_mask;

	volatile guint _enqueue_pos;
	volatile guint _dequeue_pos;
};
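/* Usage sketch (illustration only, not used directly below): the queue holds
 * plain values (here pointers), and both ends may be used from multiple
 * threads concurrently.
 *
 *   MPMCQueue<int*> q (16);   // capacity is rounded up to a power of two
 *   int answer = 42;
 *   q.push_back (&answer);    // any thread may enqueue
 *   int* item;
 *   if (q.dequeue (item)) {   // any thread may dequeue; returns false when empty
 *     printf ("%d\n", *item);
 *   }
 */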
class Graph;

/** A Process Graph Node to process */
class GraphNode {
public:
	GraphNode (Graph* g, std::string const& n)
		: _graph (g)
		, _name (n)
		, _refcount (0)
		, _init_refcount (0)
	{
		/* fake some work */
		_work_delay = 1 + rand () % 10;
	}

	void run ()
	{
		process ();
		finish ();
	}

	const char* cname () const { return _name.c_str (); }

	/* setup methods, to add graph edges.
	 * Must be called during initialization only.
	 *
	 * Note that this example does not include topological sorting.
	 * It is left to the graph-setup to order the DAG
	 * (see the sketch after this class).
	 */

	/** add outgoing graph edge */
	void add_feeds (GraphNode* g)
	{
		_childnodes.insert (g);
	}

	/** add incoming graph edge */
	void add_depends ()
	{
		++_init_refcount;
		_refcount = _init_refcount;
	}

private:
	/* custom data to process this node.
	 * Here a sleep/delay in msec.
	 */
	gulong _work_delay;

protected:
	virtual void process ();

private:
	void finish ();
	void trigger ();

	Graph*      _graph; ///< parent Graph
	std::string _name;  ///< node name

	/** outgoing edges
	 * downstream nodes to activate when this node has completed processing */
	std::set<GraphNode*> _childnodes;

	/* upstream nodes reference count */
	gint _refcount;      ///< count-down (unprocessed upstream)
	gint _init_refcount; ///< number of incoming edges
};
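/* The graph-setup below provides nodes in a valid order by construction.
 * If edges came from an arbitrary source, a topological order could be
 * established first, e.g. with Kahn's algorithm. A self-contained sketch
 * follows (hypothetical helper, not used in this example; it would need
 * <map> and <deque> in addition to the headers above):
 */
#if 0
#include <deque>
#include <map>

/* Return the nodes of 'adj' (node -> outgoing edges) in topological order.
 * The result is empty if the graph contains a cycle. */
static std::vector<std::string>
topological_order (std::map<std::string, std::vector<std::string> > const& adj)
{
	/* count incoming edges per node, making sure every node is present */
	std::map<std::string, int> indegree;
	for (auto const& kv : adj) {
		indegree[kv.first];
		for (auto const& to : kv.second) {
			++indegree[to];
		}
	}
	/* nodes without incoming edges are ready to be emitted */
	std::deque<std::string> ready;
	for (auto const& kv : indegree) {
		if (kv.second == 0) {
			ready.push_back (kv.first);
		}
	}
	std::vector<std::string> order;
	while (!ready.empty ()) {
		std::string n = ready.front ();
		ready.pop_front ();
		order.push_back (n);
		auto it = adj.find (n);
		if (it == adj.end ()) {
			continue;
		}
		/* removing 'n' may make its children ready */
		for (auto const& to : it->second) {
			if (--indegree[to] == 0) {
				ready.push_back (to);
			}
		}
	}
	if (order.size () != indegree.size ()) {
		order.clear (); /* cycle detected */
	}
	return order;
}
#endif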
/** Graph -- manage threads, schedule work and synchronize entry/exit */
class Graph {
public:
	Graph (unsigned int n_worker_threads)
		: _n_workers (n_worker_threads)
		, _n_terminal_nodes (0)
		, _setup_n_terminal_nodes (0)
	{
		sem_init (&_callback_start, 0, 0);
		sem_init (&_callback_done, 0, 0);
		sem_init (&_trigger, 0, 0);

		g_atomic_int_set (&_terminal_refcnt, 0);
		g_atomic_int_set (&_terminate, 0);
		g_atomic_int_set (&_idle_thread_cnt, 0);
		g_atomic_int_set (&_trigger_queue_size, 0);
	}

	~Graph ()
	{
		sem_destroy (&_callback_start);
		sem_destroy (&_callback_done);
		sem_destroy (&_trigger);
	}

	void process_graph ();
	bool start ();
	void terminate ();

	/* setup, create and connect nodes */
	GraphNode*
	add_node (std::string const& name)
	{
		_setup_graph_nodes.push_back (GraphNode (this, name));
		return &_setup_graph_nodes.back ();
	}

	GraphNode*
	add_terminal_node (std::string const& name)
	{
		++_setup_n_terminal_nodes;
		return add_node (name);
	}

	GraphNode*
	add_initial_node (std::string const& name)
	{
		GraphNode* n = add_node (name);
		_setup_init_trigger_list.push_back (n);
		return n;
	}

	void clear_setup ()
	{
		_setup_graph_nodes.clear ();
		_setup_init_trigger_list.clear ();
		_setup_n_terminal_nodes = 0;
	}

	void rechain ()
	{
		assert (g_atomic_int_get (&_terminal_refcnt) == 0);
		assert (g_atomic_int_get (&_trigger_queue_size) == 0);

		_graph_nodes.swap (_setup_graph_nodes);
		_init_trigger_list.swap (_setup_init_trigger_list);
		_n_terminal_nodes = _setup_n_terminal_nodes;
		g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes);

		_trigger_queue.reserve (_graph_nodes.size ());
		clear_setup ();
	}

	static void
	connect (GraphNode* from, GraphNode* to)
	{
		from->add_feeds (to);
		to->add_depends ();
	}

private:
	friend class GraphNode;

	void trigger (GraphNode* n);
	void reached_terminal_node ();

private:
	static void*
	start_main_thread (void* g)
	{
		static_cast<Graph*> (g)->main_thread ();
		return NULL;
	}

	static void*
	start_worker (void* g)
	{
		static_cast<Graph*> (g)->worker_thread ();
		return NULL;
	}

	void worker_thread ();
	void main_thread ();

	pthread_t              _main_thread;
	std::vector<pthread_t> _worker_threads;

	/* number of work-threads to start/started.
	 * After ::start() this is equivalent to
	 * _worker_threads.size ()
	 */
	int _n_workers;

	/* flag to terminate background threads */
	volatile gint _terminate;

	/** List of all graph nodes (only used for memory management) */
	std::list<GraphNode> _graph_nodes;

	/** Nodes without incoming edges.
	 * These run concurrently at the start of each cycle to kick off processing */
	std::list<GraphNode*> _init_trigger_list;

	/** Synchronize with entry point */
	sem_t _callback_start;
	sem_t _callback_done;

	/** Start worker threads */
	sem_t _trigger;

	/* Terminal node reference count */
	volatile gint _terminal_refcnt;  ///< remaining unprocessed terminal nodes in this cycle
	size_t        _n_terminal_nodes; ///< number of graph nodes without an outgoing edge

	MPMCQueue<GraphNode*> _trigger_queue;      ///< nodes that can be processed
	volatile guint        _trigger_queue_size; ///< number of entries in trigger-queue
	volatile guint        _idle_thread_cnt;    ///< number of threads waiting for work

	/* Chain used to setup in the background.
	 * This is applied and cleared by ::rechain()
	 */
	std::list<GraphNode>  _setup_graph_nodes;
	std::list<GraphNode*> _setup_init_trigger_list;
	size_t                _setup_n_terminal_nodes;
};

/* **************************
 * GraphNode Implementation */

/* Process a node */
void
GraphNode::process ()
{
	/* XXX the actual work is to be performed here XXX */
printf ("Thread 0x%zx: process '%s' (%lums)\n", pthread_self (), cname (), _work_delay); | |
	g_usleep (1000 * _work_delay);
}

/* Called by an upstream node, when it has completed processing */
void
GraphNode::trigger ()
{
	/* check if we can run */
	if (g_atomic_int_dec_and_test (&_refcount)) {
		/* reset reference count for next cycle */
		g_atomic_int_set (&_refcount, _init_refcount);
		/* all nodes that feed this node have completed, so this node can be processed now. */
		_graph->trigger (this);
	}
}

/* Completed processing this node, notify outgoing edges */
void
GraphNode::finish ()
{
	bool feeds = false;

	/* notify downstream nodes that depend on this node */
	for (auto i : _childnodes) {
		// printf ("%s activates %s\n", cname(), i->cname());
		i->trigger ();
		feeds = true;
	}

	if (!feeds) {
		/* terminal node, notify graph */
		_graph->reached_terminal_node ();
	}
}

/* **********************
 * Graph Implementation *
 * *********************/

void
Graph::main_thread ()
{
	/* Wait until all worker threads are active */
	while (g_atomic_int_get (&_idle_thread_cnt) != _n_workers) {
		sched_yield ();
	}

	/* Wait for initial process callback */
	sem_wait (&_callback_start);

	/* first time setup */

	/* Can't run without a graph */
	assert (_graph_nodes.size () > 0);
	assert (_init_trigger_list.size () > 0);
	assert (_n_terminal_nodes > 0);
	/* Bootstrap the trigger-list
	 * (later this is done by Graph::reached_terminal_node) */
	for (auto i : _init_trigger_list) {
		g_atomic_int_inc (&_trigger_queue_size);
		_trigger_queue.push_back (i);
	}

	/* After setup, the main-thread just becomes a normal worker */
	worker_thread ();
}

void
Graph::worker_thread ()
{
	for (;;) {
		GraphNode* to_run = NULL;

		if (g_atomic_int_get (&_terminate)) {
			return;
		}

		if (_trigger_queue.dequeue (to_run)) {
			/* Wake up idle threads, but at most as many as there is
			 * work in the trigger queue that can be processed by
			 * other threads.
			 * This thread has not yet decreased _trigger_queue_size.
			 */
			guint idle_cnt   = g_atomic_int_get (&_idle_thread_cnt);
			guint work_avail = g_atomic_int_get (&_trigger_queue_size);
			guint wakeup     = std::min (idle_cnt + 1, work_avail);

			for (guint i = 1; i < wakeup; ++i) {
				sem_post (&_trigger);
			}
		}

		while (!to_run) {
			/* Wait for work, fall asleep */
			g_atomic_int_inc (&_idle_thread_cnt);
			assert (g_atomic_int_get (&_idle_thread_cnt) <= _n_workers);

			sem_wait (&_trigger);

			if (g_atomic_int_get (&_terminate)) {
				return;
			}

			g_atomic_int_dec_and_test (&_idle_thread_cnt);

			/* Try to find some work to do */
			_trigger_queue.dequeue (to_run);
		}

		/* Process the graph-node */
		g_atomic_int_dec_and_test (&_trigger_queue_size);
		to_run->run ();
	}
}

/* Called from a node when all its incoming edges have
 * completed processing and the node can run.
 * It is added to the "work queue" */
void
Graph::trigger (GraphNode* n)
{
	g_atomic_int_inc (&_trigger_queue_size);
	_trigger_queue.push_back (n);
}
/* Called from a terminal node (in a Graph worker-thread)
 * to indicate that it has completed processing.
 *
 * The thread of the last terminal node that reaches here
 * informs the main-thread, waits, and kicks off the next process cycle.
 */
void
Graph::reached_terminal_node ()
{
	if (g_atomic_int_dec_and_test (&_terminal_refcnt)) {
		/* All terminal nodes have completed.
		 * We're done with this cycle.
		 */
		assert (g_atomic_int_get (&_trigger_queue_size) == 0);

		/* Notify caller */
		sem_post (&_callback_done);

		/* Ensure that all background threads are idle.
		 * When freewheeling there may be an immediate restart:
		 * If there are more threads than CPU cores, some worker-
		 * threads may only be "on the way" to become idle.
		 */
		while (g_atomic_int_get (&_idle_thread_cnt) != _n_workers) {
			sched_yield ();
		}

		/* Now wait for the next cycle to begin */
		sem_wait (&_callback_start);

		if (g_atomic_int_get (&_terminate)) {
			return;
		}

		/* Reset terminal reference count */
		g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes);

		/* and start the initial nodes */
		for (auto i : _init_trigger_list) {
			g_atomic_int_inc (&_trigger_queue_size);
			_trigger_queue.push_back (i);
		}
		/* .. continue in worker-thread */
	}
}

bool
Graph::start ()
{
	/* XXX These really should be realtime threads for audio processing XXX
	 *
	 * eg. jack_client_create_thread() or setting up priority using
	 * pthread_attr_setschedpolicy() pthread_attr_setschedparam()
	 * and/or mach thread_policy_set()
	 *
	 * alternatively expose Graph::start_worker(),
	 * Graph::start_main_thread() to the DAW's backend/engine.
	 */
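	/* For illustration only: one way to request SCHED_FIFO for a worker
	 * thread with plain pthreads. The priority value and the use of
	 * explicit-sched are assumptions, and this usually requires elevated
	 * privileges (CAP_SYS_NICE / rtprio limits):
	 *
	 *   pthread_attr_t     attr;
	 *   struct sched_param parm;
	 *   parm.sched_priority = 70;
	 *   pthread_attr_init (&attr);
	 *   pthread_attr_setschedpolicy (&attr, SCHED_FIFO);
	 *   pthread_attr_setschedparam (&attr, &parm);
	 *   pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED);
	 *   pthread_create (&tid, &attr, Graph::start_worker, this);
	 *   pthread_attr_destroy (&attr);
	 */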
	/* Start worker threads */
	for (int i = 0; i < _n_workers; ++i) {
		pthread_t tid;
		if (0 == pthread_create (&tid, NULL, Graph::start_worker, this)) {
			_worker_threads.push_back (tid);
		}
	}
	_n_workers = _worker_threads.size ();

	/* .. and the main thread */
	if (pthread_create (&_main_thread, NULL, Graph::start_main_thread, this)) {
		terminate ();
		return false;
	}

	/* Breathe */
	sched_yield ();
	return true;
}

/* Tell all threads to terminate */
void
Graph::terminate ()
{
	/* Flag threads to terminate */
	g_atomic_int_set (&_terminate, 1);

	/* Wake-up sleeping threads */
	int tc = g_atomic_int_get (&_idle_thread_cnt);
	assert (tc == _n_workers);
	for (int i = 0; i < tc; ++i) {
		sem_post (&_trigger);
	}

	/* and the main thread */
	sem_post (&_callback_start);

	/* join threads */
	for (auto i : _worker_threads) {
		pthread_join (i, NULL);
	}
	pthread_join (_main_thread, NULL);
}

/* The actual entry-point to start processing all nodes,
 * and wait for them to complete */
void
Graph::process_graph ()
{
	printf (" -- START PROCESS --\n");
	const int64_t start = g_get_monotonic_time ();

	sem_post (&_callback_start);
	sem_wait (&_callback_done);

	const int64_t end = g_get_monotonic_time ();
	printf (" -- END PROCESS - ELAPSED: %.1fms\n", (end - start) / 1e3f);
}

/* ****************************************************************************/

static void
setup_graph (Graph& g)
{
	/* Setup an ordered process graph.
	 *
	 * https://en.wikipedia.org/wiki/Directed_acyclic_graph
	 * https://en.wikipedia.org/wiki/Topological_sorting
	 */
	/* For real use, the GraphNode constructor would also need to be changed
	 * to take a pointer to the node-processing method.
	 * In this example GraphNode::process() just sleeps to simulate work.
	 *
	 * An alternative approach is to subclass
	 *   class YourProcessNode : public GraphNode;
	 * and override GraphNode::process(); see the sketch below.
	 */
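	/* Subclassing sketch (hypothetical, not wired into this example). Note
	 * that Graph currently stores GraphNode by value in a std::list, so a
	 * subclass would also require the graph to hold nodes by pointer:
	 *
	 *   class YourProcessNode : public GraphNode
	 *   {
	 *   public:
	 *     YourProcessNode (Graph* g, std::string const& n) : GraphNode (g, n) {}
	 *   protected:
	 *     void process () {
	 *       // perform the actual work for this node here
	 *     }
	 *   };
	 */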
#if 0
	/* Create some example graph
	 *
	 *            [master]
	 *            ^      ^
	 *      [bus-1]     [bus-2]
	 *         ^         ^    ^
	 *       [T1]     [T2]   [T3]
	 *
	 * https://gareus.org/misc/thesis-p8/2017-12-Gareus-Lat.pdf
	 * Fig. 31, Table 7; page 134, 135
	 */
	GraphNode* master = g.add_terminal_node ("master");
	GraphNode* bus2   = g.add_node ("bus2");
	GraphNode* bus1   = g.add_node ("bus1");
	GraphNode* track3 = g.add_initial_node ("track3");
	GraphNode* track2 = g.add_initial_node ("track2");
	GraphNode* track1 = g.add_initial_node ("track1");

	g.connect (bus1, master);
	g.connect (bus2, master);
	g.connect (track1, bus1);
	g.connect (track2, bus2);
	g.connect (track3, bus2);
#else
	/* some more complex graph
	 *
	 *        [out-1]              [out-2]
	 *       /   |   \             /     \
	 *  [L1-1] [L1-2] [L1-3]  [L1-4]   [L1-5]
	 *     |     |  \      \  /    \    /
	 *  [L2-1] [L2-2]     [L2-3]    [L2-4]
	 *     |     |    \      |      /   \
	 *  [L3-1] [L3-2] [L3-3] [L3-4] [L3-5]
	 *      \    /        \     |    /
	 *      [L4-1]           [L4-2]
	 *
	 */
	GraphNode* out1 = g.add_terminal_node ("out-1");
	GraphNode* out2 = g.add_terminal_node ("out-2");

	GraphNode* l1_1 = g.add_node ("L1-1");
	GraphNode* l1_2 = g.add_node ("L1-2");
	GraphNode* l1_3 = g.add_node ("L1-3");
	GraphNode* l1_4 = g.add_node ("L1-4");
	GraphNode* l1_5 = g.add_node ("L1-5");

	GraphNode* l2_1 = g.add_node ("L2-1");
	GraphNode* l2_2 = g.add_node ("L2-2");
	GraphNode* l2_3 = g.add_node ("L2-3");
	GraphNode* l2_4 = g.add_node ("L2-4");

	GraphNode* l3_1 = g.add_node ("L3-1");
	GraphNode* l3_2 = g.add_node ("L3-2");
	GraphNode* l3_3 = g.add_node ("L3-3");
	GraphNode* l3_4 = g.add_node ("L3-4");
	GraphNode* l3_5 = g.add_node ("L3-5");

	GraphNode* l4_1 = g.add_initial_node ("L4-1");
	GraphNode* l4_2 = g.add_initial_node ("L4-2");

	g.connect (l1_1, out1);
	g.connect (l1_2, out1);
	g.connect (l1_3, out1);
	g.connect (l1_4, out2);
	g.connect (l1_5, out2);

	g.connect (l2_1, l1_1);
	g.connect (l2_2, l1_2);
	g.connect (l2_3, l1_2);
	g.connect (l2_3, l1_3);
	g.connect (l2_3, l1_4);
	g.connect (l2_4, l1_4);
	g.connect (l2_4, l1_5);

	g.connect (l3_1, l2_1);
	g.connect (l3_2, l2_2);
	g.connect (l3_3, l2_2);
	g.connect (l3_4, l2_4);
	g.connect (l3_5, l2_4);
	g.connect (l3_3, l2_3);

	g.connect (l4_1, l3_1);
	g.connect (l4_1, l3_2);
	g.connect (l4_2, l3_3);
	g.connect (l4_2, l3_4);
	g.connect (l4_2, l3_5);
#endif
	/* The graph has a dedicated setup chain.
	 * When the process-graph changes, a second graph is built in the background,
	 * while the current graph continues processing.
	 *
	 * rechain() activates the setup graph and drops the previous graph.
	 * It must not be called concurrently with processing,
	 * i.e. it can only be called before or after ::process_graph().
	 */
	g.rechain ();
}
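/* Illustration (not exercised in main() below) of swapping in a new graph
 * between process cycles: nodes added via add_node()/connect() accumulate in
 * the setup chain and only become active once rechain() is called, which must
 * never overlap with process_graph().
 *
 *   GraphNode* a = g.add_initial_node ("a");
 *   GraphNode* b = g.add_terminal_node ("b");
 *   g.connect (a, b);
 *   g.process_graph ();  // still runs the previous chain
 *   g.rechain ();        // activate the new chain (outside of processing)
 *   g.process_graph ();  // now runs the new chain
 */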
int
main (int argc, char** argv)
{
	srand (time (NULL));

	Graph g (N_WORKERS);

	/* start process threads */
	g.start ();

	/* Add some nodes to process */
	setup_graph (g);

	/* Process a few times */
	for (int i = 0; i < 5; ++i) {
		g.process_graph ();
	}

	/* Cleanup and quit */
	g.terminate ();
	return 0;
}