Atomic/lockless linked list using a 128-bit Compare And Swap to solve the A-B-A problem.
// --------------------------------------------------------------------------------------
// Atomic singly-linked intrusive list using a 128-bit Compare And Swap (AKA double-width CAS).
// Keeps a version counter with the list head to prevent the A-B-A problem.
//
// Based on the implementation described on moodycamel.com:
// http://moodycamel.com/blog/2014/solving-the-aba-problem-for-lock-free-free-lists
//
// My implementation uses raw GCC/Clang atomic intrinsics. While in theory
// std::atomic of a properly aligned struct of exactly 16 bytes could use a
// 128-bit CAS, this is not guaranteed. It might very well be implemented
// using a spinlock, which would be less efficient. Relying on the compiler
// intrinsics makes this code non-portable, but it ensures the optimal HW
// instruction is used. On Windows/MSVC, we could use _InterlockedCompareExchange128.
//
// I consider this code sample to be Public Domain, no copyrights claimed.
//
// NOTE:
// To make sure GCC and Clang emit a 'lock cmpxchg16b', you might
// have to add the '-mcx16' flag to the command line, as explained in:
// http://stackoverflow.com/a/18433055/1198654
//
// Tested with:
// c++ -std=c++11 -fno-rtti -fno-exceptions -Wall -Wextra -O3 -mcx16
//
// ASM listing on Clang with Intel syntax:
// -S -mllvm --x86-asm-syntax=intel
// --------------------------------------------------------------------------------------
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <atomic>

// GCC seems to have second thoughts about inlining the list push/pop functions, even at -O3...
#define ALWAYS_INLINE __attribute__((__always_inline__))

// ------------------------------------------------------------------
// CAS 128-bits:
// ------------------------------------------------------------------

using int128 = __int128;
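
// [Added illustration - not part of the original sample.]
// The header comment notes that std::atomic of a 16-byte struct is not guaranteed
// to use a 128-bit CAS. A quick way to see what a given toolchain actually does is
// to probe is_lock_free() at runtime (call this from main() if curious). Caveats:
// some compiler/libatomic combinations report "not lock-free" here even though they
// end up using cmpxchg16b internally, and on GCC this may need -latomic to link.
void probeStdAtomic16Bytes()
{
    struct alignas(16) Pair { std::uint64_t lo; std::uint64_t hi; };
    std::atomic<Pair> probe{ Pair{} };
    std::printf("std::atomic<16-byte struct> lock-free? %s\n",
                probe.is_lock_free() ? "yes" : "no (likely a lock or libatomic fallback)");
}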
ALWAYS_INLINE int toBuiltInMemOrder(std::memory_order order)
{
    switch (order)
    {
    case std::memory_order_relaxed : return __ATOMIC_RELAXED;
    case std::memory_order_consume : return __ATOMIC_CONSUME;
    case std::memory_order_acquire : return __ATOMIC_ACQUIRE;
    case std::memory_order_release : return __ATOMIC_RELEASE;
    case std::memory_order_acq_rel : return __ATOMIC_ACQ_REL;
    case std::memory_order_seq_cst : return __ATOMIC_SEQ_CST;
    default : assert(false); return 0;
    } // switch (order)
}

ALWAYS_INLINE bool CAS128(volatile int128 * destination,
                          int128 * expected,
                          int128 desired,
                          std::memory_order successMemOrder,
                          std::memory_order failureMemOrder)
{
    const bool weak = true; // A weak CAS is fine here; every caller retries in a loop anyway.
    return __atomic_compare_exchange_n(destination, expected, desired, weak,
                                       toBuiltInMemOrder(successMemOrder),
                                       toBuiltInMemOrder(failureMemOrder));
}
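
// [Added illustration - not part of the original sample.]
// As mentioned in the header comment, the MSVC counterpart of the builtin used above
// is _InterlockedCompareExchange128 from <intrin.h> (x64 only; it acts as a full
// barrier, so the memory-order hints of CAS128() would simply be dropped). A rough,
// untested sketch of how CAS128() could be mapped onto it, for reference only:
#if defined(_MSC_VER) && defined(_M_X64)
#include <intrin.h>
inline bool CAS128_msvc(volatile __int64 * destination, // Points to 2 x 64 bits, 16-byte aligned.
                        __int64 * expected,             // In: comparand; out: original value of destination.
                        const __int64 * desired)        // desired[0] = low half, desired[1] = high half.
{
    return _InterlockedCompareExchange128(destination,
                                          desired[1], desired[0],
                                          expected) != 0;
}
#endif // _MSC_VER && _M_X64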

// ------------------------------------------------------------------
// AtomicSListNode:
// ------------------------------------------------------------------

template<typename T>
struct AtomicSListNode
{
    T * m_next = nullptr;
};

// ------------------------------------------------------------------
// AtomicSList128:
// ------------------------------------------------------------------
// Atomic and A-B-A safe.
// Head pointer is paired with a version count that
// is incremented whenever we try to change it.
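// (Added note.) The scenario the version counter guards against: thread 1 reads
// head = A with A->m_next = B and is preempted before its CAS; thread 2 pops A,
// pops B, then pushes A back. A CAS on the bare head pointer would now succeed
// (head is A again) and install B as the new head, even though B is no longer
// on the list. Because every successful update also bumps the 64-bit version,
// thread 1's 128-bit CAS fails instead and its loop retries with fresh values.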
template<typename T>
class AtomicSList128 final
{
public:
    AtomicSList128()
    {
        m_head.m_packed = 0;
    }

    ALWAYS_INLINE void push(T * node)
    {
        VersionedPtr newHead;
        VersionedPtr currHead;

        newHead.m_pointer = node;
        currHead.m_packed = m_head.m_packed;

        do {
            newHead.m_version = currHead.m_version + 1;
            node->m_next      = currHead.m_pointer;
        } while (!CAS128(&m_head.m_packed, &currHead.m_packed, newHead.m_packed,
                         std::memory_order_release, std::memory_order_relaxed));
    }

    ALWAYS_INLINE T * pop()
    {
        VersionedPtr newHead;
        VersionedPtr currHead;

        currHead.m_packed = m_head.m_packed;

        while (currHead.m_pointer != nullptr)
        {
            newHead.m_pointer = currHead.m_pointer->m_next;
            newHead.m_version = currHead.m_version + 1;

            if (CAS128(&m_head.m_packed, &currHead.m_packed, newHead.m_packed,
                       std::memory_order_release, std::memory_order_acquire))
            {
                break;
            }
        }
        return currHead.m_pointer;
    }

    // These are non-atomic.
    ALWAYS_INLINE T * first() const
    {
        return m_head.m_pointer;
    }
    ALWAYS_INLINE bool isEmpty() const
    {
        return first() == nullptr;
    }

    // Syntactic sugar to iterate the list.
    ALWAYS_INLINE static const T * next(const T * curr) { return curr->m_next; }
    ALWAYS_INLINE static       T * next(      T * curr) { return curr->m_next; }

    // Not copyable.
    AtomicSList128(const AtomicSList128 &) = delete;
    AtomicSList128 & operator = (const AtomicSList128 &) = delete;

private:
    union alignas(16) VersionedPtr
    {
        struct
        {
            T * m_pointer;
            std::uint64_t m_version;
        };
        int128 m_packed;
    };

    static_assert(sizeof(VersionedPtr)  == 16, "Bad size!");
    static_assert(alignof(VersionedPtr) == 16, "Bad alignment!");

    VersionedPtr m_head;
};

// ------------------------------------------------------------------
// AtomicSList64:
// ------------------------------------------------------------------
// Atomic, but *not* A-B-A safe, for comparison with the above.
// We can concurrently push and pop, but only as long as a node that
// has been popped is never pushed back while other threads may still
// be popping (i.e. every incoming pointer is unique for as long as
// the list is used concurrently). Still covers a lot of use cases,
// such as concurrently building up a list that will later be
// consumed by a single thread.
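// (Added note.) Why the plain pointer CAS is vulnerable: in pop(), a thread may
// read head = A and A->m_next = B, then stall; if A is meanwhile popped, recycled
// and pushed back, the CAS still sees head == A and succeeds, splicing in a stale
// m_next. The versioned head in AtomicSList128 above is what closes that hole.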
template<typename T>
class AtomicSList64 final
{
public:
    AtomicSList64()
        : m_head{ nullptr }
    { }

    ALWAYS_INLINE void push(T * node)
    {
        T * currHead = m_head.load(std::memory_order_relaxed);
        do {
            node->m_next = currHead;
        } while (!m_head.compare_exchange_weak(currHead, node,
                                               std::memory_order_release,
                                               std::memory_order_relaxed));
    }

    ALWAYS_INLINE T * pop()
    {
        T * currHead = m_head.load(std::memory_order_relaxed);
        while (currHead != nullptr)
        {
            if (m_head.compare_exchange_weak(currHead, currHead->m_next,
                                             std::memory_order_release,
                                             std::memory_order_acquire))
            {
                break;
            }
        }
        return currHead;
    }

    // These are non-atomic.
    ALWAYS_INLINE T * first() const
    {
        return m_head.load(std::memory_order_relaxed);
    }
    ALWAYS_INLINE bool isEmpty() const
    {
        return first() == nullptr;
    }

    // Syntactic sugar to iterate the list.
    ALWAYS_INLINE static const T * next(const T * curr) { return curr->m_next; }
    ALWAYS_INLINE static       T * next(      T * curr) { return curr->m_next; }

    // Not copyable.
    AtomicSList64(const AtomicSList64 &) = delete;
    AtomicSList64 & operator = (const AtomicSList64 &) = delete;

private:
    std::atomic<T *> m_head;
};

// ------------------------------------------------------------------
// Tests:
// ------------------------------------------------------------------

struct Test : AtomicSListNode<Test>
{
    int m_value;

    Test() = default;
    Test(int val) : m_value{ val }
    { }
};

template<typename ListType>
static void testListSingleThread()
{
    std::printf("\n>>> %s()\n", __func__);
    std::printf("----------------------\n");

    ListType list;
    Test t0{0};
    Test t1{1};
    Test t2{2};
    Test t3{3};
    Test t4{4};

    list.push(&t0);
    list.push(&t1);
    list.push(&t2);
    list.push(&t3);
    list.push(&t4);

    for (const Test * curr = list.first(); curr; curr = list.next(curr))
    {
        std::printf("curr=%p, next=%p, value=%i\n", curr, list.next(curr), curr->m_value);
    }
    std::printf("----------------------\n");

    const Test * p4 = list.pop();
    const Test * p3 = list.pop();
    const Test * p2 = list.pop();

    assert(p4 == &t4);
    assert(p3 == &t3);
    assert(p2 == &t2);
    assert(!list.isEmpty());

    for (const Test * curr = list.first(); curr; curr = list.next(curr))
    {
        std::printf("curr=%p, next=%p, value=%i\n", curr, list.next(curr), curr->m_value);
    }

    const Test * p1 = list.pop();
    const Test * p0 = list.pop();

    assert(p1 == &t1);
    assert(p0 == &t0);
    assert(list.isEmpty());
    assert(list.pop() == nullptr);

    std::printf("----------------------\n");
}

template<typename ListType>
static void workOnTheList(ListType * pList)
{
    assert(pList != nullptr);

    constexpr int NumToPushPop = 256;
    Test * items[NumToPushPop];

    for (int i = 0; i < NumToPushPop; ++i)
    {
        items[i] = new Test{ i }; // We'll intentionally leak this memory for now...
    }

    for (int i = 0; i < NumToPushPop; ++i)
    {
        pList->push(items[i]);
    }
    assert(!pList->isEmpty());

    for (int i = 0; i < NumToPushPop; ++i)
    {
        const Test * item = pList->pop();
        assert(item != nullptr);
        assert(item->m_value >= 0 && item->m_value < NumToPushPop);
    }

    std::printf("Thread %#zx OK\n", std::hash<std::thread::id>{}(std::this_thread::get_id()));
}

template<typename ListType>
static void testListWithThreads()
{
    std::printf("\n>>> %s()\n", __func__);
    std::printf("----------------------\n");

    ListType theList;

    // Multiple threads concurrently pushing to and popping from the list:
    {
        std::printf("Testing multiple threads operating on list ...\n");

        constexpr int NumThreads = 8;
        std::thread threads[NumThreads];

        for (int t = 0; t < NumThreads; ++t)
        {
            threads[t] = std::thread(&workOnTheList<ListType>, &theList);
        }
        for (int t = 0; t < NumThreads; ++t)
        {
            std::printf("Joining thread #%i ...\n", t);
            threads[t].join();
        }
        assert(theList.isEmpty());
    }

    // Producer/consumer style of operation,
    // one thread pushes, the other pops:
    {
        std::printf("Testing producer/consumer threads ...\n");

        std::atomic<bool> stopProducing{ false };
        std::atomic<bool> stopConsuming{ false };

        std::thread producer{
            [&theList, &stopProducing, &stopConsuming]() {
                int next = 0;
                while (!stopProducing)
                {
                    Test * item = new Test{ next++ };
                    std::printf("[Producer]: Pushing item %p {%i}\n", item, item->m_value);
                    theList.push(item);
                }
                stopConsuming = true;
            }
        };

        std::thread consumer{
            [&theList, &stopConsuming]() {
                while (!stopConsuming || !theList.isEmpty())
                {
                    while (const Test * item = theList.pop())
                    {
                        std::printf("[Consumer]: Popping item %p {%i}\n", item, item->m_value);
                        delete item;
                    }
                }
            }
        };

        constexpr std::size_t count = 100000;
        std::printf("Main thread counting to %zu ...\n", count);
        for (std::size_t i = 0; i < count; ++i)
        {
            std::this_thread::yield();
        }

        stopProducing = true;
        producer.join();
        consumer.join();
        assert(theList.isEmpty());
    }

    std::printf("Main thread OK\n");
    std::printf("----------------------\n");
}

// ------------------------------------------------------------------
// main():
// ------------------------------------------------------------------

int main()
{
    using AtomicListSimple  = AtomicSList64<Test>;
    using AtomicListABASafe = AtomicSList128<Test>;

    testListSingleThread<AtomicListSimple>();
    testListSingleThread<AtomicListABASafe>();

    testListWithThreads<AtomicListSimple>();
    testListWithThreads<AtomicListABASafe>();
}