Created
May 29, 2017 02:02
-
-
Save glampert/2c462bcc77d326526787708c0f2cceff to your computer and use it in GitHub Desktop.
Fixed-size concurrent/lockless hash table for integer types using C++11 atomics.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// License: | |
// Public Domain. Feel free to copy, distribute, and modify this file as you see fit. | |
#include <cassert> | |
#include <cstddef> | |
#include <cstdint> | |
#include <cstdio> | |
#include <array> | |
#include <atomic> | |
#include <thread> // for the tests | |
// ---------------------------------------------------------------------------- | |
// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
// 32-bit finalization mix ("fmix32") of MurmurHash3, usable as a hash functor
// for 32-bit integer keys.
struct MurmurHash3FMix32
{
    // Scrambles the bits of `k` with three xor-shift steps interleaved with
    // two multiplications and returns the result widened to std::size_t.
    std::size_t operator()(std::uint32_t k) const
    {
        std::uint32_t mixed = k;
        mixed = (mixed ^ (mixed >> 16)) * 0x85ebca6b;
        mixed = (mixed ^ (mixed >> 13)) * 0xc2b2ae35;
        mixed = mixed ^ (mixed >> 16);
        return static_cast<std::size_t>(mixed);
    }
};
// 64-bit finalization mix ("fmix64") of MurmurHash3, usable as a hash functor
// for 64-bit integer keys.
struct MurmurHash3FMix64
{
    // Scrambles the bits of `k` with three xor-shift steps interleaved with
    // two multiplications and returns the result converted to std::size_t.
    std::size_t operator()(std::uint64_t k) const
    {
        std::uint64_t mixed = k;
        mixed = (mixed ^ (mixed >> 33)) * UINT64_C(0xff51afd7ed558ccd);
        mixed = (mixed ^ (mixed >> 33)) * UINT64_C(0xc4ceb9fe1a85ec53);
        mixed = mixed ^ (mixed >> 33);
        return static_cast<std::size_t>(mixed);
    }
};
// ---------------------------------------------------------------------------- | |
// http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table/ | |
// Hash table optimized for integer types. Uses Standard C++ atomics
// so that concurrent insertion, removal and lookup are possible
// without the need to acquire locks. Uses Open Addressing with
// Linear Probing to resolve key collisions. A special sentinel key
// must be reserved to denote unused buckets. If using an integer
// type for the key, it defaults to zero, but any constant can
// be specified as the last template argument. Table size is
// fixed at compile time and does not grow.
// | |
// Based on "The World's Simplest Lock-Free Hash Table", by Jeff Preshing. | |
// Fixed-capacity open-addressing hash table (linear probing) whose buckets are
// pairs of std::atomic key/value, so insert/remove/access never take a lock.
//
// Template parameters:
//   Key                - key type; must be usable with std::atomic.
//   Value              - value type; must be usable with std::atomic and
//                        comparable against Value{}.
//   HashFunc           - default-constructible functor mapping a key to std::size_t.
//   TableSize          - number of buckets; a non-zero power of two.
//   InvalidKeyConstant - reserved key that marks an empty bucket (Key{} by default).
//                        It can never be stored as a real key.
//
// Known limitation: remove() empties a bucket without leaving a tombstone,
// while access() stops at the first empty bucket and insert() claims the first
// empty bucket it meets. After a removal, a key stored further along the same
// probe run may therefore not be found by access(), and inserting that key
// again may store it a second time.
template
<
    typename Key,
    typename Value,
    typename HashFunc,
    const std::size_t TableSize,
    const Key InvalidKeyConstant = Key{}
>
class LocklessHashTable final
{
public:

    // A zero-sized table would turn the (TableSize - 1) index mask into all
    // ones and index past the end of the (empty) bucket array.
    static_assert(TableSize > 0, "Table size must not be zero!");
    static_assert((TableSize & (TableSize - 1)) == 0, "Table size must be a power of two!");

    // One bucket of the table. An empty bucket holds InvalidKeyConstant as key.
    struct Pair
    {
        std::atomic<Key>   key;
        std::atomic<Value> value;

        Pair() : key{ InvalidKeyConstant }, value{ Value{} }
        { }

        // True when the bucket currently holds a real key.
        bool isValid() const
        {
            return key.load(std::memory_order_relaxed) != InvalidKeyConstant;
        }
    };

    using Buckets = std::array<Pair, TableSize>;

public:

    LocklessHashTable() : m_usedCount{ 0 }
    { }

    // Stores `value` under `key`, overwriting the value if the key is already
    // present. `key` must not be InvalidKeyConstant.
    // Returns false when every bucket was probed without finding the key or a
    // free bucket (table full), true otherwise.
    bool insert(const Key key, Value value)
    {
        assert(key != InvalidKeyConstant);

        std::size_t bucket = HashFunc{}(key);
        for (std::size_t probes = 0; probes < TableSize; ++probes, ++bucket)
        {
            bucket &= (TableSize - 1);
            Pair & slot = m_buckets[bucket];

            const Key probedKey = slot.key.load(std::memory_order_relaxed);
            if (probedKey != key) // The bucket is either free or holds another key.
            {
                if (probedKey != InvalidKeyConstant)
                {
                    continue; // Occupied by a different key, keep probing.
                }

                // The bucket looked free; try to claim it. The strong CAS is
                // required here: a weak CAS may fail spuriously while the
                // bucket is still free, which would make us count and write a
                // value for a key that was never stored.
                Key expectedKey = InvalidKeyConstant;
                const bool claimed = slot.key.compare_exchange_strong(expectedKey, key,
                                        std::memory_order_release, std::memory_order_relaxed);

                if (!claimed && (expectedKey != key))
                {
                    continue; // Another thread took the bucket for a different key.
                }

                // If the CAS failed but the bucket now holds our key, another
                // thread inserted the same key and is responsible for the
                // count; we only overwrite the value below.
                if (claimed)
                {
                    // No assertion on the counter: it is updated after the key
                    // CAS, so under concurrent insert/remove it can briefly
                    // differ from the number of occupied buckets.
                    ++m_usedCount;
                }
            }

            slot.value.store(value, std::memory_order_relaxed);
            return true; // Value inserted or overwritten.
        }

        return false; // Table is full!
    }

    // Removes `key` from the table. `key` must not be InvalidKeyConstant.
    // Probes the whole table (it does not stop at empty buckets).
    // Returns true when this call emptied the key's bucket, false when the key
    // was not found or another thread changed that bucket's key first.
    bool remove(const Key key)
    {
        assert(key != InvalidKeyConstant);

        std::size_t bucket = HashFunc{}(key);
        for (std::size_t probes = 0; probes < TableSize; ++probes, ++bucket)
        {
            bucket &= (TableSize - 1);
            Pair & slot = m_buckets[bucket];

            Key probedKey = slot.key.load(std::memory_order_relaxed);
            if (probedKey != key)
            {
                continue;
            }

            if (!slot.key.compare_exchange_strong(probedKey, InvalidKeyConstant,
                    std::memory_order_release, std::memory_order_relaxed))
            {
                return false; // Another thread has already removed the key and reused the bucket.
            }

            // Best-effort reset of the stale value. The CAS only clears the
            // value if it is still the one we just read; a strong CAS is used
            // so the reset is not skipped by a spurious failure.
            // NOTE(review): a concurrent insert() that re-claims this bucket and
            // stores its value before the load below can still have that value
            // cleared here.
            Value oldValue = slot.value.load(std::memory_order_relaxed);
            if (oldValue != Value{})
            {
                slot.value.compare_exchange_strong(oldValue, Value{},
                    std::memory_order_release, std::memory_order_relaxed);
            }

            // May run before the inserting thread has incremented the counter,
            // so the counter can be transiently negative; size() clamps it.
            --m_usedCount;
            return true;
        }

        return false; // Probed the whole table without finding the key.
    }

    // Looks up `key`. `key` must not be InvalidKeyConstant.
    // When found and `outValue` is non-null, the bucket's current value is
    // written to *outValue. Key and value are read with separate relaxed
    // loads, so a reader racing with insert() may observe the key before the
    // value has been stored.
    // Returns true if the key was found, false if an empty bucket was reached
    // or the whole table was probed.
    bool access(const Key key, Value * outValue = nullptr) const
    {
        assert(key != InvalidKeyConstant);

        std::size_t bucket = HashFunc{}(key);
        for (std::size_t probes = 0; probes < TableSize; ++probes, ++bucket)
        {
            bucket &= (TableSize - 1);
            const Pair & slot = m_buckets[bucket];

            const Key probedKey = slot.key.load(std::memory_order_relaxed);
            if (probedKey == key)
            {
                if (outValue != nullptr)
                {
                    *outValue = slot.value.load(std::memory_order_relaxed);
                }
                return true;
            }
            if (probedKey == InvalidKeyConstant)
            {
                return false; // End of the probe run.
            }
        }

        return false; // Probed the whole table without finding the key.
    }

    // Resets every bucket to empty and the counter to zero.
    void clear() // This operation is not thread safe!
    {
        m_usedCount.store(0, std::memory_order_relaxed);
        for (Pair & p : m_buckets)
        {
            p.key.store(InvalidKeyConstant, std::memory_order_relaxed);
            p.value.store(Value{}, std::memory_order_relaxed);
        }
    }

    // Number of keys currently counted as stored. The counter is maintained
    // separately from the buckets, so while other threads insert/remove it can
    // momentarily lag; the result is clamped to the range [0, TableSize].
    int size() const
    {
        const int count = m_usedCount.load(std::memory_order_relaxed);
        if (count < 0)
        {
            return 0;
        }
        if (static_cast<std::size_t>(count) > TableSize)
        {
            return static_cast<int>(TableSize);
        }
        return count;
    }

    bool isEmpty() const
    {
        return size() == 0;
    }

    // Read-only view of the raw bucket array (includes empty buckets).
    const Buckets & buckets() const
    {
        return m_buckets;
    }

private:

    std::atomic<int> m_usedCount;
    Buckets m_buckets;
};
// ---------------------------------------------------------------------------- | |
static void tests() | |
{ | |
// Single threaded usage: | |
{ | |
LocklessHashTable<std::uint32_t, std::uint32_t, MurmurHash3FMix32, 16> ht; | |
ht.insert(1, 42); | |
ht.insert(2, 1337); | |
std::uint32_t one, two; | |
ht.access(1, &one); | |
ht.access(2, &two); | |
assert(one == 42); | |
assert(two == 1337); | |
assert(ht.size() == 2); | |
assert(ht.remove(1) == true); | |
assert(ht.remove(2) == true); | |
assert(ht.remove(76) == false); | |
assert(ht.size() == 0); | |
} | |
// Concurrent insertions: | |
{ | |
LocklessHashTable<std::uint64_t, std::uint64_t, MurmurHash3FMix64, 1024> ht; | |
unsigned N1 = 0; | |
std::thread inserterThread1{ | |
[&ht, &N1]() { | |
for(unsigned i = 1; i < 512; ++i) | |
{ | |
ht.insert(i, i); | |
N1 += i; | |
if (i & 1) std::this_thread::yield(); | |
} | |
} | |
}; | |
unsigned N2 = 0; | |
std::thread inserterThread2{ | |
[&ht, &N2]() { | |
for(unsigned i = 512; i <= 1024; ++i) | |
{ | |
ht.insert(i, i); | |
N2 += i; | |
if (i & 1) std::this_thread::yield(); | |
} | |
} | |
}; | |
inserterThread1.join(); | |
inserterThread2.join(); | |
assert(ht.size() == 1024); | |
std::uint64_t accK = 0, accV = 0; | |
for (const auto & b : ht.buckets()) | |
{ | |
auto k = b.key.load(std::memory_order_relaxed); | |
auto v = b.value.load(std::memory_order_relaxed); | |
assert(k >= 1 && k <= 1024); | |
assert(v >= 1 && v <= 1024); | |
accK += k; | |
accV += v; | |
} | |
assert(accK == (N1 + N2)); | |
assert(accV == (N1 + N2)); | |
} | |
// One thread inserting the other accessing: | |
{ | |
LocklessHashTable<std::uint32_t, std::uint64_t, MurmurHash3FMix32, 1024> ht; | |
std::thread inserterThread{ | |
[&ht]() { | |
for (unsigned i = 1; i <= 512; ++i) | |
{ | |
ht.insert(i, i); | |
if (i & 1) std::this_thread::yield(); | |
} | |
} | |
}; | |
std::thread accessingThread{ | |
[&ht]() { | |
constexpr unsigned expectedCount = 131328; // sum(1,512) | |
std::uint64_t count = 0; | |
std::array<bool, 512> keysCounted = {}; | |
while (count < expectedCount) | |
{ | |
for (unsigned i = 1; i <= 512; ++i) | |
{ | |
if (!keysCounted[i - 1]) | |
{ | |
std::uint64_t v = 0; | |
const bool gotKey = ht.access(i, &v); | |
assert(v == 0 || (v >= 1 && v <= 512)); | |
if (gotKey && v != 0) | |
{ | |
keysCounted[i - 1] = true; | |
count += v; | |
} | |
} | |
} | |
} | |
assert(count == expectedCount); | |
} | |
}; | |
inserterThread.join(); | |
accessingThread.join(); | |
assert(ht.size() == 512); | |
} | |
// Concurrent inserts + removes: | |
{ | |
const std::array<int, 10> items = {{ 42, 33, 66, 123, 678, 255, 1024, 1, 9, 1337 }}; // to be inserted/removed | |
LocklessHashTable<int, int, MurmurHash3FMix32, 32> ht; | |
std::thread inserterThread{ | |
[&ht, &items]() { | |
for (int i : items) | |
{ | |
ht.insert(i, i); | |
if (i & 1) std::this_thread::yield(); | |
} | |
} | |
}; | |
std::thread removerThread{ | |
[&ht, &items]() { | |
unsigned numRemoved = 0; | |
while (numRemoved < items.size()) | |
{ | |
for (int i : items) | |
{ | |
if (ht.remove(i)) numRemoved++; | |
} | |
} | |
assert(numRemoved == items.size()); | |
assert(ht.size() == 0); | |
} | |
}; | |
inserterThread.join(); | |
removerThread.join(); | |
assert(ht.isEmpty()); | |
} | |
} | |
// ---------------------------------------------------------------------------- | |
int main() | |
{ | |
// Run several test iterations to exercise thread scheduling differences. | |
for (int iterations = 0; iterations < 512; ++iterations) | |
{ | |
tests(); | |
} | |
std::printf("LocklessHashTable tests completed successfully.\n"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment