Created
March 3, 2020 03:34
-
-
Save djg/975cebaa3739e1ffb6924c7dce7bb8f9 to your computer and use it in GitHub Desktop.
Sean Parent - OKish Task System with Task Stealing in C++14
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Horrible task system - task stealing */ | |
#include <atomic> | |
#include <condition_variable> | |
#include <deque> | |
#include <functional> | |
#include <mutex> | |
#include <thread> | |
#include <vector> | |
using namespace std; | |
using lock_t = unique_lock<mutex>; | |
/// Mutex-protected FIFO of type-erased `void()` tasks.
///
/// `push`/`pop` block on the mutex (and `pop` additionally waits for work);
/// the `try_` variants give up immediately when the mutex is currently held
/// by another thread, which lets callers move on to a different queue.
class notification_queue {
    using lock_type = std::unique_lock<std::mutex>;

    std::deque<std::function<void()>> _q;
    std::mutex _mutex;
    std::condition_variable _ready;  // signalled when a task is queued or the queue is finished
    bool _done{false};               // guarded by _mutex

public:
    /// Marks the queue as finished and wakes every thread blocked in `pop`.
    ///
    /// Tasks that are still queued remain retrievable; `pop` only reports
    /// failure once the queue is both finished and empty.
    void done() {
        {
            lock_type lock{_mutex};
            _done = true;
        }
        // A consumer already waiting in pop() only re-checks _done when it is
        // woken; without this notification it would sleep forever.
        _ready.notify_all();
    }

    /// Blocks until a task is available or the queue has been marked done.
    ///
    /// @param x receives the oldest queued task on success.
    /// @return true if a task was stored in `x`; false if the queue is done
    ///         and empty.
    bool pop(std::function<void()>& x) {
        lock_type lock{_mutex};
        // Loop guards against spurious wake-ups.
        while (_q.empty() && !_done) _ready.wait(lock);
        if (_q.empty()) return false;
        x = std::move(_q.front());
        _q.pop_front();
        return true;
    }

    /// Appends a task, waiting for the mutex if necessary, then wakes one
    /// waiting consumer.
    template <typename F>
    void push(F&& f) {
        {
            lock_type lock{_mutex};
            _q.emplace_back(std::forward<F>(f));
        }
        // Notify after unlocking so the woken thread can take the mutex at once.
        _ready.notify_one();
    }

    /// Non-blocking pop.
    ///
    /// @param x receives the oldest queued task on success.
    /// @return false if the mutex could not be acquired immediately or the
    ///         queue is empty; true otherwise.
    bool try_pop(std::function<void()>& x) {
        lock_type lock{_mutex, std::try_to_lock};
        if (!lock || _q.empty()) return false;
        x = std::move(_q.front());
        _q.pop_front();
        return true;
    }

    /// Non-blocking push.
    ///
    /// `f` is only consumed when the call succeeds, so the caller may retry
    /// with the same argument after a failure.
    ///
    /// @return false if the mutex could not be acquired immediately; true if
    ///         the task was queued.
    template <typename F>
    bool try_push(F&& f) {
        {
            lock_type lock{_mutex, std::try_to_lock};
            if (!lock) return false;
            _q.emplace_back(std::forward<F>(f));
        }
        _ready.notify_one();
        return true;
    }
};
class task_system { | |
const unsigned K = 10; | |
const unsigned _count{thread::hardware_concurrency()}; | |
vector<thread> _threads; | |
vector<notification_queue> _q{_count}; | |
atomic<unsigned> _index{0}; | |
void run(unsigned i) { | |
while (true) { | |
function<void()> f; | |
for (unsigned n = 0; n != _count; ++n) { | |
if (_q[(i + n) % _count].try_pop(f)) break; | |
} | |
if (!f && !_q[i].pop(f)) break; | |
f(); | |
} | |
} | |
public: | |
task_system() { | |
for (unsigned n = 0; n != _count; ++n) { | |
_threads.emplace_back([&, n]{ run(n); }); | |
} | |
} | |
~task_system() { | |
for (auto& e : _q) e.done(); | |
for (auto& e : _threads) e.join(); | |
} | |
template <typename F> | |
void async_(F&& f) { | |
auto i = _index++; | |
for (unsigned n = 0; n != _count * K; ++n) { | |
if (_q[(i + n) % _count].try_push(forward<F>(f))) return; | |
} | |
_q[i % _count].push(forward<F>(f)); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment