Flat purely functional DAG with updateable inputs, lock-free lookups, and a concurrent node construction with a deadlock detector
| // g++ -g -fopenmp -O2 -std=gnu++2c -Wall -Wpedantic -Iboost_1_89_0 Graph.cpp && ./a.out | |
| /* [WIP] | |
| TODO | |
| - removing unused nodes? | |
| - ref data use case -- evaluate immediately and only keep results | |
| - exceptions in recipes -- do not keep unused nodes | |
| - recipes that add nodes but do not use them | |
| --> create temporary nodes in construct() and only extend IndexedGraph | |
| once construct() succeeds | |
| but still can evaluate them | |
| - test parallel evals (& constructs) | |
| - eval() in copies (which no longer extend 'values'): | |
| - could run in parallel with construct(). | |
| --> test | |
| - could be lock-free | |
| --> test | |
| - can have per node function mutexes to remove repeated evaluations | |
| --> todo | |
| - we can keep the graph version and NamedGraphWithNewInputs will not find | |
| nodes added to the base graph (while it will still share the hash table) | |
| - serialization can save the current number of nodes and ignore | |
| nodes added after it -- lock-free too | |
| --- advanced | |
| - publishing values in place? just make IndexedGraphNewInputs? | |
| - could be implemented with a list of parents | |
| publishing replaces inputs and clears parents | |
| we have a shared lock for evals (to block resizes) | |
| could use it for an atomic publish | |
| - tracking of parents (so we can make a full copy of data | |
| and only clear values that we need to re-evaluate) | |
| - we can keep list of parents, find a list of changed nodes, and, | |
| if it's less than a half (or a quarter), use hash map | |
| - we can also pre-copy unchanged nodes to speed up evals (though | |
| extra logic can slow down the total copy+eval time) | |
| - list of parents better done as arrays so we are slower at | |
| construction and serialisation, but faster at bumping | |
| - we can optimise the common case of a few nodes: the flat struct | |
| with a size and a first few elements, and a pointer to the rest | |
| (not a union for the lock-free access) | |
| x constructLock can be less coarse -- we can't parallelize small | |
| constructs due to the lock. | |
| thread can be allocated in graphenv (with checks that graph env doesn't | |
| go from thread to thread). Or it can be in lock-free lookup table | |
| (we don't have too many threads, can leak them). | |
| newThread/freeThread/newConstruct can all work | |
| outside of any lock. | |
| --> finer grained locks slow down the sequential case | |
| 1M+ constructs a second looks ok | |
| x create a thin NamedGraph that only has the current recipe stack + | |
| IndexedGraph*; since we do deadlock detection we won't have | |
| circular references. | |
| --> we just keep thread id->thread map | |
| x we can start building other recipes while we wait | |
| --> no, too hard to do, we shall always run recipes from | |
| top-level depends without spawning extra threads inside | |
| we can limit the number of active threads with a semaphore | |
| curve | |
| usd -> curve | |
| eur -> usd | |
| curve -- runs | |
| usd waits on curve, runs eur, waits on usd -- that's a loop | |
| unless we use a different thread ID for the extra run | |
| then once curve ends it can run usd (but how, if we run eur?) | |
| with green threads we could spawn a thread whilst we wait | |
| with native threads we can run more threads but have a semaphore | |
| on wait. | |
| ideally, we need continuations like in nested data parallelism | |
| instead of evaluating depend directly we shall put this depend to | |
| a queue with a continuation to call once it's ready | |
| but this will require changes in user code | |
| we can provide a simplified parMap that uses current thread | |
| + free top-level threads ?? | |
| */ | |
| /* Flat DAG with changeable inputs. | |
| Creating a copy with changed inputs is basically one calloc(O(n)) | |
| (much faster than node-based). | |
| Serialization should be faster too. | |
| Computing values is also up to O(n), not O(n_changed). Same as | |
| node-based w/o marking changed parents, but with better cache | |
| utilization. | |
| */ | |
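| /* A minimal usage sketch (illustration only, using just the API defined | |
| below; 'plus' and 'doubleVal' come from this file, see main() for the | |
| full demo): | |
| IndexedGraph g; | |
| NodeIndex a = g.input(doubleVal(1)), b = g.input(doubleVal(2)); | |
| NodeIndex s = g.function(plus, a, b); | |
| g.eval(s); // computes and caches 3.0 | |
| IndexedGraphNewInputs g2(&g, {{a, doubleVal(10)}}); // one O(n) copy | |
| g2.eval(s); // 12.0; nodes not depending on 'a' reuse cached values | |
| */ | |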
| #include "LockFreeLookupHashTable.hpp" | |
| #include <algorithm> | |
| #include <boost/functional/hash.hpp> | |
| #include <boost/unordered/unordered_flat_map.hpp> | |
| #include <chrono> | |
| #include <forward_list> | |
| #include <future> | |
| #include <latch> | |
| #include <memory_resource> | |
| #include <mutex> | |
| #include <new> | |
| #include <optional> | |
| #include <random> | |
| #include <ranges> | |
| #include <set> | |
| #include <shared_mutex> | |
| #include <thread> | |
| #include <time.h> | |
| #include <type_traits> | |
| #include <vector> | |
| ////////////////////////////////////////////////////////////////////////////// | |
| // Flat DAG | |
| // | |
| // Values and opcodes | |
| // | |
| template < typename T > | |
| using string_map_t = LockFreeLookupHashTable<std::string, T, boost::hash<std::string>>; | |
| // simple dynamically typed values for demo and for checking memory issues | |
| enum Type { String, Double }; | |
| struct Value { | |
| union Data { | |
| const char *asString; | |
| double asDouble; | |
| }; | |
| std::atomic<size_t> refCount; | |
| const Type type; | |
| const Data data; | |
| Value(size_t rc, Type t, Data d) : refCount(rc), type(t), data(d) {} | |
| Value(Type t, Data d) : refCount(0), type(t), data(d) {} | |
| ~Value() { if (type == String) free((void*)data.asString); } | |
| void AddRef() { | |
| // size_t prev = | |
| refCount.fetch_add(1, std::memory_order_relaxed); | |
| // printSelf(); printf(" AddRef %zu\n", prev+1); | |
| } | |
| void Release() { | |
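| // classic refcount teardown: the release decrement pairs with the | |
| // acquire fence below, so prior writes are visible before delete | |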
| size_t prev = refCount.fetch_sub(1, std::memory_order_release); | |
| // printSelf(); printf(" Release %zu\n", prev-1); | |
| if (prev == 1) { | |
| std::atomic_thread_fence(std::memory_order_acquire); | |
| delete this; | |
| } | |
| } | |
| void printSelf() { | |
| if (type == Double) printf("%f", data.asDouble); | |
| else if (type == String) printf("\"%s\"", data.asString); | |
| else printf("unknown type %d", type); | |
| } | |
| double asDouble() const { | |
| if (type == Double) | |
| return data.asDouble; | |
| else | |
| throw std::runtime_error("not a double?"); | |
| } | |
| const char *asString() const { | |
| if (type == String) | |
| return data.asString; | |
| else | |
| throw std::runtime_error("not a string?"); | |
| } | |
| }; | |
| // Value dummyDouble(0x100000000, Double, { .asDouble = 1.0 }); | |
| // Value dummyString(0x100000000, String, { .asString = "dummy" }); | |
| Value *doubleVal(double d) { | |
| // return &dummyDouble; | |
| return new Value(Double, { .asDouble = d }); | |
| } | |
| Value *stringVal(const char* s) { | |
| // return &dummyString; | |
| return new Value(String, { .asString = strdup(s) }); | |
| } | |
| // all opcode arguments have the same size to be packed in one array | |
| typedef uint32_t OpcodeArg; | |
| typedef OpcodeArg NodeIndex; | |
| typedef OpcodeArg OpcodeArgIndex; // IndexedGraph::opcodeArgs[i] | |
| struct FunctionArgs { | |
| OpcodeArg argsCount; | |
| OpcodeArg functionIndex; | |
| OpcodeArg nodeIndex[1]; | |
| }; | |
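| // overlaid on IndexedGraph::opcodeArgs at an opcode's argsIndex(): | |
| // [argsCount, functionIndex, nodeIndex[0], ..., nodeIndex[argsCount-1]] | |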
| // Opcode to compute graph node value | |
| struct Opcode { | |
| enum Code { | |
| Const, | |
| Input, | |
| // Select, | |
| Function | |
| }; | |
| // upper byte is the code, lower 3 bytes are the index in IndexedGraph::opcodeArgs | |
| uint32_t data; | |
| Opcode() : data(0) {} | |
| Opcode(Code c, OpcodeArgIndex i = 0) : data((c << 24) + (i & 0xFFFFFF)) { | |
| if (i > 0xFFFFFF) | |
| throw std::runtime_error("Opcode index is too big"); | |
| } | |
| Code code() const { return static_cast<Code>(data >> 24); } | |
| OpcodeArgIndex argsIndex() const { return data & 0xFFFFFF; } | |
| }; | |
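| // e.g. Opcode(Opcode::Function, 5).data == 0x02000005, | |
| // so code() == Function (2) and argsIndex() == 5 | |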
| typedef std::function<Value*(const Value**)> NodeFunction; | |
| // | |
| // Utils | |
| // | |
| // a quick hack for missing std::scope_exit | |
| template <typename F> | |
| struct scope_exit { | |
| explicit scope_exit(F&& f) : onExit(std::move(f)) {} | |
| ~scope_exit() { onExit(); } | |
| private: | |
| F onExit; | |
| }; | |
| struct null_mutex { | |
| void lock() noexcept {} | |
| void lock_shared() noexcept {} | |
| bool try_lock() noexcept { return true; } | |
| void unlock() noexcept {} | |
| void unlock_shared() noexcept {} | |
| }; | |
| // | |
| // Append-only vector for flat DAG | |
| // | |
| // special append-only vector for graphs: | |
| // can be read lock-free, | |
| // updates must be protected by a shared mutex to block resizes, | |
| // alloc will block if it needs a resize | |
| template <typename T> | |
| class AppendOnlyVectorBase { | |
| public: | |
| AppendOnlyVectorBase() | |
| : vecPtr(new std::vector<T>(16, T{})) | |
| , vecSize{0} | |
| {} | |
| ~AppendOnlyVectorBase() { | |
| delete &vec(); | |
| for (auto v : oldVecs) delete v; | |
| } | |
| size_t size() const { return vecSize.load(std::memory_order_acquire); } | |
| // alloc at the end | |
| size_t alloc(size_t n, auto& lock) { | |
| size_t i = vecSize.fetch_add(n, std::memory_order_relaxed); | |
| ensure(i, n, lock); | |
| return i; | |
| } | |
| void ensure(size_t i, size_t n, auto& lock) { | |
| Vec& v = vec(); | |
| if (v.size() < i + n) | |
| grow(std::bit_ceil(i + n), lock); | |
| } | |
| protected: | |
| T& at(size_t i, const char *caller) { | |
| Vec& v = vec(); | |
| checkValid(v, i, caller); | |
| return v[i]; | |
| } | |
| const T& at(size_t i, const char *caller) const { | |
| Vec& v = vec(); | |
| checkValid(v, i, caller); | |
| return v[i]; | |
| } | |
| private: | |
| using Vec = std::vector<T>; | |
| Vec& vec() const { return *vecPtr.load(std::memory_order_acquire); } | |
| static inline void checkValid(const Vec& v, size_t i, const char *caller) { | |
| // if (i >= v.size()) [[unlikely]] | |
| // throw std::runtime_error( | |
| // std::string(caller) + " index out of bounds " | |
| // + std::to_string(i) + " >= " + std::to_string(v.size())); | |
| } | |
| void grow(size_t newSize, auto& lock) { | |
| std::scoped_lock _(lock); | |
| Vec& v = vec(); | |
| if (v.size() >= newSize) | |
| return; // already reallocated | |
| Vec *newV = new std::vector<T>(newSize, T{}); | |
| oldVecs.push_back(&v); | |
| std::copy(v.begin(), v.end(), newV->begin()); | |
| vecPtr.store(newV, std::memory_order_release); | |
| } | |
| std::atomic<Vec*> vecPtr; // current pointer, swapped on resize | |
| std::vector<Vec*> oldVecs; // to free memory | |
| std::atomic<size_t> vecSize; | |
| }; | |
| template <typename T> | |
| class AppendOnlyVector : public AppendOnlyVectorBase<T> { | |
| public: | |
| const T& operator[](size_t i) const { return this->at(i, "[]"); } | |
| // unsafe* calls must be protected by a shared mutex | |
| T& unsafeIndex(size_t i) { return this->at(i, "unsafeIndex"); } | |
| void unsafeSet(size_t i, T x) { this->at(i, "unsafeSet") = x; } | |
| }; | |
| template <typename T> | |
| class AtomicAppendOnlyVector : public AppendOnlyVectorBase<T> { | |
| public: | |
| void unsafeStore(size_t i, T x, std::memory_order o) { | |
| std::atomic_ref(this->at(i, "unsafeStore")).store(x, o); | |
| } | |
| T load(size_t i, std::memory_order o) const { | |
| return std::atomic_ref(this->at(i, "load")).load(o); | |
| } | |
| }; | |
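| // Locking discipline sketch (mirrors how IndexedGraph uses these below): | |
| // AppendOnlyVector<int> v; std::shared_mutex m; | |
| // size_t i = v.alloc(1, m); // locks 'm' exclusively only to grow | |
| // { std::shared_lock _(m); v.unsafeSet(i, 42); } // blocks resizes, not reads | |
| // int x = v[i]; // lock-free read | |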
| // | |
| // Base DAG | |
| // | |
| // Graph nodes + opcodes and base values | |
| // Can be extended while evaluated (only locks on resizes). | |
| // Evals use shared_mutex to block resizes. | |
| // Updated graphs ('IndexedGraphNewInputs' below) only create a new 'values' array | |
| // | |
| // It's an index-based append-only graph; names are added on another level. | |
| struct IndexedGraph { | |
| // opcodes to compute node values, arguments are flattened into arrays | |
| AppendOnlyVector<Opcode> opcodes; | |
| AppendOnlyVector<OpcodeArg> opcodeArgs; | |
| AppendOnlyVector<NodeFunction> functions; | |
| // Vec<Value::Type> types; | |
| // node values | |
| mutable AtomicAppendOnlyVector<Value*> values; | |
| static constexpr std::size_t CACHE_LINE = 64; //std::hardware_destructive_interference_size; | |
| alignas (CACHE_LINE) mutable std::shared_mutex resizeLock; | |
| // alignas (CACHE_LINE) mutable null_mutex resizeLock; | |
| // for better error messages only, IndexedGraph is index-based | |
| std::function<const std::string *(NodeIndex)> nodeIndexToName; | |
| ~IndexedGraph() { | |
| for (size_t i = 0; i < values.size(); i++) { | |
| auto v = values.load(i, std::memory_order_acquire); | |
| if (v) v->Release(); | |
| } | |
| } | |
| [[noreturn]] void ERR(const std::string& why, NodeIndex i) const { | |
| const std::string *e = nodeIndexToName ? nodeIndexToName(i) : nullptr; | |
| throw std::runtime_error(e ? why + " " + *e : why); | |
| } | |
| NodeIndex allocNode() { | |
| NodeIndex r = values.alloc(1, resizeLock); | |
| opcodes.ensure(r, 1, resizeLock); | |
| // can't use opcodes.alloc due to a race: | |
| // thread A: values.alloc -> 0 | |
| // thread B: values.alloc -> 1 | |
| // thread B: opcodes.alloc -> 0 | |
| // thread B: segfault accessing opcodes[1] | |
| // thread A: opcodes.alloc -> 1 -- too late | |
| return r; | |
| } | |
| // node creation only locks on resizes, i.e. ~4*10 times for a 1024-node graph | |
| NodeIndex constant(Value *c) { | |
| c->AddRef(); | |
| NodeIndex i = allocNode(); | |
| std::shared_lock _(resizeLock); | |
| opcodes.unsafeSet(i, Opcode(Opcode::Const)); | |
| values.unsafeStore(i, c, std::memory_order_release); | |
| return i; | |
| } | |
| NodeIndex input() { | |
| NodeIndex i = allocNode(); | |
| std::shared_lock _(resizeLock); | |
| opcodes.unsafeSet(i, Opcode(Opcode::Input)); | |
| values.unsafeStore(i, nullptr, std::memory_order_release); | |
| return i; | |
| } | |
| NodeIndex input(Value *d) { | |
| d->AddRef(); | |
| NodeIndex i = allocNode(); | |
| std::shared_lock _(resizeLock); | |
| opcodes.unsafeSet(i, Opcode(Opcode::Input)); | |
| values.unsafeStore(i, d, std::memory_order_release); | |
| return i; | |
| } | |
| template <typename... Args> | |
| NodeIndex function(const NodeFunction& f, Args... args) { | |
| NodeIndex i = allocNode(); | |
| OpcodeArg argsIndex = | |
| opcodeArgs.alloc(2 + sizeof...(Args), resizeLock); | |
| OpcodeArg functionIndex = functions.alloc(1, resizeLock); | |
| std::shared_lock _(resizeLock); | |
| functions.unsafeSet(functionIndex, f); | |
| FunctionArgs& fa = *(FunctionArgs*)&opcodeArgs.unsafeIndex(argsIndex); | |
| fa.argsCount = sizeof...(Args); | |
| fa.functionIndex = functionIndex; | |
| size_t ni = 0; | |
| ((fa.nodeIndex[ni++] = args), ...); | |
| opcodes.unsafeSet(i, Opcode(Opcode::Function, argsIndex)); | |
| values.unsafeStore(i, nullptr, std::memory_order_release); | |
| return i; | |
| } | |
| Value *eval(NodeIndex i) const { | |
| auto r = values.load(i, std::memory_order_acquire); | |
| if (r) | |
| return r; | |
| auto opcode = opcodes[i].code(); | |
| switch (opcode) { | |
| case Opcode::Input: | |
| ERR("Undefined input", i); | |
| case Opcode::Const: | |
| ERR("Undefined constant?", i); | |
| case Opcode::Function: { | |
| const FunctionArgs& fa = | |
| *(const FunctionArgs*)&opcodeArgs[opcodes[i].argsIndex()]; | |
| const Value **args = (const Value**)alloca(sizeof args[0] * fa.argsCount); | |
| // 'a', not 'i': avoid shadowing the node index | |
| for (OpcodeArg a = 0; a < fa.argsCount; a++) { | |
| args[a] = eval(fa.nodeIndex[a]); | |
| } | |
| Value *r = functions[fa.functionIndex](args); | |
| r->AddRef(); | |
| std::shared_lock _(resizeLock); | |
| // shared lock guarantees no resizes and vector copies | |
| // are taking place, we can replace value atomically. | |
| values.unsafeStore(i, r, std::memory_order_release); | |
| return r; | |
| } | |
| default: | |
| throw std::runtime_error("unsupported opcode " + std::to_string(opcode)); | |
| } | |
| } | |
| }; | |
| // | |
| // Graph with new inputs | |
| // | |
| // lock-free eval even if the base graph is being extended | |
| struct IndexedGraphNewInputs { | |
| struct NodeValue { | |
| std::atomic<Value*> value; | |
| // only version == graph version needs to be destroyed | |
| // version < graph version are shared values | |
| int version; | |
| NodeValue() : value(nullptr), version(0) {} | |
| NodeValue& operator=(const NodeValue& n) { | |
| auto v = n.value.load(std::memory_order_acquire); | |
| version = n.version; | |
| value.store(v, std::memory_order_release); | |
| return *this; | |
| } | |
| }; | |
| size_t size; | |
| NodeValue *values; | |
| IndexedGraphNewInputs *parent; | |
| const IndexedGraph *base; | |
| int version; | |
| typedef std::vector<std::pair<NodeIndex, Value*>> Inputs; | |
| ~IndexedGraphNewInputs() { | |
| for (size_t i = 0; i < size; i++) { | |
| NodeValue& nv = values[i]; | |
| if (nv.version == version) { | |
| Value *v = nv.value.load(std::memory_order_acquire); | |
| if (v) v->Release(); | |
| } | |
| } | |
| delete[] values; | |
| } | |
| IndexedGraphNewInputs(const IndexedGraph *b, const Inputs& inputs) { | |
| base = b; | |
| version = 1; | |
| parent = nullptr; | |
| initValues(inputs); | |
| } | |
| IndexedGraphNewInputs(IndexedGraphNewInputs *p, const Inputs& inputs) { | |
| parent = p; | |
| base = parent->base; | |
| version = parent->version + 1; | |
| initValues(inputs); | |
| } | |
| void initValues(const Inputs& inputs) { | |
| size = base->values.size(); | |
| values = new NodeValue[size]; | |
| for (const auto& [index, val] : inputs) { | |
| auto& v = values[index]; | |
| v.version = version; | |
| if (val) | |
| val->AddRef(); | |
| v.value.store(val, std::memory_order_relaxed); | |
| } | |
| std::atomic_thread_fence(std::memory_order_release); | |
| } | |
| Value *eval(NodeIndex i) { | |
| return evalNodeValue(i).value.load(std::memory_order_relaxed); | |
| } | |
| NodeValue& evalNodeValue(NodeIndex i) { | |
| NodeValue& vi = values[i]; | |
| if (vi.value.load(std::memory_order_acquire)) | |
| return vi; | |
| auto opcode = base->opcodes[i].code(); | |
| switch (opcode) { | |
| case Opcode::Input: | |
| if (parent) { | |
| vi = parent->evalNodeValue(i); | |
| return vi; | |
| } | |
| // fallthrough | |
| case Opcode::Const: | |
| vi.version = 0; | |
| vi.value.store(base->eval(i), std::memory_order_release); | |
| return vi; | |
| case Opcode::Function: { | |
| // only function evaluation would need a lock, and only to | |
| // avoid evaluating the same thing several times (see TODO above) | |
| const FunctionArgs& fa = | |
| *(const FunctionArgs*)&base->opcodeArgs[base->opcodes[i].argsIndex()]; | |
| const Value **args = (const Value **) | |
| alloca(sizeof args[0] * fa.argsCount); | |
| int maxVersion = 0; | |
| for (OpcodeArg a = 0; a < fa.argsCount; a++) { | |
| NodeValue& v = evalNodeValue(fa.nodeIndex[a]); | |
| args[a] = v.value.load(std::memory_order_acquire); | |
| maxVersion = std::max(maxVersion, v.version); | |
| } | |
| if (maxVersion < version) { | |
| // no argument changed in this overlay: reuse the parent's value, | |
| // or the base graph's if this is the first overlay (parent == nullptr) | |
| if (parent) { | |
| vi = parent->evalNodeValue(i); | |
| } else { | |
| vi.version = 0; | |
| vi.value.store(base->eval(i), std::memory_order_release); | |
| } | |
| return vi; | |
| } | |
| Value *r = base->functions[fa.functionIndex](args); | |
| r->AddRef(); | |
| vi.version = version; | |
| vi.value.store(r, std::memory_order_release); | |
| return vi; | |
| } | |
| default: | |
| throw std::runtime_error("unsupported opcode " + std::to_string(opcode)); | |
| } | |
| } | |
| }; | |
| ////////////////////////////////////////////////////////////////////////////// | |
| // Deadlock detector | |
| // Deadlock loop info for the error reporting | |
| struct DeadlockInfo { | |
| typedef std::vector<std::string> Loop; | |
| struct IndexedLoop { | |
| Loop loop; | |
| boost::unordered_flat_map<std::string, size_t> nameToIndex; | |
| IndexedLoop(Loop&& _loop) : loop(std::move(_loop)) { | |
| size_t i = 0; | |
| for (auto& n : loop) | |
| nameToIndex.emplace(n, i++); | |
| } | |
| }; | |
| const std::shared_ptr<const IndexedLoop> indexedLoop; | |
| size_t startIndex; // start index in the loop | |
| Loop extra; // extra recipes on top of the loop | |
| DeadlockInfo(Loop&& _loop) | |
| : indexedLoop(new IndexedLoop(std::move(_loop))) | |
| , startIndex(0) | |
| {} | |
| DeadlockInfo(const std::string& x, const DeadlockInfo& xs) | |
| : indexedLoop(xs.indexedLoop) | |
| , startIndex(xs.startIndex) | |
| , extra(xs.extra) | |
| { | |
| if (extra.size() == 0) { | |
| auto it = indexedLoop->nameToIndex.find(x); | |
| if (it != indexedLoop->nameToIndex.end()) { | |
| // we're seeing the same loop exception from the other | |
| // node in the loop, change the loop start index | |
| startIndex = it->second; | |
| return; | |
| } | |
| } | |
| // we're out of the loop, append the node to the extra path | |
| extra.push_back(x); | |
| } | |
| std::string formatError() const { | |
| std::ostringstream oss; | |
| oss << "A loop in a recipe:\n"; | |
| auto out = [&](const std::string& n) { oss << " " << n << "\n"; }; | |
| for (auto e : extra | std::views::reverse) | |
| out(e); | |
| auto& l = indexedLoop->loop; | |
| out(l[startIndex] + " <--"); | |
| for (size_t i = startIndex + 1; i < l.size(); i++) | |
| out(l[i]); | |
| for (size_t i = 0; i < startIndex; i++) | |
| out(l[i]); | |
| out(l[startIndex] + " -->"); | |
| return oss.str(); | |
| } | |
| }; | |
| struct DeadlockException : std::runtime_error { | |
| DeadlockInfo deadlockInfo; | |
| DeadlockException(const DeadlockInfo& d) | |
| : std::runtime_error(d.formatError()) | |
| , deadlockInfo(d) | |
| {} | |
| DeadlockException(const std::string& x, const DeadlockInfo& xs) | |
| : DeadlockException(DeadlockInfo(x, xs)) | |
| {} | |
| DeadlockException(const std::string& x, const DeadlockException& xs) | |
| : DeadlockException(DeadlockInfo(x, xs.deadlockInfo)) | |
| {} | |
| }; | |
| /* | |
| a -> b -> c -> a loop | |
| d -> c | |
| construct [a,b,c,d] | |
| a -> b (thread #1) ok | |
| b -> c (thread #2) ok | |
| d -> c (thread #3) ok | |
| c -> a (thread #4) -- deadlock | |
| fails recipe 'c', it will fail others who wait | |
| */ | |
| // Builds a wait-for graph https://en.wikipedia.org/wiki/Wait-for_graph | |
| // from Threads to Constructs and detects loops. | |
| // | |
| // O(n_chainedWaitingThreads) per waitConstruct. I.e. many threads | |
| // waiting for one recipe are still O(1) per waitConstruct. | |
| // Only >O(1) if there's a chain of recipes to follow. | |
| // | |
| // It's a pure algorithm (can be tested single threaded). | |
| // | |
| // newThread/freeThread/newConstruct can all work | |
| // lock-free provided they're called from the same thread | |
| // | |
| // waitConstruct needs a lock (walks through different threads) | |
| // constructFinished needs a lock (shared between threads) | |
| struct DeadlockDetector { | |
| // flag whether the node was visited, keeps the current scan version | |
| typedef uint64_t ScanId; | |
| ScanId curScanId; | |
| struct Construct; | |
| typedef std::forward_list<Construct*> ConstructStack; | |
| struct Thread { | |
| const std::string name; | |
| bool waiting; // waiting for the top of the constructStack to be ready | |
| ConstructStack constructStack; | |
| }; | |
| struct Construct { | |
| const std::string name; | |
| Thread *constructThread; // the thread that's building the recipe | |
| ScanId visitedScanId; // used to mark visited nodes in checkLoops | |
| size_t refCount; // 1 newConstruct + N waitConstruct | |
| // non-atomic as waitConstruct | |
| }; | |
| DeadlockDetector() : curScanId(0) {} | |
| Thread *newThread(const std::string& name) { | |
| return new Thread{ | |
| .name = name, | |
| .waiting = false, | |
| .constructStack = {} | |
| }; | |
| } | |
| void freeThread(Thread *t) { | |
| if (!t->constructStack.empty()) | |
| ERR("freeThread: non-empty construct stack " | |
| + toString(t->constructStack)); | |
| delete t; | |
| } | |
| // Call when you start constructing a new named node | |
| Construct *newConstruct(Thread *thread, const std::string& name) { | |
| auto construct = new Construct{ | |
| .name = name, | |
| .constructThread = thread, | |
| .visitedScanId = 0, | |
| .refCount = 1 | |
| }; | |
| thread->constructStack.push_front(construct); | |
| return construct; | |
| } | |
| // Call when the node you want to construct is already being constructed. | |
| // Does not perform an actual waiting, only changes the detector state. | |
| // Must be protected by a lock. | |
| void waitConstruct(Thread *thread, Construct *c) { | |
| if (thread->waiting) | |
| ERR("waitConstruct: thread is already in the waiting state?"); | |
| ++curScanId; | |
| DeadlockInfo::Loop loop; | |
| if (deadlock(thread, c, loop)) { | |
| loop.push_back(c->name); | |
| std::reverse(loop.begin(), loop.end()); | |
| // note, it's the deadlock loop only. | |
| // need extra logic to add constructs that lead to the | |
| // deadlock | |
| throw DeadlockException(std::move(loop)); | |
| } | |
| thread->waiting = true; | |
| thread->constructStack.push_front(c); | |
| c->refCount++; | |
| } | |
| // the meat | |
| [[nodiscard("check the deadlock status")]] | |
| bool deadlock(const Thread *t, Construct *c, DeadlockInfo::Loop& loop) const { | |
| if (c->visitedScanId == curScanId || !c->constructThread) | |
| return false; // already visited or already constructed | |
| c->visitedScanId = curScanId; | |
| // printf("check %s %s (%s)\n", t->name.data(), c->name.data(), | |
| // c->constructThread->name.data()); | |
| if (t == c->constructThread) { | |
| // loop found -- we're already constructing this recipe ourselves | |
| addStackSlice(c, loop); | |
| return true; // deadlock! | |
| } else if (c->constructThread->waiting) { | |
| // if the recipe thread is waiting, follow the recipe it waits for | |
| auto waitsFor = c->constructThread->constructStack.front(); | |
| if (deadlock(t, waitsFor, loop)) { | |
| addStackSlice(c, loop); | |
| return true; | |
| } | |
| } | |
| return false; | |
| } | |
| void addStackSlice(Construct *c, DeadlockInfo::Loop& loop) const { | |
| // add the [top..c) slice of the recipe stack | |
| // for the error reporting | |
| for (auto x : c->constructThread->constructStack) { | |
| if (x == c) break; | |
| loop.push_back(x->name); | |
| } | |
| } | |
| // Call once your construct has finished (even if it throws an exception) | |
| bool constructFinished(Thread *t, Construct *c) { | |
| if (t->constructStack.empty()) | |
| ERR("constructFinished: empty constructStack"); | |
| if (t->constructStack.front() != c) | |
| ERR("constructFinished: construct mismatch " + c->name | |
| + " in not the last in " + toString(t->constructStack)); | |
| t->constructStack.pop_front(); | |
| t->waiting = false; | |
| if (t == c->constructThread) | |
| // clear, so we won't follow a thread that has potentially | |
| // finished its job and been deleted | |
| c->constructThread = nullptr; | |
| bool last = --c->refCount == 0; | |
| if (last) | |
| delete c; | |
| return last; | |
| } | |
| // utils | |
| [[noreturn]] static void ERR(const std::string& why) { | |
| // printf("ERR: %s\n", why.data()); | |
| throw std::runtime_error(why); | |
| } | |
| static std::string toString(const ConstructStack& l) { | |
| std::vector<const std::string*> names; | |
| for (auto c : l) | |
| names.push_back(&c->name); | |
| std::ostringstream oss; | |
| for (auto n : names | std::views::reverse) { | |
| oss << " " << *n << '\n'; | |
| } | |
| return oss.str(); | |
| } | |
| }; | |
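| // Single-threaded usage sketch (the detector only updates its state; the | |
| // actual waiting happens in NamedDeadlockDetector::construct below): | |
| // DeadlockDetector d; | |
| // auto t1 = d.newThread("t1"), t2 = d.newThread("t2"); | |
| // auto a = d.newConstruct(t1, "a"), b = d.newConstruct(t2, "b"); | |
| // d.waitConstruct(t1, b); // ok: t1 now waits for b (built by t2) | |
| // d.waitConstruct(t2, a); // throws DeadlockException: a -> b -> a | |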
| // | |
| // DeadlockDetector + named constructs | |
| // | |
| template <typename T> | |
| struct NamedDeadlockDetector | |
| { | |
| typedef std::mutex mutex_t; | |
| // all construct-data modifications must be protected by a lock | |
| mutex_t constructLock; | |
| string_map_t<T> nameToT; | |
| LockFreeLookupHashTable<T, std::string> tToName; | |
| DeadlockDetector deadlockDetector; | |
| enum Result { OK, Deadlock, Exception }; | |
| struct ConstructThunk { | |
| DeadlockDetector::Construct *construct; | |
| std::latch ready; | |
| std::variant<T, DeadlockInfo, std::exception_ptr> result; | |
| ConstructThunk(DeadlockDetector::Construct *_construct) | |
| : construct(_construct) | |
| , ready{1} | |
| {} | |
| }; | |
| template <typename ...Args> | |
| using map_t = boost::unordered_flat_map<Args...>; | |
| map_t<std::string, ConstructThunk*> constructThunks; | |
| map_t<std::thread::id, DeadlockDetector::Thread*, std::hash<std::thread::id>> threads; | |
| T construct(const std::string& name, const auto& build) { | |
| auto r = nameToT.lookupPtr(name); | |
| if (r) return *r; // already constructed | |
| auto threadId = std::this_thread::get_id(); | |
| bool newConstruct = false; | |
| DeadlockDetector::Thread *t = nullptr; | |
| ConstructThunk *ct = nullptr; | |
| std::unique_lock lock(constructLock, std::defer_lock); | |
| auto _finally = scope_exit([&]{ | |
| if (!lock) lock.lock(); | |
| if (ct && deadlockDetector.constructFinished(t, ct->construct)) { | |
| constructThunks.erase(name); | |
| delete ct; | |
| } | |
| if (t && t->constructStack.empty()) { // last construct | |
| threads.erase(threadId); | |
| deadlockDetector.freeThread(t); | |
| } | |
| }); | |
| { | |
| lock.lock(); | |
| // recheck after taking a lock | |
| r = nameToT.lookupPtr(name); | |
| if (r) return *r; | |
| auto& tRef = threads[threadId]; | |
| if (!tRef) | |
| tRef = deadlockDetector.newThread("thread for " + name); | |
| t = tRef; | |
| auto& ctRef = constructThunks[name]; | |
| if (!ctRef) { | |
| newConstruct = true; | |
| ctRef = new ConstructThunk( | |
| deadlockDetector.newConstruct(t, name)); | |
| } else { | |
| // run the deadlock detector | |
| deadlockDetector.waitConstruct(t, ctRef->construct); | |
| // ^ it throws | |
| } | |
| ct = ctRef; | |
| lock.unlock(); | |
| } | |
| if (newConstruct) { | |
| try { | |
| ct->result.template emplace<OK>(build(name)); | |
| } catch (const DeadlockException& e) { | |
| ct->result.template emplace<Deadlock>(e.deadlockInfo); | |
| } catch (...) { | |
| ct->result.template emplace<Exception>(std::current_exception()); | |
| } | |
| ct->ready.count_down(); | |
| } else { | |
| ct->ready.wait(); | |
| } | |
| switch (ct->result.index()) { | |
| case OK: { | |
| lock.lock(); | |
| auto r = std::get<OK>(ct->result); | |
| nameToT.unsafeEmplace(name, [&](){ return r; }); | |
| tToName.unsafeEmplace(r, [&](){ return name; }); | |
| return r; | |
| } | |
| case Deadlock: | |
| throw DeadlockException(name, std::get<Deadlock>(ct->result)); | |
| case Exception: | |
| std::rethrow_exception(std::get<Exception>(ct->result)); | |
| default: | |
| throw std::runtime_error("Unexpected varian index " + std::to_string(ct->result.index())); | |
| } | |
| } | |
| }; | |
| ////////////////////////////////////////////////////////////////////////////// | |
| // Recipes | |
| struct NamedGraph; | |
| typedef std::function<NodeIndex(NamedGraph&, const std::string&)> GraphRecipe; | |
| typedef string_map_t<GraphRecipe> Cookbook; | |
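| // Find a recipe by exact name, falling back to successively shorter | |
| // dotted prefixes: e.g. an "input" recipe also serves "input.a" (see main()) | |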
| const GraphRecipe *findRecipe(const Cookbook& cb, const std::string& name) { | |
| auto r = cb.lookupPtr(name); | |
| if (r) | |
| return r; | |
| size_t pos = name.size(); | |
| while (pos > 0) { | |
| // search from pos - 1, otherwise rfind keeps finding the same dot | |
| // and the loop never terminates when no prefix recipe exists | |
| pos = name.rfind('.', pos - 1); | |
| if (pos == std::string::npos) | |
| return nullptr; | |
| r = cb.lookupPtr(name.substr(0, pos)); | |
| if (r) | |
| return r; | |
| } | |
| return nullptr; | |
| } | |
| // IndexedGraph extended with the ability to build named nodes. | |
| // Adds node name <-> index map, cookbook and DeadlockDetector | |
| struct NamedGraph | |
| { | |
| const Cookbook& cookbook; | |
| IndexedGraph base; | |
| NamedDeadlockDetector<NodeIndex> deadlockDetector; | |
| NamedGraph(const Cookbook& c) : cookbook(c) { | |
| base.nodeIndexToName = [&](NodeIndex n) { return getName(n); }; | |
| } | |
| Value *get(const std::string& name) const { | |
| return base.eval(getIndex(name)); | |
| } | |
| NodeIndex getIndex(const std::string& name) const { | |
| auto i = deadlockDetector.nameToT.lookupPtr(name); | |
| if (!i) | |
| throw std::runtime_error("Node " + name + " not found"); | |
| return *i; | |
| } | |
| const std::string *getName(NodeIndex i) const { | |
| return deadlockDetector.tToName.lookupPtr(i); | |
| } | |
| NodeIndex construct(const std::string& name) { | |
| return deadlockDetector.construct(name, [&](const std::string& name) { | |
| const GraphRecipe *r = findRecipe(cookbook, name); | |
| if (!r) | |
| throw std::runtime_error("No recipe for " + name); | |
| return (*r)(*this, name); | |
| }); | |
| } | |
| }; | |
| // version of IndexedGraphNewInputs with named nodes | |
| struct NamedGraphWithNewInputs | |
| { | |
| typedef std::vector<std::pair<std::string, Value*>> Inputs; | |
| const NamedGraph *env; | |
| IndexedGraphNewInputs graph; | |
| NamedGraphWithNewInputs(const NamedGraph *e, const Inputs& inputs) | |
| : env(e) | |
| , graph(&e->base, toIndexedInputs(e, inputs)) | |
| {} | |
| NamedGraphWithNewInputs(NamedGraphWithNewInputs *p, const Inputs& inputs) | |
| : env(p->env) | |
| , graph(&p->graph, toIndexedInputs(p->env, inputs)) | |
| {} | |
| static IndexedGraphNewInputs::Inputs toIndexedInputs( | |
| const NamedGraph *e, const Inputs& named) | |
| { | |
| IndexedGraphNewInputs::Inputs indexed; | |
| indexed.resize(named.size()); | |
| for (size_t i = 0; i < named.size(); i++) { | |
| indexed[i].first = e->getIndex(named[i].first); | |
| indexed[i].second = named[i].second; | |
| } | |
| return indexed; | |
| } | |
| Value *get(const std::string& name) { | |
| return graph.eval(env->getIndex(name)); | |
| } | |
| int debug_NodeVersion(const std::string& name) { | |
| return graph.evalNodeValue(env->getIndex(name)).version; | |
| } | |
| }; | |
| // | |
| // Tests | |
| // | |
| Value *plus(const Value **args) { | |
| return doubleVal(args[0]->asDouble() + args[1]->asDouble()); | |
| } | |
| static inline void cpu_relax() { | |
| #if defined(__x86_64__) || defined(__i386__) | |
| asm volatile("pause" ::: "memory"); | |
| #elif defined(__aarch64__) || defined(__arm__) | |
| asm volatile("yield" ::: "memory"); | |
| #endif | |
| } | |
| static inline uint64_t monotonic_ns() { | |
| struct timespec ts; | |
| clock_gettime(CLOCK_MONOTONIC, &ts); | |
| return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec; | |
| } | |
| static inline void spin_for_ns(uint64_t duration_ns) { | |
| uint64_t end = monotonic_ns() + duration_ns; | |
| while (monotonic_ns() < end) { | |
| cpu_relax(); | |
| } | |
| } | |
| typedef boost::unordered_flat_map<std::string, std::string> pcMap; | |
| struct Mark { std::string child; int mark; }; | |
| typedef boost::unordered_flat_map<std::string, Mark> pcMarkMap; | |
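| // single-threaded reference loop check: follow parent->child links, marking | |
| // nodes with the scan number; meeting the same mark again means a loop | |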
| bool hasLoop(pcMarkMap& map, int mark, const std::string& cur) { | |
| auto it = map.find(cur); | |
| enum { BAD = -100, GOOD = -200 }; | |
| if (it == map.end()) | |
| return false; | |
| if (it->second.mark == mark) { | |
| it->second.mark = BAD; | |
| return true; | |
| } | |
| if (it->second.mark == BAD) | |
| return true; | |
| if (it->second.mark == GOOD) | |
| return false; | |
| it->second.mark = mark; | |
| bool loop = hasLoop(map, mark, it->second.child); | |
| it->second.mark = loop ? BAD : GOOD; | |
| return loop; | |
| } | |
| void testDeadlockDetector() { | |
| if (false) { | |
| DeadlockDetector d; | |
| auto t1 = d.newThread("t1"); | |
| auto t2 = d.newThread("t2"); | |
| auto t3 = d.newThread("t3"); | |
| auto a = d.newConstruct(t1, "a"); | |
| auto b = d.newConstruct(t2, "b"); | |
| auto c = d.newConstruct(t3, "c"); | |
| auto aa = d.newConstruct(t1, "aa"); | |
| auto bb = d.newConstruct(t2, "bb"); | |
| auto cc = d.newConstruct(t3, "cc"); | |
| auto bbb = d.newConstruct(t2, "bbb"); | |
| d.waitConstruct(t1, bb); | |
| d.waitConstruct(t2, c); | |
| try { | |
| d.waitConstruct(t3, a); | |
| } catch (const DeadlockException& e) { | |
| throw DeadlockException(c->name, DeadlockException(cc->name, e)); | |
| } | |
| d.freeThread(t1); | |
| d.freeThread(t2); | |
| d.freeThread(t3); | |
| // construct [a,b,c] | |
| // a -> b (thread #1 parent a) ok | |
| // b -> c (thread #2 parent b) ok | |
| // c -> a (thread #3 parent c) -- deadlock | |
| } | |
| int n = 100000; | |
| pcMap parentChild; | |
| std::vector<std::string> nodes; | |
| Cookbook cookbook; | |
| parentChild.reserve(10*n); | |
| nodes.reserve(10*n); | |
| cookbook.unsafeReserve(10*n); | |
| auto input = [](NamedGraph& g, const std::string&){ return g.base.input(); }; | |
| auto node = [&](NamedGraph& g, const std::string& n){ | |
| // spin_for_ns(10000); | |
| // std::this_thread::sleep_for(std::chrono::nanoseconds(100)); | |
| auto c1 = g.base.constant(doubleVal(1.0)); | |
| auto i2 = g.base.input(doubleVal(2.0)); | |
| auto child = g.construct(parentChild.at(n)); // .at(): lookup-only, safe for concurrent reads | |
| return g.base.function(plus, c1, g.base.function(plus, child, i2)); | |
| }; | |
| std::mt19937 mt; | |
| std::uniform_int_distribution<int> rand(0, n-1); | |
| // simple 4 layers of hierarchy | |
| // for (int i = 0; i < n; i++) { | |
| // char from[100], mid1[100], mid2[100], to[100]; | |
| // snprintf(from, sizeof from, "a%06d", i); | |
| // snprintf(mid1, sizeof mid1, "b%06d", rand(mt)); | |
| // snprintf(mid2, sizeof mid1, "c%06d", rand(mt)); | |
| // snprintf(to, sizeof to, "i%03d", rand(mt)); | |
| // parentChild[from] = mid1; | |
| // parentChild[mid1] = mid2; | |
| // parentChild[mid2] = to; | |
| // cookbook.unsafeEmplace(from, [&](){ return node; }); | |
| // cookbook.unsafeEmplace(mid1, [&](){ return node; }); | |
| // cookbook.unsafeEmplace(mid2, [&](){ return node; }); | |
| // cookbook.unsafeEmplace(to, [&](){ return input; }); | |
| // nodes.push_back(from); | |
| // nodes.push_back(from); | |
| // nodes.push_back(mid1); | |
| // nodes.push_back(mid1); | |
| // nodes.push_back(mid2); | |
| // nodes.push_back(mid2); | |
| // nodes.push_back(to); | |
| // nodes.push_back(to); | |
| // } | |
| int nInputs = (int)(n/98); | |
| // why there's a jump 0->17k errors between n/98 and n/99 ? | |
| for (int i = 0; i < n; i++) { | |
| char from[100], to[100]; | |
| snprintf(from, sizeof from, "a%06d", i); | |
| nodes.push_back(from); | |
| if (i < nInputs) { | |
| cookbook.unsafeEmplace(from, [&](){ return input; }); | |
| } else { | |
| cookbook.unsafeEmplace(from, [&](){ return node; }); | |
| int out = rand(mt); | |
| snprintf(to, sizeof to, "a%06d", out); | |
| parentChild[from] = to; | |
| } | |
| } | |
| std::shuffle(nodes.begin(), nodes.end(), mt); | |
| pcMarkMap markMap; | |
| std::set<std::string> refErrors; | |
| for (auto & [p,c] : parentChild) | |
| markMap.emplace(p, Mark{c, -1}); | |
| for (size_t i = 0; i < nodes.size(); i++) | |
| if (hasLoop(markMap, i, nodes[i])) { | |
| refErrors.emplace(nodes[i]); | |
| } | |
| for (size_t reps = 0; reps < 10; reps++) { | |
| NamedGraph g(cookbook); | |
| std::mutex errorsLock; | |
| std::map<std::string, std::string> errors; | |
| #pragma omp parallel for num_threads(24) schedule(dynamic, 1000) | |
| for (size_t i = 0; i < nodes.size(); i++) { | |
| try { | |
| g.construct(nodes[i]); | |
| } catch (const DeadlockException& e) { | |
| // puts(e.what()); | |
| std::scoped_lock _(errorsLock); | |
| errors.emplace(nodes[i], e.what()); | |
| } | |
| } | |
| printf("errors %zd, ref errors %zd, size %zd, nodes %zd\n", errors.size(), refErrors.size(), g.deadlockDetector.nameToT.size(), g.base.values.size()); | |
| std::set<std::string> diff; | |
| auto errorNodes = std::views::keys(errors); | |
| std::set_difference(errorNodes.begin(), errorNodes.end(), | |
| refErrors.begin(), refErrors.end(), | |
| std::inserter(diff, diff.end())); | |
| for (auto & e : diff) { | |
| printf("%s: %s\n", e.data(), errors[e].data()); | |
| } | |
| } | |
| } | |
| int main() | |
| { | |
| Cookbook cb = { | |
| {"a", [](NamedGraph& g, const std::string&){ return g.construct("b"); } }, | |
| {"b", [](NamedGraph& g, const std::string&){ return g.construct("c"); } }, | |
| {"c", [](NamedGraph& g, const std::string&){ return g.construct("a"); } }, | |
| {"constant", [](NamedGraph& g, const std::string&){ | |
| return g.base.constant(doubleVal(1)); } }, | |
| {"input", [](NamedGraph& g, const std::string&){ | |
| return g.base.input(); } }, | |
| {"a+b", [](NamedGraph& g, const std::string&){ | |
| return g.base.function(plus, | |
| g.construct("input.a"), | |
| g.construct("input.b")); } } | |
| }; | |
| NamedGraph g(cb); | |
| printf("index %d\n", g.construct("input.a")); | |
| printf("index %d\n", g.construct("input.b")); | |
| printf("index %d\n", g.construct("a+b")); | |
| try { | |
| printf("index %d\n", g.construct("a")); | |
| } catch(const std::exception& e) { | |
| printf("error: %s\n", e.what()); | |
| } | |
| try { | |
| printf("r = %f\n", g.get("a+b")->asDouble()); | |
| } catch(const std::exception& e) { | |
| printf("error: %s\n", e.what()); | |
| } | |
| auto g2 = NamedGraphWithNewInputs(&g, { | |
| {"input.a", doubleVal(10)}, | |
| {"input.b", doubleVal(20)} | |
| }); | |
| auto g3 = NamedGraphWithNewInputs(&g2, { | |
| {"input.b", doubleVal(10)} | |
| }); | |
| printf("r2 = %f (version = %d)\n", g3.get("a+b")->asDouble(), g3.debug_NodeVersion("a+b")); | |
| testDeadlockDetector(); | |
| } |
| #include <atomic> | |
| #include <functional> | |
| #include <utility> | |
| #include <string> | |
| #include <vector> | |
| #include <stdexcept> | |
| #include <map> | |
| #include <bit> | |
| /* Read-optimized linear-probe hashtable. | |
| Reads are lock-free: read the table array pointer from an atomic | |
| variable and perform a lookup. | |
| Inserts must be protected by a mutex. They set atomic value | |
| pointers which can be immediately picked up by readers. | |
| On resize, a new table is allocated and the table pointer is | |
| switched to it. | |
| Old tables are not deallocated until the destructor is called, so | |
| readers never see dangling pointers. | |
| */ | |
| template< | |
| typename Key, | |
| typename Value, | |
| typename Hash = std::hash<Key>, | |
| typename KeyEq = std::equal_to<Key> | |
| > | |
| class LockFreeLookupHashTable { | |
| static constexpr size_t INITIAL_SIZE = 16; // must be power of 2 | |
| struct Item { | |
| std::atomic<size_t> hash; | |
| Key key; | |
| Value value; | |
| }; | |
| using Table = std::vector<Item>; | |
| std::atomic<Table*> table; // current pointer, swapped on resize | |
| std::vector<Table*> oldTables; // to free memory | |
| size_t used; | |
| public: | |
| LockFreeLookupHashTable() : table(new Table(INITIAL_SIZE)), used(0) {} | |
| LockFreeLookupHashTable(const std::initializer_list<std::pair<Key,Value>>& items) : LockFreeLookupHashTable() { | |
| unsafeReserve(items.size()); | |
| for (auto& [k,v] : items) | |
| unsafeEmplace(k, [&](){ return v; }); | |
| } | |
| LockFreeLookupHashTable(const LockFreeLookupHashTable&) = delete; | |
| LockFreeLookupHashTable& operator=(const LockFreeLookupHashTable&) = delete; | |
| LockFreeLookupHashTable(LockFreeLookupHashTable&&) = delete; | |
| LockFreeLookupHashTable& operator=(LockFreeLookupHashTable&&) = delete; | |
| ~LockFreeLookupHashTable() { | |
| delete table.load(std::memory_order_acquire); | |
| for (auto t : oldTables) delete t; | |
| } | |
| // lock-free | |
| const Value *lookupPtr(const Key& key) const { | |
| const Table& t = *table.load(std::memory_order_acquire); | |
| size_t size = t.size(); | |
| size_t mask = size - 1; | |
| size_t hash = Hash{}(key); | |
| size_t index = hash & mask; | |
| for (size_t i = 0; i < size; i++) { | |
| const Item& item = t[(index+i) & mask]; | |
| size_t h = item.hash.load(std::memory_order_acquire); | |
| if (h == scaleHash(hash) && KeyEq{}(item.key, key)) | |
| return &item.value; | |
| if (h == 0) | |
| return nullptr; | |
| } | |
| return nullptr; | |
| } | |
| // must be protected by a mutex | |
| void unsafeReserve(size_t s) { | |
| s = std::bit_ceil(s); | |
| Table& t = *table.load(std::memory_order_acquire); | |
| if (s > t.size()) | |
| resize(t, s); | |
| } | |
| static const int LOAD_FACTOR = 3; | |
| // must be protected by a mutex | |
| const Value *unsafeEmplace(const Key& key, const auto& newValue) | |
| { | |
| Table& t = *table.load(std::memory_order_acquire); | |
| const Value *r = emplaceIntoTable(t, key, [&]() { | |
| ++used; | |
| return newValue(); | |
| }); | |
| if (used * LOAD_FACTOR > t.size()) | |
| resize(); | |
| return r; | |
| } | |
| size_t size() const { return used; } | |
| private: | |
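| // hash 0 marks an empty slot; setting the low bit keeps real hashes non-zero | |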
| static size_t scaleHash(size_t h) { return h | 1; } | |
| Value *emplaceIntoTable(Table& t, const Key& key, const auto& newValue) { | |
| size_t size = t.size(); | |
| size_t mask = size - 1; | |
| size_t hash = Hash{}(key); | |
| size_t index = hash & mask; | |
| for (size_t i = 0; i < size; i++) { | |
| Item& item = t[(index+i) & mask]; | |
| size_t h = item.hash.load(std::memory_order_acquire); | |
| if (h == scaleHash(hash) && KeyEq{}(item.key, key)) | |
| return &item.value; // return existing | |
| if (!h) { | |
| item.key = key; | |
| item.value = newValue(); | |
| item.hash.store(scaleHash(hash), std::memory_order_release); | |
| return &item.value; | |
| } | |
| } | |
| // shouldn't end up here as we resize the table | |
| throw std::runtime_error("the table is full?"); | |
| } | |
| void resize() { | |
| Table& t = *table.load(std::memory_order_acquire); | |
| resize(t, t.size() * 2); | |
| } | |
| void resize(Table& t, size_t newSize) { | |
| Table& newTable = *new Table(newSize); | |
| for (const auto& item : t) { | |
| size_t h = item.hash.load(std::memory_order_acquire); | |
| if (h) | |
| emplaceIntoTable(newTable, item.key, [&]{ return item.value; }); | |
| } | |
| oldTables.push_back(&t); | |
| table.store(&newTable, std::memory_order_release); | |
| // printf("resized to %lu\n", newTable.size()); | |
| } | |
| }; |
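| // Usage sketch (writers serialize on their own mutex; readers never lock): | |
| // LockFreeLookupHashTable<std::string, int> t; | |
| // std::mutex m; | |
| // { std::scoped_lock _(m); t.unsafeEmplace("answer", []{ return 42; }); } | |
| // const int *p = t.lookupPtr("answer"); // lock-free even during inserts | |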