// Flat purely functional DAG with updateable inputs, lock-free lookups,
// and concurrent node construction with a deadlock detector.
// g++ -g -fopenmp -O2 -std=gnu++2c -Wall -Wpedantic -Iboost_1_89_0 Graph.cpp && ./a.out
/* [WIP]
TODO
- removing unused nodes?
- ref data use case -- evaluate immediately and only keep results
- exceptions in recipes -- do not keep unused nodes
- recipes that add nodes but do not use them
--> create temporary nodes in construct() and only extend IndexedGraph
once construct() succeeds
but still can evaluate them
- test parallel evals (& constructs)
- eval() in copies (which no longer extend 'values'):
- could run in parallel with construct().
--> test
- could be lock-free
--> test
- can have per node function mutexes to remove repeated evaluations
--> todo
- we can keep the graph version and NamedGraphWithNewInputs will not find
nodes added to the base graph (while it will still share the hash table)
- serialization can save the current number of nodes and ignore
nodes added after it -- lock-free too
--- advanced
- publishing values inplace? just make IndexedGraphNewInputs?
- could be implemented with a list of parents
publishing replaces inputs and clears parents
we have shared lock for evals (to block resize)
could use it for atomic publish
- tracking of parents (so we can make a full copy of data
and only clear values that we need to re-evaluate)
- we can keep list of parents, find a list of changed nodes, and,
if it's less than a half (or a quarter), use hash map
- we can also pre-copy unchanged nodes to speed up evals (though
extra logic can slow down the total copy+eval time)
- list of parents better done as arrays so we are slower at
construction and serialisation, but faster at bumping
- we can optimise the common case of a few nodes: the flat struct
with a size and a first few elements, and a pointer to the rest
(not a union for the lock-free access)
x constructLock can be less coarse -- we can't parallelize small
constructs due to the lock.
thread can be allocated in graphenv (with checks that graph env doesn't
go from thread to thread). Or it can be in lock-free lookup table
(we don't have too many threads, can leak them).
newThread/freeThread/newConstruct can all work
outside of any lock.
--> finer-grained locks slow down the sequential case
1M+ constructs a second looks ok
x create thin NamedGraph that only has current recipe stack +
IndexedGraph* since we do a deadlock detection we won't have
circular references.
--> we just keep thread id->thread map
x we can start building other recipes while we wait
--> no, too hard to do, we shall always run recipes from
top-level depends without spawning extra threads inside
we can limit the number of active threads with a semaphore
curve
usd -> curve
eur -> usd
curve -- runs
usd waits on curve, runs eur, waits on usd -- that's a loop
unless we use a different thread ID for the extra run
then once curve ends it can run usd (but how, if we run eur?)
with green threads we could spawn a thread whilst we wait
with native threads we can run more threads but have a semaphore
on wait.
ideally, we need continuations like in nested data parallelism
instead of evaluating depend directly we shall put this depend to
a queue with a continuation to call once it's ready
but this will require changes in user code
we can provide a simplified parMap that uses current thread
+ free top-level threads ??
*/
/* Flat DAG with changeable inputs.
Creating a copy with changed inputs is basically one calloc (O(n))
(much faster than node-based).
Serialization should be faster too.
Computing values is also up to O(n), not O(n_changed). Same as
node-based w/o marking changed parents, but with better cache
utilization.
*/
#include "LockFreeLookupHashTable.hpp"
#include <algorithm>
#include <boost/functional/hash.hpp>
#include <boost/unordered/unordered_flat_map.hpp>
#include <chrono>
#include <forward_list>
#include <future>
#include <latch>
#include <memory_resource>
#include <mutex>
#include <new>
#include <optional>
#include <random>
#include <ranges>
#include <set>
#include <shared_mutex>
#include <thread>
#include <time.h>
#include <type_traits>
#include <vector>
//////////////////////////////////////////////////////////////////////////////
// Flat DAG
//
// Values and opcodes
//
template < typename T >
using string_map_t = LockFreeLookupHashTable<std::string, T, boost::hash<std::string>>;
// simple dynamically typed values for demo and for checking memory issues
enum Type { String, Double };
struct Value {
union Data {
const char *asString;
double asDouble;
};
std::atomic<size_t> refCount;
const Type type;
const Data data;
Value(size_t rc, Type t, Data d) : refCount(rc), type(t), data(d) {}
Value(Type t, Data d) : refCount(0), type(t), data(d) {}
~Value() { if (type == String) free((void*)data.asString); }
void AddRef() {
// size_t prev =
refCount.fetch_add(1, std::memory_order_relaxed);
// printSelf(); printf(" AddRef %zu\n", prev+1);
}
void Release() {
size_t prev = refCount.fetch_sub(1, std::memory_order_release);
// printSelf(); printf(" Release %zu\n", prev-1);
if (prev == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
delete this;
}
}
void printSelf() {
if (type == Double) printf("%f", data.asDouble);
else if (type == String) printf("\"%s\"", data.asString);
else printf("unknown type %d", type);
}
double asDouble() const {
if (type == Double)
return data.asDouble;
else
throw std::runtime_error("not a double?");
}
const char *asString() const {
if (type == String)
return data.asString;
else
throw std::runtime_error("not a string?");
}
};
// Value dummyDouble(0x100000000, Double, { .asDouble = 1.0 });
// Value dummyString(0x100000000, String, { .asString = "dummy" });
Value *doubleVal(double d) {
// return &dummyDouble;
return new Value(Double, { .asDouble = d });
}
Value *stringVal(const char* s) {
// return &dummyString;
return new Value(String, { .asString = strdup(s) });
}
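// A minimal usage sketch (not part of the demo below): Value is intrusively
// refcounted -- a fresh value starts at refCount 0, each owner AddRefs it,
// and the last Release deletes it.
[[maybe_unused]] static void exampleValueRefCount() {
    Value *v = doubleVal(3.14); // refCount == 0: no owner yet
    v->AddRef();                // first owner
    printf("v = %f\n", v->asDouble());
    v->Release();               // last owner gone -> deleted
}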
// all opcode arguments have the same size to be packed in one array
typedef uint32_t OpcodeArg;
typedef OpcodeArg NodeIndex;
typedef OpcodeArg OpcodeArgIndex; // IndexedGraph::opcodeArgs[i]
struct FunctionArgs {
OpcodeArg argsCount;
OpcodeArg functionIndex;
OpcodeArg nodeIndex[1];
};
// Opcode to compute graph node value
struct Opcode {
enum Code {
Const,
Input,
// Select,
Function
};
// upper byte is the code, lower 3 bytes are the index in IndexedGraph::opcodeArgs
uint32_t data;
Opcode() : data(0) {}
Opcode(Code c, OpcodeArgIndex i = 0) : data((c << 24) + (i & 0xFFFFFF)) {
if (i > 0xFFFFFF)
throw std::runtime_error("Opcode index is too big");
}
Code code() const { return static_cast<Code>(data >> 24); }
OpcodeArgIndex argsIndex() const { return data & 0xFFFFFF; }
};
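// A small sanity sketch of the packing above: the code lives in the upper
// byte and the 24-bit opcodeArgs index round-trips through argsIndex().
[[maybe_unused]] static void exampleOpcodePacking() {
    Opcode op(Opcode::Function, 0x123456);
    if (op.code() != Opcode::Function || op.argsIndex() != 0x123456)
        throw std::runtime_error("opcode packing is broken");
}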
typedef std::function<Value*(const Value**)> NodeFunction;
//
// Utils
//
// a quick hack for missing std::scope_exit
template <typename F>
struct scope_exit {
explicit scope_exit(F&& f) : onExit(std::move(f)) {}
~scope_exit() { onExit(); }
private:
F onExit;
};
struct null_mutex {
void lock() noexcept {}
void lock_shared() noexcept {}
bool try_lock() noexcept { return true; }
void unlock() noexcept {}
void unlock_shared() noexcept {}
};
//
// Append-only vector for flat DAG
//
// special append-only vector for graphs:
// reads are lock-free,
// updates must be protected by a shared mutex to block resizes,
// alloc will block if it needs a resize
template <typename T>
class AppendOnlyVectorBase {
public:
AppendOnlyVectorBase()
: vecPtr(new std::vector<T>(16, T{}))
, vecSize{0}
{}
~AppendOnlyVectorBase() {
delete &vec();
for (auto v : oldVecs) delete v;
}
size_t size() const { return vecSize.load(std::memory_order_acquire); }
// alloc at the end
size_t alloc(size_t n, auto& lock) {
size_t i = vecSize.fetch_add(n, std::memory_order_relaxed);
ensure(i, n, lock);
return i;
}
void ensure(size_t i, size_t n, auto& lock) {
Vec& v = vec();
if (v.size() < i + n)
grow(std::bit_ceil(i + n), lock);
}
protected:
T& at(size_t i, const char *caller) {
Vec& v = vec();
checkValid(v, i, caller);
return v[i];
}
const T& at(size_t i, const char *caller) const {
Vec& v = vec();
checkValid(v, i, caller);
return v[i];
}
private:
using Vec = std::vector<T>;
Vec& vec() const { return *vecPtr.load(std::memory_order_acquire); }
static inline void checkValid(const Vec& v, size_t i, const char *caller) {
// if (i >= v.size()) [[unlikely]]
// throw std::runtime_error(
// std::string(caller) + " index out of bounds "
// + std::to_string(i) + " >= " + std::to_string(v.size()));
}
void grow(size_t newSize, auto& lock) {
std::scoped_lock _(lock);
Vec& v = vec();
if (v.size() >= newSize)
return; // already reallocated
Vec *newV = new std::vector<T>(newSize, T{});
oldVecs.push_back(&v);
std::copy(v.begin(), v.end(), newV->begin());
vecPtr.store(newV, std::memory_order_release);
}
std::atomic<Vec*> vecPtr; // current pointer, swapped on resize
std::vector<Vec*> oldVecs; // to free memory
std::atomic<size_t> vecSize;
};
template <typename T>
class AppendOnlyVector : public AppendOnlyVectorBase<T> {
public:
const T& operator[](size_t i) const { return this->at(i, "[]"); }
// unsafe* calls must be protected by a shared mutex
T& unsafeIndex(size_t i) { return this->at(i, "unsafeIndex"); }
void unsafeSet(size_t i, T x) { this->at(i, "unsafeSet") = x; }
};
template <typename T>
class AtomicAppendOnlyVector : public AppendOnlyVectorBase<T> {
public:
void unsafeStore(size_t i, T x, std::memory_order o) {
std::atomic_ref(this->at(i, "unsafeStore")).store(x, o);
}
T load(size_t i, std::memory_order o) const {
return std::atomic_ref(this->at(i, "load")).load(o);
}
};
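// A hedged usage sketch of the protocol described above, mirroring how
// IndexedGraph uses these vectors below: alloc may lock to grow, writes
// hold the lock shared to block resizes, reads are lock-free.
[[maybe_unused]] static void exampleAppendOnlyVector() {
    AppendOnlyVector<int> v;
    std::shared_mutex resizeLock;
    size_t i = v.alloc(1, resizeLock);  // locks exclusively only if it must grow
    {
        std::shared_lock _(resizeLock); // no resize while we write
        v.unsafeSet(i, 42);
    }
    int x = v[i];                       // lock-free read
    (void)x;
}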
//
// Base DAG
//
// Graph nodes + opcodes and base values
// Can be extended while evaluated (only locks on resizes).
// Evals use shared_mutex to block resizes.
// Updated graphs ('IndexedGraphNewInputs' below) only create a new 'values' array
//
// It's an index-based append-only graph; names are added on another level.
struct IndexedGraph {
// opcodes to compute node values, arguments are flattened into arrays
AppendOnlyVector<Opcode> opcodes;
AppendOnlyVector<OpcodeArg> opcodeArgs;
AppendOnlyVector<NodeFunction> functions;
// Vec<Value::Type> types;
// node values
mutable AtomicAppendOnlyVector<Value*> values;
static constexpr std::size_t CACHE_LINE = 64; //std::hardware_destructive_interference_size;
alignas (CACHE_LINE) mutable std::shared_mutex resizeLock;
// alignas (CACHE_LINE) mutable null_mutex resizeLock;
// for better error messages only, IndexedGraph is index-based
std::function<const std::string *(NodeIndex)> nodeIndexToName;
~IndexedGraph() {
for (size_t i = 0; i < values.size(); i++) {
auto v = values.load(i, std::memory_order_acquire);
if (v) v->Release();
}
}
[[noreturn]] void ERR(const std::string& why, NodeIndex i) const {
const std::string *e = nodeIndexToName ? nodeIndexToName(i) : nullptr;
throw std::runtime_error(e ? why + " " + *e : why);
}
NodeIndex allocNode() {
NodeIndex r = values.alloc(1, resizeLock);
opcodes.ensure(r, 1, resizeLock);
// can't use opcodes.alloc due to a race:
// thread A: values.alloc -> 0
// thread B: values.alloc -> 1
// thread B: opcodes.alloc -> 0
// thread B: segfault accessing opcodes[1]
// thread A: opcodes.alloc -> 1 -- too late
return r;
}
// node creation only locks on resizes, i.e. ~4 vectors * ~10 resizes for a 1024-node graph
NodeIndex constant(Value *c) {
c->AddRef();
NodeIndex i = allocNode();
std::shared_lock _(resizeLock);
opcodes.unsafeSet(i, Opcode(Opcode::Const));
values.unsafeStore(i, c, std::memory_order_release);
return i;
}
NodeIndex input() {
NodeIndex i = allocNode();
std::shared_lock _(resizeLock);
opcodes.unsafeSet(i, Opcode(Opcode::Input));
values.unsafeStore(i, nullptr, std::memory_order_release);
return i;
}
NodeIndex input(Value *d) {
d->AddRef();
NodeIndex i = allocNode();
std::shared_lock _(resizeLock);
opcodes.unsafeSet(i, Opcode(Opcode::Input));
values.unsafeStore(i, d, std::memory_order_release);
return i;
}
template <typename... Args>
NodeIndex function(const NodeFunction& f, Args... args) {
NodeIndex i = allocNode();
OpcodeArg argsIndex =
opcodeArgs.alloc(2 + sizeof...(Args), resizeLock);
OpcodeArg functionIndex = functions.alloc(1, resizeLock);
std::shared_lock _(resizeLock);
functions.unsafeSet(functionIndex, f);
FunctionArgs& fa = *(FunctionArgs*)&opcodeArgs.unsafeIndex(argsIndex);
fa.argsCount = sizeof...(Args);
fa.functionIndex = functionIndex;
size_t ni = 0;
((fa.nodeIndex[ni++] = args), ...);
opcodes.unsafeSet(i, Opcode(Opcode::Function, argsIndex));
values.unsafeStore(i, nullptr, std::memory_order_release);
return i;
}
Value *eval(NodeIndex i) const {
auto r = values.load(i, std::memory_order_acquire);
if (r)
return r;
auto opcode = opcodes[i].code();
switch (opcode) {
case Opcode::Input:
ERR("Undefined input", i);
case Opcode::Const:
ERR("Undefined constant?", i);
case Opcode::Function: {
const FunctionArgs& fa =
*(const FunctionArgs*)&opcodeArgs[opcodes[i].argsIndex()];
const Value **args = (const Value**)alloca(sizeof args[0] * fa.argsCount);
for (OpcodeArg a = 0; a < fa.argsCount; a++) {
args[a] = eval(fa.nodeIndex[a]);
}
Value *r = functions[fa.functionIndex](args);
r->AddRef();
std::shared_lock _(resizeLock);
// the shared lock guarantees that no resize or vector copy is
// taking place, so we can replace the value atomically.
values.unsafeStore(i, r, std::memory_order_release);
return r;
}
default:
throw std::runtime_error("unsupported opcode " + std::to_string(opcode));
}
}
};
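// A hedged sketch of direct index-based use (names and recipes are layered
// on top below): eval() caches into 'values', so the second eval of the
// same node is a single atomic load.
[[maybe_unused]] static void exampleIndexedGraph() {
    IndexedGraph g;
    NodeIndex a = g.input(doubleVal(1.0));
    NodeIndex b = g.constant(doubleVal(2.0));
    NodeFunction add = [](const Value **args) {
        return doubleVal(args[0]->asDouble() + args[1]->asDouble());
    };
    NodeIndex sum = g.function(add, a, b);
    printf("sum = %f\n", g.eval(sum)->asDouble()); // computes and caches
    printf("sum = %f\n", g.eval(sum)->asDouble()); // cached: one atomic load
}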
//
// Graph with new inputs
//
// lock-free eval even if the base graph is being extended
struct IndexedGraphNewInputs {
struct NodeValue {
std::atomic<Value*> value;
// only version == graph version needs to be destroyed
// version < graph version are shared values
int version;
NodeValue() : value(nullptr), version(0) {}
NodeValue& operator=(const NodeValue& n) {
auto v = n.value.load(std::memory_order_acquire);
version = n.version;
value.store(v, std::memory_order_release);
return *this;
}
};
size_t size;
NodeValue *values;
IndexedGraphNewInputs *parent;
const IndexedGraph *base;
int version;
typedef std::vector<std::pair<NodeIndex, Value*>> Inputs;
~IndexedGraphNewInputs() {
for (size_t i = 0; i < size; i++) {
NodeValue& nv = values[i];
if (nv.version == version) {
Value *v = nv.value.load(std::memory_order_acquire);
if (v) v->Release();
}
}
delete[] values;
}
IndexedGraphNewInputs(const IndexedGraph *b, const Inputs& inputs) {
base = b;
version = 1;
parent = nullptr;
initValues(inputs);
}
IndexedGraphNewInputs(IndexedGraphNewInputs *p, const Inputs& inputs) {
parent = p;
base = parent->base;
version = parent->version + 1;
initValues(inputs);
}
void initValues(const Inputs& inputs) {
size = base->values.size();
values = new NodeValue[size];
for (const auto& [index, val] : inputs) {
auto& v = values[index];
v.version = version;
if (val)
val->AddRef();
v.value.store(val, std::memory_order_relaxed);
}
std::atomic_thread_fence(std::memory_order_release);
}
Value *eval(NodeIndex i) {
return evalNodeValue(i).value.load(std::memory_order_relaxed);
}
NodeValue& evalNodeValue(NodeIndex i) {
NodeValue& vi = values[i];
if (vi.value.load(std::memory_order_acquire))
return vi;
auto opcode = base->opcodes[i].code();
switch (opcode) {
case Opcode::Input:
if (parent) {
vi = parent->evalNodeValue(i);
return vi;
}
[[fallthrough]]; // an Input without a parent reads like a Const
case Opcode::Const:
vi.version = 0;
vi.value.store(base->eval(i), std::memory_order_release);
return vi;
case Opcode::Function: {
// only function evaluation would need a lock, and only to
// avoid evaluating the same node several times
const FunctionArgs& fa =
*(const FunctionArgs*)&base->opcodeArgs[base->opcodes[i].argsIndex()];
const Value **args = (const Value **)
alloca(sizeof args[0] * fa.argsCount);
int maxVersion = 0;
for (OpcodeArg a = 0; a < fa.argsCount; a++) {
NodeValue& v = evalNodeValue(fa.nodeIndex[a]);
args[a] = v.value.load(std::memory_order_acquire);
maxVersion = std::max(maxVersion, v.version);
}
if (maxVersion < version) {
// no argument changed in this version: reuse the parent's value,
// or fall back to the base graph when there is no parent
if (parent) {
vi = parent->evalNodeValue(i);
} else {
vi.version = 0;
vi.value.store(base->eval(i), std::memory_order_release);
}
return vi;
}
Value *r = base->functions[fa.functionIndex](args);
r->AddRef();
vi.version = version;
vi.value.store(r, std::memory_order_release);
return vi;
}
default:
throw std::runtime_error("unsupported opcode " + std::to_string(opcode));
}
}
};
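// A hedged sketch of rebinding inputs: each layer is one new O(n) values
// array; nodes whose inputs did not change in this version are taken from
// the parent via the maxVersion check above.
[[maybe_unused]] static void exampleNewInputs() {
    IndexedGraph base;
    NodeIndex in = base.input(doubleVal(1.0));
    NodeFunction inc = [](const Value **args) {
        return doubleVal(args[0]->asDouble() + 1.0);
    };
    NodeIndex out = base.function(inc, in);
    IndexedGraphNewInputs g1(&base, {{in, doubleVal(10.0)}});
    printf("out = %f\n", g1.eval(out)->asDouble()); // 11
    IndexedGraphNewInputs g2(&g1, {{in, doubleVal(20.0)}});
    printf("out = %f\n", g2.eval(out)->asDouble()); // 21
}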
//////////////////////////////////////////////////////////////////////////////
// Deadlock detector
// Deadlock loop info for the error reporting
struct DeadlockInfo {
typedef std::vector<std::string> Loop;
struct IndexedLoop {
Loop loop;
boost::unordered_flat_map<std::string, size_t> nameToIndex;
IndexedLoop(Loop&& _loop) : loop(_loop) {
size_t i = 0;
for (auto& n : loop)
nameToIndex.emplace(n, i++);
}
};
const std::shared_ptr<const IndexedLoop> indexedLoop;
size_t startIndex; // start index in the loop
Loop extra; // extra recipes on top of the loop
DeadlockInfo(Loop&& _loop)
: indexedLoop(new IndexedLoop(std::move(_loop)))
, startIndex(0)
{}
DeadlockInfo(const std::string& x, const DeadlockInfo& xs)
: indexedLoop(xs.indexedLoop)
, startIndex(xs.startIndex)
, extra(xs.extra)
{
if (extra.size() == 0) {
auto it = indexedLoop->nameToIndex.find(x);
if (it != indexedLoop->nameToIndex.end()) {
// we're seeing the same loop exception from the other
// node in the loop, change the loop start index
startIndex = it->second;
return;
}
}
// we're out of the loop, append the node to the extra path
extra.push_back(x);
}
std::string formatError() const {
std::ostringstream oss;
oss << "A loop in a recipe:\n";
auto out = [&](const std::string& n) { oss << " " << n << "\n"; };
for (auto e : extra | std::views::reverse)
out(e);
auto& l = indexedLoop->loop;
out(l[startIndex] + " <--");
for (size_t i = startIndex + 1; i < l.size(); i++)
out(l[i]);
for (size_t i = 0; i < startIndex; i++)
out(l[i]);
out(l[startIndex] + " -->");
return oss.str();
}
};
struct DeadlockException : std::runtime_error {
DeadlockInfo deadlockInfo;
DeadlockException(const DeadlockInfo& d)
: std::runtime_error(d.formatError())
, deadlockInfo(d)
{}
DeadlockException(const std::string& x, const DeadlockInfo& xs)
: DeadlockException(DeadlockInfo(x, xs))
{}
DeadlockException(const std::string& x, const DeadlockException& xs)
: DeadlockException(DeadlockInfo(x, xs.deadlockInfo))
{}
};
/*
a -> b -> c -> a loop
d -> c
construct [a,b,c,d]
a -> b (thread #1) ok
b -> c (thread #2) ok
d -> c (thread #3) ok
c -> a (thread #4) -- deadlock
fails recipe 'c', it will fail others who wait
*/
// Builds a wait-for graph https://en.wikipedia.org/wiki/Wait-for_graph
// from Threads to Constructs and detects loops.
//
// O(n_chainedWaitingThreads) per waitConstruct. I.e. many threads
// waiting for one recipe are still O(1) per waitConstruct.
// Only >O(1) if there's a chain of recipes to follow.
//
// It's a pure algorithm (can be tested single threaded).
//
// newThread/freeThread/newConstruct can all work
// lock-free provided they're called from the same thread
//
// waitConstruct needs a lock (walks through different threads)
// constructFinished needs a lock (shared between threads)
struct DeadlockDetector {
// flag whether the node was visited, keeps the current scan version
typedef uint64_t ScanId;
ScanId curScanId;
struct Construct;
typedef std::forward_list<Construct*> ConstructStack;
struct Thread {
const std::string name;
bool waiting; // waiting for the top of the constructStack to be ready
ConstructStack constructStack;
};
struct Construct {
const std::string name;
Thread *constructThread; // the thread that's building the recipe
ScanId visitedScanId; // used to mark visited nodes in checkLoops
size_t refCount; // 1 newConstruct + N waitConstruct
// non-atomic: waitConstruct/constructFinished run under a lock
};
DeadlockDetector() : curScanId(0) {}
Thread *newThread(const std::string& name) {
return new Thread{
.name = name,
.waiting = false,
.constructStack = {}
};
}
void freeThread(Thread *t) {
if (!t->constructStack.empty())
ERR("freeThread: non-empty construct stack "
+ toString(t->constructStack));
delete t;
}
// Call when you start constructing a new named node
Construct *newConstruct(Thread *thread, const std::string& name) {
auto construct = new Construct{
.name = name,
.constructThread = thread,
.visitedScanId = 0,
.refCount = 1
};
thread->constructStack.push_front(construct);
return construct;
}
// Call when the node you want to construct is already being constructed.
// Does not perform an actual waiting, only changes the detector state.
// Must be protected by a lock.
void waitConstruct(Thread *thread, Construct *c) {
if (thread->waiting)
ERR("waitConstruct: thread is already in the waiting state?");
++curScanId;
DeadlockInfo::Loop loop;
if (deadlock(thread, c, loop)) {
loop.push_back(c->name);
std::reverse(loop.begin(), loop.end());
// note, it's the deadlock loop only.
// need extra logic to add constructs that lead to the
// deadlock
throw DeadlockException(std::move(loop));
}
thread->waiting = true;
thread->constructStack.push_front(c);
c->refCount++;
}
// the meat
[[nodiscard("check the deadlock status")]]
bool deadlock(const Thread *t, Construct *c, DeadlockInfo::Loop& loop) const {
if (c->visitedScanId == curScanId || !c->constructThread)
return false; // already visited or already constructed
c->visitedScanId = curScanId;
// printf("check %s %s (%s)\n", t->name.data(), c->name.data(),
// c->constructThread->name.data());
if (t == c->constructThread) {
// loop found -- we're already constructing this recipe ourselves
addStackSlice(c, loop);
return true; // deadlock!
} else if (c->constructThread->waiting) {
// if the recipe thread is waiting, follow the recipe it waits for
auto waitsFor = c->constructThread->constructStack.front();
if (deadlock(t, waitsFor, loop)) {
addStackSlice(c, loop);
return true;
}
}
return false;
}
void addStackSlice(Construct *c, DeadlockInfo::Loop& loop) const {
// add the [top..c) slice of the recipe stack
// for the error reporting
for (auto x : c->constructThread->constructStack) {
if (x == c) break;
loop.push_back(x->name);
}
}
// Call once your construct has finished (even if it throws an exception)
bool constructFinished(Thread *t, Construct *c) {
if (t->constructStack.empty())
ERR("constructFinished: empty constructStack");
if (t->constructStack.front() != c)
ERR("constructFinished: construct mismatch " + c->name
+ " in not the last in " + toString(t->constructStack));
t->constructStack.pop_front();
t->waiting = false;
if (t == c->constructThread)
// clear it, so we won't follow a thread that has potentially
// finished its job and been deleted
c->constructThread = nullptr;
bool last = --c->refCount == 0;
if (last)
delete c;
return last;
}
// utils
[[noreturn]] static void ERR(const std::string& why) {
// printf("ERR: %s\n", why.data());
throw std::runtime_error(why);
}
static std::string toString(const ConstructStack& l) {
std::vector<const std::string*> names;
for (auto c : l)
names.push_back(&c->name);
std::ostringstream oss;
for (auto n : names | std::views::reverse) {
oss << " " << *n << '\n';
}
return oss.str();
}
};
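// A hedged single-threaded sketch (the detector is a pure state machine):
// two logical threads, "a" waits for "b" and "b" for "a", and the second
// waitConstruct throws a DeadlockException.
[[maybe_unused]] static void exampleDeadlockDetector() {
    DeadlockDetector d;
    auto t1 = d.newThread("t1");
    auto t2 = d.newThread("t2");
    auto a = d.newConstruct(t1, "a");
    auto b = d.newConstruct(t2, "b");
    d.waitConstruct(t1, b);     // t1 waits for "b": fine
    try {
        d.waitConstruct(t2, a); // t2 waiting for "a" closes the loop
    } catch (const DeadlockException& e) {
        printf("expected deadlock:\n%s", e.what());
    }
    d.constructFinished(t1, b); // unwind the wait and the constructs
    d.constructFinished(t2, b);
    d.constructFinished(t1, a);
    d.freeThread(t1);
    d.freeThread(t2);
}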
//
// DeadlockDetector + named constructs
//
template <typename T>
struct NamedDeadlockDetector
{
typedef std::mutex mutex_t;
// all construct data modifications must be protected by a lock
mutex_t constructLock;
string_map_t<T> nameToT;
LockFreeLookupHashTable<T, std::string> tToName;
DeadlockDetector deadlockDetector;
enum Result { OK, Deadlock, Exception };
struct ConstructThunk {
DeadlockDetector::Construct *construct;
std::latch ready;
std::variant<T, DeadlockInfo, std::exception_ptr> result;
ConstructThunk(DeadlockDetector::Construct *_construct)
: construct(_construct)
, ready{1}
{}
};
template <typename ...Args>
using map_t = boost::unordered_flat_map<Args...>;
map_t<std::string, ConstructThunk*> constructThunks;
map_t<std::thread::id, DeadlockDetector::Thread*, std::hash<std::thread::id>> threads;
T construct(const std::string& name, const auto& build) {
auto r = nameToT.lookupPtr(name);
if (r) return *r; // already constructed
auto threadId = std::this_thread::get_id();
bool newConstruct = false;
DeadlockDetector::Thread *t = nullptr;
ConstructThunk *ct = nullptr;
std::unique_lock lock(constructLock, std::defer_lock);
auto _finally = scope_exit([&]{
if (!lock) lock.lock();
if (ct && deadlockDetector.constructFinished(t, ct->construct)) {
constructThunks.erase(name);
delete ct;
}
if (t && t->constructStack.empty()) { // last construct
threads.erase(threadId);
deadlockDetector.freeThread(t);
}
});
{
lock.lock();
// recheck after taking a lock
r = nameToT.lookupPtr(name);
if (r) return *r;
auto& tRef = threads[threadId];
if (!tRef)
tRef = deadlockDetector.newThread("thread for " + name);
t = tRef;
auto& ctRef = constructThunks[name];
if (!ctRef) {
newConstruct = true;
ctRef = new ConstructThunk(
deadlockDetector.newConstruct(t, name));
} else {
// run the deadlock detector
deadlockDetector.waitConstruct(t, ctRef->construct);
// ^ it throws
}
ct = ctRef;
lock.unlock();
}
if (newConstruct) {
try {
ct->result.template emplace<OK>(build(name));
} catch (const DeadlockException& e) {
ct->result.template emplace<Deadlock>(e.deadlockInfo);
} catch (...) {
ct->result.template emplace<Exception>(std::current_exception());
}
ct->ready.count_down();
} else {
ct->ready.wait();
}
switch (ct->result.index()) {
case OK: {
lock.lock();
auto r = std::get<OK>(ct->result);
nameToT.unsafeEmplace(name, [&](){ return r; });
tToName.unsafeEmplace(r, [&](){ return name; });
return r;
}
case Deadlock:
throw DeadlockException(name, std::get<Deadlock>(ct->result));
case Exception:
std::rethrow_exception(std::get<Exception>(ct->result));
default:
throw std::runtime_error("Unexpected varian index " + std::to_string(ct->result.index()));
}
}
};
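// A hedged sketch of the thunk protocol above: concurrent construct() calls
// for the same name run build() once; later callers get the cached result.
[[maybe_unused]] static void exampleNamedDeadlockDetector() {
    NamedDeadlockDetector<int> d;
    int v = d.construct("answer", [](const std::string&){ return 42; });
    int w = d.construct("answer", [](const std::string&){ return -1; }); // cached
    if (v != 42 || w != 42)
        throw std::runtime_error("construct should cache the first result");
}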
//////////////////////////////////////////////////////////////////////////////
// Recipes
struct NamedGraph;
typedef std::function<NodeIndex(NamedGraph&, const std::string&)> GraphRecipe;
typedef string_map_t<GraphRecipe> Cookbook;
const GraphRecipe *findRecipe(const Cookbook& cb, const std::string& name) {
auto r = cb.lookupPtr(name);
if (r)
return r;
size_t pos = name.size();
while (true) {
pos = name.rfind('.', pos);
if (pos == std::string::npos)
return nullptr;
r = cb.lookupPtr(name.substr(0, pos));
if (r)
return r;
}
}
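// A hedged sketch of the prefix fallback above: one "input" recipe serves
// "input.a", "input.b", etc. (main() below relies on exactly this).
[[maybe_unused]] static void exampleFindRecipe() {
    Cookbook cb = {
        {"input", [](NamedGraph&, const std::string&){ return NodeIndex(0); }}
    };
    if (!findRecipe(cb, "input.a")) // no exact match: falls back to "input"
        throw std::runtime_error("prefix lookup failed");
}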
// IndexedGraph extended with ability to build named nodes.
// Adds node name <-> index map, cookbook and DeadlockDetector
struct NamedGraph
{
const Cookbook& cookbook;
IndexedGraph base;
NamedDeadlockDetector<NodeIndex> deadlockDetector;
NamedGraph(const Cookbook& c) : cookbook(c) {
base.nodeIndexToName = [&](NodeIndex n) { return getName(n); };
}
Value *get(const std::string& name) const {
return base.eval(getIndex(name));
}
NodeIndex getIndex(const std::string& name) const {
auto i = deadlockDetector.nameToT.lookupPtr(name);
if (!i)
throw std::runtime_error("Node " + name + " not found");
return *i;
}
const std::string *getName(NodeIndex i) const {
return deadlockDetector.tToName.lookupPtr(i);
}
NodeIndex construct(const std::string& name) {
return deadlockDetector.construct(name, [&](const std::string& name) {
const GraphRecipe *r = findRecipe(cookbook, name);
if (!r)
throw std::runtime_error("No recipe for " + name);
return (*r)(*this, name);
});
}
};
// version of IndexedGraphNewInputs with named nodes
struct NamedGraphWithNewInputs
{
typedef std::vector<std::pair<std::string, Value*>> Inputs;
const NamedGraph *env;
IndexedGraphNewInputs graph;
NamedGraphWithNewInputs(const NamedGraph *e, const Inputs& inputs)
: env(e)
, graph(&e->base, toIndexedInputs(e, inputs))
{}
NamedGraphWithNewInputs(NamedGraphWithNewInputs *p, const Inputs& inputs)
: env(p->env)
, graph(&p->graph, toIndexedInputs(p->env, inputs))
{}
static IndexedGraphNewInputs::Inputs toIndexedInputs(
const NamedGraph *e, const Inputs& named)
{
IndexedGraphNewInputs::Inputs indexed;
indexed.resize(named.size());
for (size_t i = 0; i < named.size(); i++) {
indexed[i].first = e->getIndex(named[i].first);
indexed[i].second = named[i].second;
}
return indexed;
}
Value *get(const std::string& name) {
return graph.eval(env->getIndex(name));
}
int debug_NodeVersion(const std::string& name) {
return graph.evalNodeValue(env->getIndex(name)).version;
}
};
//
// Tests
//
Value *plus(const Value **args) {
return doubleVal(args[0]->asDouble() + args[1]->asDouble());
}
static inline void cpu_relax() {
#if defined(__x86_64__) || defined(__i386__)
asm volatile("pause" ::: "memory");
#elif defined(__aarch64__) || defined(__arm__)
asm volatile("yield" ::: "memory");
#endif
}
static inline uint64_t monotonic_ns() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}
static inline void spin_for_ns(uint64_t duration_ns) {
uint64_t end = monotonic_ns() + duration_ns;
while (monotonic_ns() < end) {
cpu_relax();
}
}
typedef boost::unordered_flat_map<std::string, std::string> pcMap;
struct Mark { std::string child; int mark; };
typedef boost::unordered_flat_map<std::string, Mark> pcMarkMap;
bool hasLoop(pcMarkMap& map, int mark, const std::string& cur) {
auto it = map.find(cur);
enum { BAD = -100, GOOD = -200 };
if (it == map.end())
return false;
if (it->second.mark == mark) {
it->second.mark = BAD;
return true;
}
if (it->second.mark == BAD)
return true;
if (it->second.mark == GOOD)
return false;
it->second.mark = mark;
bool loop = hasLoop(map, mark, it->second.child);
it->second.mark = loop ? BAD : GOOD;
return loop;
}
void testDeadlockDetector() {
if (false) {
DeadlockDetector d;
auto t1 = d.newThread("t1");
auto t2 = d.newThread("t2");
auto t3 = d.newThread("t3");
auto a = d.newConstruct(t1, "a");
auto b = d.newConstruct(t2, "b");
auto c = d.newConstruct(t3, "c");
auto aa = d.newConstruct(t1, "aa");
auto bb = d.newConstruct(t2, "bb");
auto cc = d.newConstruct(t3, "cc");
auto bbb = d.newConstruct(t2, "bbb");
d.waitConstruct(t1, bb);
d.waitConstruct(t2, c);
try {
d.waitConstruct(t3, a);
} catch (const DeadlockException& e) {
throw DeadlockException(c->name, DeadlockException(cc->name, e));
}
d.freeThread(t1);
d.freeThread(t2);
d.freeThread(t3);
// construct [a,b,c]
// a -> b (thread #1 parent a) ok
// b -> c (thread #2 parent b) ok
// c -> a (thread #3 parent c) -- deadlock
}
int n = 100000;
pcMap parentChild;
std::vector<std::string> nodes;
Cookbook cookbook;
parentChild.reserve(10*n);
nodes.reserve(10*n);
cookbook.unsafeReserve(10*n);
auto input = [](NamedGraph& g, const std::string&){ return g.base.input(); };
auto node = [&](NamedGraph& g, const std::string& n){
// spin_for_ns(10000);
// std::this_thread::sleep_for(std::chrono::nanoseconds(100));
auto c1 = g.base.constant(doubleVal(1.0));
auto i2 = g.base.input(doubleVal(2.0));
auto child = g.construct(parentChild[n]);
return g.base.function(plus, c1, g.base.function(plus, child, i2));
};
std::mt19937 mt;
std::uniform_int_distribution<int> rand(0, n-1);
// simple 4 layers of hierarchy
// for (int i = 0; i < n; i++) {
// char from[100], mid1[100], mid2[100], to[100];
// snprintf(from, sizeof from, "a%06d", i);
// snprintf(mid1, sizeof mid1, "b%06d", rand(mt));
// snprintf(mid2, sizeof mid2, "c%06d", rand(mt));
// snprintf(to, sizeof to, "i%03d", rand(mt));
// parentChild[from] = mid1;
// parentChild[mid1] = mid2;
// parentChild[mid2] = to;
// cookbook.unsafeEmplace(from, [&](){ return node; });
// cookbook.unsafeEmplace(mid1, [&](){ return node; });
// cookbook.unsafeEmplace(mid2, [&](){ return node; });
// cookbook.unsafeEmplace(to, [&](){ return input; });
// nodes.push_back(from);
// nodes.push_back(from);
// nodes.push_back(mid1);
// nodes.push_back(mid1);
// nodes.push_back(mid2);
// nodes.push_back(mid2);
// nodes.push_back(to);
// nodes.push_back(to);
// }
int nInputs = (int)(n/98);
// why is there a jump of 0 -> 17k errors between n/98 and n/99?
for (int i = 0; i < n; i++) {
char from[100], to[100];
snprintf(from, sizeof from, "a%06d", i);
nodes.push_back(from);
if (i < nInputs) {
cookbook.unsafeEmplace(from, [&](){ return input; });
} else {
cookbook.unsafeEmplace(from, [&](){ return node; });
int out = rand(mt);
snprintf(to, sizeof to, "a%06d", out);
parentChild[from] = to;
}
}
std::shuffle(nodes.begin(), nodes.end(), mt);
pcMarkMap markMap;
std::set<std::string> refErrors;
for (auto & [p,c] : parentChild)
markMap.emplace(p, Mark{c, -1});
for (size_t i = 0; i < nodes.size(); i++)
if (hasLoop(markMap, i, nodes[i])) {
refErrors.emplace(nodes[i]);
}
for (size_t reps = 0; reps < 10; reps++) {
NamedGraph g(cookbook);
std::mutex errorsLock;
std::map<std::string, std::string> errors;
#pragma omp parallel for num_threads(24) schedule(dynamic, 1000)
for (size_t i = 0; i < nodes.size(); i++) {
try {
g.construct(nodes[i]);
} catch (const DeadlockException& e) {
// puts(e.what());
std::scoped_lock _(errorsLock);
errors.emplace(nodes[i], e.what());
}
}
printf("errors %zd, ref errors %zd, size %zd, nodes %zd\n", errors.size(), refErrors.size(), g.deadlockDetector.nameToT.size(), g.base.values.size());
std::set<std::string> diff;
auto errorNodes = std::views::keys(errors);
std::set_difference(errorNodes.begin(), errorNodes.end(),
refErrors.begin(), refErrors.end(),
std::inserter(diff, diff.end()));
for (auto & e : diff) {
printf("%s: %s\n", e.data(), errors[e].data());
}
}
}
int main()
{
Cookbook cb = {
{"a", [](NamedGraph& g, const std::string&){ return g.construct("b"); } },
{"b", [](NamedGraph& g, const std::string&){ return g.construct("c"); } },
{"c", [](NamedGraph& g, const std::string&){ return g.construct("a"); } },
{"constant", [](NamedGraph& g, const std::string&){
return g.base.constant(doubleVal(1)); } },
{"input", [](NamedGraph& g, const std::string&){
return g.base.input(); } },
{"a+b", [](NamedGraph& g, const std::string&){
return g.base.function(plus,
g.construct("input.a"),
g.construct("input.b")); } }
};
NamedGraph g(cb);
printf("index %d\n", g.construct("input.a"));
printf("index %d\n", g.construct("input.b"));
printf("index %d\n", g.construct("a+b"));
try {
printf("index %d\n", g.construct("a"));
} catch(const std::exception& e) {
printf("error: %s\n", e.what());
}
try {
printf("r = %f\n", g.get("a+b")->asDouble());
} catch(const std::exception& e) {
printf("error: %s\n", e.what());
}
auto g2 = NamedGraphWithNewInputs(&g, {
{"input.a", doubleVal(10)},
{"input.b", doubleVal(20)}
});
auto g3 = NamedGraphWithNewInputs(&g2, {
{"input.b", doubleVal(10)}
});
printf("r2 = %f (version = %d)\n", g3.get("a+b")->asDouble(), g3.debug_NodeVersion("a+b"));
testDeadlockDetector();
}
//////////////////////////////////////////////////////////////////////////////
// LockFreeLookupHashTable.hpp
#include <atomic>
#include <functional>
#include <utility>
#include <string>
#include <vector>
#include <stdexcept>
#include <map>
#include <bit>
/* Read-optimized linear-probe hashtable.
Reads are lock-free: read the table array pointer from an atomic
variable and perform a lookup.
Inserts must be protected by a mutex. They set atomic value
pointers which can be immediately picked up by readers.
On resize, a new table will be allocated and the table pointer is
changed to it.
Old tables are not deallocated until the destructor is called, so
readers never see dangling pointers.
*/
template<
typename Key,
typename Value,
typename Hash = std::hash<Key>,
typename KeyEq = std::equal_to<Key>
>
class LockFreeLookupHashTable {
static constexpr size_t INITIAL_SIZE = 16; // must be power of 2
struct Item {
std::atomic<size_t> hash;
Key key;
Value value;
};
using Table = std::vector<Item>;
std::atomic<Table*> table; // current pointer, swapped on resize
std::vector<Table*> oldTables; // to free memory
size_t used;
public:
LockFreeLookupHashTable() : table(new Table(INITIAL_SIZE)), used(0) {}
LockFreeLookupHashTable(const std::initializer_list<std::pair<Key,Value>>& items) : LockFreeLookupHashTable() {
unsafeReserve(items.size());
for (auto& [k,v] : items)
unsafeEmplace(k, [&](){ return v; });
}
LockFreeLookupHashTable(const LockFreeLookupHashTable&) = delete;
LockFreeLookupHashTable& operator=(const LockFreeLookupHashTable&) = delete;
LockFreeLookupHashTable(LockFreeLookupHashTable&&) = delete;
LockFreeLookupHashTable& operator=(LockFreeLookupHashTable&&) = delete;
~LockFreeLookupHashTable() {
delete table.load(std::memory_order_acquire);
for (auto t : oldTables) delete t;
}
// lock-free
const Value *lookupPtr(const Key& key) const {
const Table& t = *table.load(std::memory_order_acquire);
size_t size = t.size();
size_t mask = size - 1;
size_t hash = Hash{}(key);
size_t index = hash & mask;
for (size_t i = 0; i < size; i++) {
const Item& item = t[(index+i) & mask];
size_t h = item.hash.load(std::memory_order_acquire);
if (h == scaleHash(hash) && KeyEq{}(item.key, key))
return &item.value;
if (h == 0)
return nullptr;
}
return nullptr;
}
// must be protected by a mutex
void unsafeReserve(size_t s) {
s = std::bit_ceil(s);
Table& t = *table.load(std::memory_order_acquire);
if (s > t.size())
resize(t, s);
}
static const int LOAD_FACTOR = 3;
// must be protected by a mutex
const Value *unsafeEmplace(const Key& key, const auto& newValue)
{
Table& t = *table.load(std::memory_order_acquire);
const Value *r = emplaceIntoTable(t, key, [&]() {
++used;
return newValue();
});
if (used * LOAD_FACTOR > t.size())
resize();
return r;
}
size_t size() const { return used; }
private:
static size_t scaleHash(size_t h) { return h | 1; } // never 0: 0 marks an empty slot
Value *emplaceIntoTable(Table& t, const Key& key, const auto& newValue) {
size_t size = t.size();
size_t mask = size - 1;
size_t hash = Hash{}(key);
size_t index = hash & mask;
for (size_t i = 0; i < size; i++) {
Item& item = t[(index+i) & mask];
size_t h = item.hash.load(std::memory_order_acquire);
if (h == scaleHash(hash) && KeyEq{}(item.key, key))
return &item.value; // return existing
if (!h) {
item.key = key;
item.value = newValue();
item.hash.store(scaleHash(hash), std::memory_order_release);
return &item.value;
}
}
// shouldn't end up here as we resize the table
throw std::runtime_error("the table is full?");
}
void resize() {
Table& t = *table.load(std::memory_order_acquire);
resize(t, t.size() * 2);
}
void resize(Table& t, size_t newSize) {
Table& newTable = *new Table(newSize);
for (const auto& item : t) {
size_t h = item.hash.load(std::memory_order_acquire);
if (h)
emplaceIntoTable(newTable, item.key, [&]{ return item.value; });
}
oldTables.push_back(&t);
table.store(&newTable, std::memory_order_release);
// printf("resized to %lu\n", newTable.size());
}
};
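// A hedged usage sketch: lookupPtr is safe to call from any thread at any
// time; unsafeReserve/unsafeEmplace must be serialized by the caller
// (e.g. under a mutex, as NamedDeadlockDetector's constructLock does above).
[[maybe_unused]] static int exampleLockFreeLookupHashTable() {
    LockFreeLookupHashTable<std::string, int> t;
    t.unsafeEmplace("answer", [](){ return 42; }); // writer side, serialized
    const int *v = t.lookupPtr("answer");          // lock-free reader side
    return v ? *v : -1;
}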