Created June 20, 2015 22:35
-
-
Save inspirit/f044cabb9c5000a554fc to your computer and use it in GitHub Desktop.
Lock-free stack using double-width CAS and packed/tagged pointers (based on the Go implementation)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <atomic> | |
#include <type_traits> | |
/// Bounded lock-free LIFO stack using a double-width compare-and-swap.
///
/// Storage is a fixed pool of Capacity nodes embedded in the object; nothing is
/// allocated after construction. Two intrusive singly linked lists share the
/// pool: `head` links nodes that carry values, `free_nodes` links unused nodes.
/// Each list head is a {counter, pointer} pair (head_t) replaced with a single
/// CAS. The counter is bumped on every successful update, so a thread holding a
/// stale snapshot of a node that was popped and re-pushed fails its CAS.
///
/// Requirements:
///   * Capacity > 0.
///   * T is default-constructible and assignable from the argument of push().
///   * std::atomic<head_t> is two words wide; whether it is lock-free (or even
///     links without libatomic) depends on the target and toolchain.
template<typename T, size_t Capacity>
struct lfstack {
    static_assert(Capacity > 0, "lfstack requires Capacity > 0.");

    // One pool slot. alignas(128) keeps neighbouring nodes on separate cache
    // lines, like the 128-byte slots of the original aligned_storage pool.
    struct alignas(128) node_t final {
        T value;
        // Atomic because _pop() may read `next` of a node that another thread is
        // concurrently re-linking in _push(). The ABA counter makes such a stale
        // read harmless (the CAS then fails), but with a plain pointer the
        // concurrent read/write would be a data race, i.e. undefined behaviour.
        std::atomic<node_t*> next;
    };

    // List head: node pointer plus modification counter, swapped as one unit.
    struct head_t final {
        uintptr_t aba;
        node_t *node;
        head_t() noexcept :aba(0),node(nullptr)
        {}
        head_t(node_t* ptr) noexcept :aba(0),node(ptr)
        {}
    };
    static_assert(sizeof(head_t)== 2*sizeof(uintptr_t), "Stack head should be 2 pointers size.");

    alignas(128) std::atomic<head_t> head;        // nodes currently holding values
    alignas(128) std::atomic<head_t> free_nodes;  // nodes available for push()

    // Retained for source compatibility. The pool itself now holds real node_t
    // objects so that T (and the atomic link) are constructed and destroyed.
    using node_type = typename std::aligned_storage<sizeof(node_t), 128>::type;
    node_t node_buffer[Capacity];

    /// Builds an empty stack whose free list is node 0 -> 1 -> ... -> Capacity-1.
    lfstack()
    {
        head.store(head_t(), std::memory_order_relaxed);
        for (size_t i = 0; i + 1 < Capacity; ++i)
            node_buffer[i].next.store(&node_buffer[i + 1], std::memory_order_relaxed);
        node_buffer[Capacity - 1].next.store(nullptr, std::memory_order_relaxed);
        free_nodes.store(head_t(&node_buffer[0]), std::memory_order_relaxed);
        // Kept from the original design. Starting threads after construction
        // (std::thread / std::async) already orders the relaxed stores above
        // before those threads' work.
        std::atomic_thread_fence(std::memory_order_release);
    }

    /// Stores `data` in a free node and pushes that node.
    /// @return false, without touching `data`, if all Capacity nodes are in use.
    template<class U>
    bool push(U && data)
    {
        node_t *node = _pop(free_nodes);
        if (node == nullptr)
            return false;
        node->value = std::forward<U>(data);
        _push(head, node);
        return true;
    }

    /// Pops the top node, moves its value into `data` and recycles the node.
    /// @return false, without touching `data`, if the stack is empty.
    bool pop(T& data)
    {
        node_t *node = _pop(head);
        if (node == nullptr)
            return false;
        data = std::move(node->value);
        _push(free_nodes, node);
        return true;
    }

    /// Unlinks and returns the first node of list `h`, or nullptr if it is empty.
    node_t* _pop(std::atomic<head_t>& h)
    {
        // Acquire: the read of orig.node->next below must observe the link that
        // the _push() which installed orig.node wrote before its release CAS.
        // A relaxed load could pair a fresh head with a stale `next`, and the
        // CAS would then succeed and install the stale link.
        head_t orig = h.load(std::memory_order_acquire);
        head_t next;
        do {
            if (orig.node == nullptr)
                return nullptr;
            next.aba = orig.aba + 1;
            next.node = orig.node->next.load(std::memory_order_relaxed);
        } while (!h.compare_exchange_weak(orig, next,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire));
        return orig.node;
    }

    /// Links `node` (owned exclusively by the caller) in front of list `h`.
    void _push(std::atomic<head_t>& h, node_t* node)
    {
        head_t orig = h.load(std::memory_order_relaxed);
        head_t next;
        do {
            // Published by the release half of the successful CAS below.
            node->next.store(orig.node, std::memory_order_relaxed);
            next.aba = orig.aba + 1;
            next.node = node;
        } while (!h.compare_exchange_weak(orig, next,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire));
    }
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <atomic> | |
#include <type_traits> | |
namespace detail {
// Packs a node pointer and a counter into one 64-bit word so both can be
// replaced with a single ordinary (single-width) CAS. Selected by pointer size;
// the primary template is empty, so calling it for an unsupported pointer size
// fails to compile.
template<typename T, size_t PSize> struct pointer_pack {};

template<typename T> struct pointer_pack<T,8>
{
    // 64Bit Machine
    // On AMD64, virtual addresses are 48-bit numbers sign extended to 64.
    // We shift the address left 16 to eliminate the sign extended part and make
    // room in the bottom for the count.
    // In addition to the 16 bits taken from the top, we can take 3 from the
    // bottom, because node must be pointer-aligned, giving a total of 19 bits
    // of count.
    // NOTE(review): this assumes 48-bit virtual addresses; 57-bit (5-level
    // paging) addresses or hardware pointer tags would not round-trip.
    static constexpr unsigned count_bits = 19;
    static constexpr uint64_t count_mask = (uint64_t(1) << count_bits) - 1;

    uint64_t operator() (T *node, uintptr_t cnt) const
    {
        return (uint64_t(uintptr_t(node)) << 16) | (uint64_t(cnt) & count_mask);
    }
    T* operator() (uint64_t val) const
    {
        // The signed right shift drops the count and the 3 alignment bits and
        // re-extends bit 47 (arithmetic shift on mainstream compilers;
        // implementation-defined for negative values before C++20). The final
        // left shift is done on an unsigned value, because left-shifting a
        // negative signed integer is undefined behaviour before C++20.
        return reinterpret_cast<T*>(uintptr_t(uint64_t(int64_t(val) >> count_bits) << 3));
    }
};

// 32Bit Machine: the pointer occupies the upper 32 bits, the full 32-bit count
// the lower 32 bits.
template<typename T> struct pointer_pack<T,4>
{
    uint64_t operator() (T *node, uintptr_t cnt) const
    {
        return (uint64_t)((uintptr_t)(node))<<32 | (uint64_t)(cnt);
    }
    T* operator() (uint64_t val) const
    {
        return (T*)(uintptr_t)(val >> 32);
    }
};
} // detail
template<typename T, size_t Capacity> | |
struct lfstack | |
{ | |
struct node_t final | |
{ | |
std::atomic<uint64_t> next; | |
uintptr_t pushcnt; | |
T data; | |
}; | |
using pointer_pack = detail::pointer_pack<node_t,sizeof(uintptr_t)>; | |
using node_type = typename std::aligned_storage<sizeof(node_t), 128>::type; | |
node_type _pool[Capacity]; | |
alignas(128) std::atomic<uint64_t> _head = {0}; | |
alignas(128) std::atomic<uint64_t> _free = {0}; | |
lfstack() | |
{ | |
// push all pool nodes to free stack | |
for(size_t i = 0; i < Capacity; ++i) | |
{ | |
node_t* node = reinterpret_cast<node_t*>(&_pool[i]); | |
node->pushcnt = 0; | |
node->next.store(0, std::memory_order_relaxed); | |
_push(_free, node); | |
} | |
// Issue memory barrier so we can immediately start work | |
std::atomic_thread_fence(std::memory_order_release); | |
} | |
template<class U> | |
bool push(U && data) | |
{ | |
node_t *node = _pop(_free); | |
if (node == nullptr) | |
return false; | |
node->data = std::forward<U>(data); | |
_push(_head, node); | |
return true; | |
} | |
bool pop(T& data) | |
{ | |
node_t *node = _pop(_head); | |
if (node == nullptr) | |
return false; | |
data = std::move(node->data); | |
_push(_free, node); | |
return true; | |
} | |
void _push(std::atomic<uint64_t>& h, node_t* node) | |
{ | |
node->pushcnt++; | |
uint64_t packed = pointer_pack()(node, node->pushcnt); | |
// check | |
// if(pointer_pack()(packed) != node) | |
// throw std::runtime_error("lfstack push invalid packing!"); | |
uint64_t old = h.load(std::memory_order_relaxed); | |
do | |
{ | |
node->next.store(old, std::memory_order_relaxed); | |
} while (!h.compare_exchange_weak(old, packed)); | |
} | |
node_t* _pop(std::atomic<uint64_t>& h) | |
{ | |
uint64_t next, old = h.load(std::memory_order_relaxed); | |
node_t* node; | |
do | |
{ | |
if(old == 0) return nullptr; | |
node = pointer_pack()(old); | |
next = node->next.load(std::memory_order_relaxed); | |
} while (!h.compare_exchange_weak(old, next)); | |
return node; | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic> | |
#include <thread> | |
#include <chrono> | |
#include <vector> | |
#include <array> | |
#include <future> | |
#include <iostream> | |
#include <random> | |
#include "lfstack_tagptr.hpp" | |
// #include "lfstack_dcas.hpp" | |
int main() | |
{ | |
const size_t K = 100; | |
const size_t P = 4 * std::thread::hardware_concurrency(); | |
const size_t N = 100000/1; | |
// Create 2 stacks. | |
lfstack<size_t,K> stacks[2]; | |
std::cout << "Push "<<K<<" elements randomly onto the stacks.\n"; | |
size_t sum = 0; | |
for(size_t i = 0; i < K; ++i) | |
{ | |
sum += i; | |
stacks[i%2].push(i); | |
} | |
std::vector<double> timings(P); | |
std::vector<std::future<void>> dfuts; | |
std::atomic<int> cdl(P); | |
for (size_t i = 0; i < P; ++i) | |
dfuts.push_back(std::async(std::launch::async, [&, i]() { | |
--cdl; | |
while (cdl.load()) | |
std::this_thread::sleep_for(std::chrono::milliseconds(5)); | |
std::minstd_rand rng; | |
rng.seed(i); | |
std::uniform_int_distribution<size_t> distr(0, N); | |
auto start_time = std::chrono::high_resolution_clock::now(); | |
// Pop a node from a random stack, then push it onto a random stack. | |
for(size_t it = 0; it < N; ++it) | |
{ | |
size_t val; | |
if(stacks[ distr(rng)&1 ].pop(val)) | |
{ | |
stacks[ distr(rng)&1 ].push(val); | |
} | |
} | |
auto end_time = std::chrono::high_resolution_clock::now(); | |
std::chrono::duration<double> dur = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time); | |
timings[i] = dur.count(); | |
})); | |
for (auto& fut : dfuts) | |
fut.get(); | |
std::cout << "Pop all elements from both stacks, and verify that nothing lost.\n"; | |
size_t sum2 = 0; | |
size_t cnt = 0; | |
for(size_t i = 0; i < 2; ++i) | |
{ | |
size_t val; | |
size_t pp = 0; | |
while(stacks[i].pop(val)) | |
{ | |
cnt++; | |
sum2 += val; | |
} | |
} | |
std::cout << "number of nodes " << cnt << "/" << K << "\n"; | |
std::cout << "sum " << sum2 << "/" << sum << "\n"; | |
double mean_time = 0; | |
for (size_t i = 0; i < P; ++i) | |
mean_time += timings[i]; | |
std::cout << "mean time: " << (mean_time/P*1000) << "ms\n"; | |
return 0; | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.
On my 4-core i7 MacBook I get:
DCAS stack: ~370ms
TagPtr stack: ~260ms