|
// C++ MCAS implementation, based on "Efficient Multi-word Compare and Swap" |
|
// (https://arxiv.org/pdf/2008.02527.pdf) |
|
// |
|
// TODO: API header |
|
// TODO: reclamation. Currently the implementation leaks descriptor records |
|
// and leaves them in place indefinitely. Short list of options: |
|
// 1) reuse thread-local descriptors (https://arxiv.org/pdf/1708.01797.pdf) |
|
// 2) use "record manager" abstraction and plugins (DEBRA, etc.) |
|
// (https://bitbucket.org/trbot86/implementations/src/master/cpp/debra) |
|
// TODO: generalization: 32-bit support, user-defined word size and |
|
// descriptor marker |
|
// TODO: support < c++17 |
|
// TODO: wrapping type for words used in MCAS, extending `atomic`. Especially |
|
// for preventing accidental raw reads. |
|
|
|
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <memory>
#include <random>
#include <thread>
#include <utility>
#include <vector>
|
|
|
// Forward declarations. WordDescriptor (below) points at its owning
// McasDescriptor before that type is defined, and readInternal() calls mcas()
// to help other operations even though mcas() is defined later (and itself
// calls readInternal()).
struct McasDescriptor;
bool mcas(McasDescriptor* desc);
|
|
|
struct WordDescriptor { |
|
std::atomic<uintptr_t>* address; |
|
uintptr_t old; |
|
uintptr_t new_; |
|
McasDescriptor* parent; |
|
|
|
static_assert(std::pointer_traits<decltype(address)>::element_type::is_always_lock_free); |
|
}; |
|
|
|
// Lifecycle of an MCAS operation: it starts ACTIVE and is moved exactly once to
// one of the two final states.
enum StatusType {
    ACTIVE = 0,
    SUCCESSFUL = 1,
    FAILED = 2,
};
|
|
|
struct McasDescriptor { |
|
std::atomic<StatusType> status; |
|
// NOTE: It's suggested to order the word descriptors by address. Then, |
|
// should two threads try to mutate the same words, one will always |
|
// succeed, potentially avoiding repeated collisions. |
|
std::vector<WordDescriptor> words; |
|
|
|
static_assert(decltype(status)::is_always_lock_free); |
|
}; |
|
|
|
// Descriptor tagging: a word holding a WordDescriptor pointer has the sentinel
// placed in its otherwise-unused high 16 bits (but see the caveats below).
// This relies on descriptor addresses fitting in the low 48 bits, which
// descriptorPtrToValue() asserts.
// NOTE(review): that assumption may not hold with 5-level paging or top-byte
// pointer tagging (e.g. ARM TBI/MTE) — confirm for the target platform.
//
// On a 32-bit uintptr_t both constants would silently truncate to 0, making
// every value look like a descriptor; fail the build instead.
static_assert(sizeof(uintptr_t) == 8,
              "MCAS descriptor tagging requires a 64-bit uintptr_t");
constexpr uintptr_t DESCRIPTOR_MASK = 0xFFFF000000000000;
constexpr uintptr_t DESCRIPTOR_SENTINEL = 0xDCBA000000000000;
|
|
|
// (Caveat: tagging descriptors in-band is fragile. MCAS locations may hold
// arbitrary non-pointer values, so a word whose high bits happen to equal the
// sentinel is indistinguishable from a real descriptor. mcas() only asserts
// that the expected and new values it is given are not descriptor-tagged;
// avoiding conflicts still requires the application to never store such
// values in MCAS-managed words.)
|
inline bool isDescriptorValue(uintptr_t val) { |
|
return (val & DESCRIPTOR_MASK) == DESCRIPTOR_SENTINEL; |
|
} |
|
|
|
inline uintptr_t descriptorPtrToValue(const WordDescriptor* ptr) { |
|
auto value = reinterpret_cast<uintptr_t>(ptr); |
|
assert(!(value & DESCRIPTOR_MASK)); |
|
return value | DESCRIPTOR_SENTINEL; |
|
} |
|
|
|
// Inverse of descriptorPtrToValue(): strips the tag bits and reinterprets the
// remainder as the WordDescriptor address.
inline WordDescriptor* descriptorValueToPtr(uintptr_t value) {
    const uintptr_t untagged = value & ~DESCRIPTOR_MASK;
    return reinterpret_cast<WordDescriptor*>(untagged);
}
|
|
|
std::pair<uintptr_t, uintptr_t> |
|
readInternal(const std::atomic<uintptr_t>* addr, McasDescriptor* self) { |
|
while (true) { |
|
auto content = addr->load(); |
|
auto value = content; |
|
if (isDescriptorValue(content)) { |
|
auto desc = descriptorValueToPtr(content); |
|
auto parent = desc->parent; |
|
if (parent != self && parent->status == ACTIVE) { |
|
mcas(parent); |
|
continue; |
|
} |
|
value = parent->status == SUCCESSFUL ? desc->new_ : desc->old; |
|
} |
|
return std::make_pair(content, value); |
|
} |
|
} |
|
|
|
uintptr_t read(const std::atomic<uintptr_t>& addr) { |
|
return readInternal(&addr, nullptr).second; |
|
} |
|
|
|
// Runs (or helps run) the multi-word CAS described by `desc`.
//
// Phase 1 walks desc->words in order. It installs a tagged pointer to each
// WordDescriptor into its word, as long as the word's logical value equals the
// expected `old`. Phase 2 publishes the outcome with one compare-exchange on
// desc->status (ACTIVE -> SUCCESSFUL/FAILED). Afterwards readers resolve every
// installed word to new_ on success and to old otherwise.
//
// Several threads may run this on the same descriptor: readInternal() calls it
// to help other threads' operations. Word addresses should be ordered.
// Otherwise two overlapping operations can keep helping each other
// recursively without either reaching phase 2.
//
// Returns true iff the operation's final status is SUCCESSFUL.
bool mcas(McasDescriptor* desc) {
    StatusType verdict = SUCCESSFUL;

    // Phase 1 for one word. Returns false when no further words should be
    // acquired: either the expected value did not match (verdict = FAILED),
    // or someone already finalized this operation.
    auto acquire = [desc, &verdict](const WordDescriptor& word) -> bool {
        assert(!isDescriptorValue(word.old));
        assert(!isDescriptorValue(word.new_));
        const uintptr_t mine = descriptorPtrToValue(&word);
        while (true) {
            const auto seen = readInternal(word.address, desc);
            uintptr_t raw = seen.first;
            if (raw == mine) return true;  // already installed (possibly by a helper)
            if (seen.second != word.old) {
                verdict = FAILED;
                return false;
            }
            if (desc->status != ACTIVE) return false;
            // A lost race (or spurious failure) just means: look again.
            if (word.address->compare_exchange_weak(raw, mine)) return true;
        }
    };

    for (const auto& word : desc->words) {
        if (!acquire(word)) break;
    }

    // Phase 2: only the first finalizer wins; everyone reports the same result.
    StatusType expected = ACTIVE;
    if (desc->status.compare_exchange_strong(expected, verdict)) {
        // This thread finalized the descriptor; this is where it would be
        // handed to a reclamation scheme:
        //retireForCleanup(desc);
    }
    return desc->status == SUCCESSFUL;
}
|
|
|
bool cas2(std::atomic<uintptr_t>* a_addr, uintptr_t a_old, uintptr_t a_new, |
|
std::atomic<uintptr_t>* b_addr, uintptr_t b_old, uintptr_t b_new) { |
|
// TODO: sort entries by address |
|
auto desc = new McasDescriptor { |
|
.status = ACTIVE, |
|
.words = { |
|
{ |
|
.address = a_addr, |
|
.old = a_old, |
|
.new_ = a_new, |
|
}, |
|
{ |
|
.address = b_addr, |
|
.old = b_old, |
|
.new_ = b_new, |
|
}, |
|
}, |
|
}; |
|
for (auto& wordDesc : desc->words) { |
|
wordDesc.parent = desc; |
|
} |
|
return mcas(desc); |
|
} |
|
|
|
// |
|
// Simple tests and benchmarks follow |
|
// |
|
|
|
#include <iostream> |
|
#include <chrono> |
|
|
|
using namespace std::chrono; |
|
|
|
// Two sequential cas2() calls on disjoint words of an array must take effect
// and leave untouched slots alone.
void test_mcas() {
    printf("test_mcas\n");
    std::atomic<uintptr_t> arr[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

    // Run each cas2() outside assert(). Under NDEBUG the assert argument is not
    // evaluated, which would silently skip the operation itself.
    [[maybe_unused]] const bool first_ok = cas2(&arr[2], 2, 20, &arr[6], 6, 60);
    assert(first_ok);
    assert(read(arr[0]) == 0);
    assert(read(arr[2]) == 20);
    assert(read(arr[6]) == 60);

    [[maybe_unused]] const bool second_ok = cas2(&arr[2], 20, 30, &arr[6], 60, 70);
    assert(second_ok);
    assert(read(arr[0]) == 0);
    assert(read(arr[2]) == 30);
    assert(read(arr[6]) == 70);
}
|
|
|
// A cas2() whose second word is "updated" to its current value must still
// succeed and apply the first update.
void test_mcas_identity() {
    printf("test_mcas_identity\n");
    std::atomic<uintptr_t> a(5);
    std::atomic<uintptr_t> b(10);

    // Keep the operation out of assert() so it still runs under NDEBUG.
    [[maybe_unused]] const bool ok = cas2(&a, 5, 7, &b, 10, 10);
    assert(ok);
    assert(read(a) == 7);
    assert(read(b) == 10);
}
|
|
|
// for n attempts, increment 2 positions in the array atomically |
|
void _mcas_list_do(int n, int inc, std::vector<std::atomic<uintptr_t>>* arr_, |
|
std::atomic<int>* success_count) { |
|
auto& arr = *arr_; |
|
auto size = arr.size(); |
|
assert(size % 2 == 0); |
|
auto half = size / 2; |
|
int local_success_count = 0; |
|
for (int i = 0; i < n; ++i) { |
|
int j = std::rand() % half; |
|
int k = half + std::rand() % half; |
|
auto v1 = read(arr[j]); |
|
auto v2 = read(arr[k]); |
|
local_success_count += cas2(&arr[j], v1, v1 + inc, &arr[k], v2, v2 + inc); |
|
} |
|
*success_count += local_success_count; |
|
} |
|
|
|
void benchmark_mcas_list(int array_size) { |
|
// parallel threads incrementing list elements |
|
int N_OPS = 10000000; |
|
int N_THREADS = std::thread::hardware_concurrency(); |
|
int N_OPS_PER_THREAD = N_OPS / N_THREADS; |
|
int INC = 3; |
|
|
|
printf("benchmark_mcas_list (array_size=%d, threads=%d)\n", array_size, N_THREADS); |
|
std::vector<std::atomic<uintptr_t>> arr(array_size); |
|
std::vector<std::thread> threads; |
|
std::atomic<int> success_count = 0; |
|
auto t0 = high_resolution_clock::now(); |
|
for (int i = 0; i < N_THREADS; ++i) { |
|
threads.push_back( |
|
std::thread(_mcas_list_do, N_OPS_PER_THREAD, INC, &arr, &success_count)); |
|
} |
|
for (auto &th : threads) { |
|
th.join(); |
|
} |
|
auto duration = duration_cast<microseconds>(high_resolution_clock::now() - t0); |
|
|
|
int sum = 0; |
|
for (const auto& v : arr) { |
|
sum += read(v); |
|
} |
|
assert(sum == 2 * INC * success_count); |
|
|
|
printf(" successful ops: %d (%.1f%%)\n", success_count.load(), |
|
100.0 * success_count / (N_THREADS * N_OPS_PER_THREAD)); |
|
printf(" duration (real): %.3f s\n", duration.count() * 1e-6); |
|
printf(" ops rate: %.2f M/s\n", success_count / (duration.count() * 1e-6) / 1e6); |
|
} |
|
|
|
// Functional checks first, then throughput at two array sizes; with fewer slots
// the threads pick overlapping words more often, so contention is higher.
int main() {
    test_mcas();
    test_mcas_identity();
    for (const int array_size : {10, 100}) {
        benchmark_mcas_list(array_size);
    }
}