// C++ MCAS implementation, based on "Efficient Multi-word Compare and Swap" |
// (https://arxiv.org/pdf/2008.02527.pdf) |
// |
// TODO: API header |
// TODO: reclamation. Currently the implementation leaks descriptor records |
// and leaves them in place indefinitely. Short list of options: |
// 1) reuse thread-local descriptors (https://arxiv.org/pdf/1708.01797.pdf) |
// 2) use "record manager" abstraction and plugins (DEBRA, etc.) |
// (https://bitbucket.org/trbot86/implementations/src/master/cpp/debra) |
// TODO: generalization: 32-bit support, user-defined word size and |
// descriptor marker |
// TODO: support < c++17 |
// TODO: wrapping type for words used in MCAS, extending `atomic`. Especially |
// for preventing accidental raw reads. |
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <memory>
#include <random>
#include <thread>
#include <utility>
#include <vector>
struct McasDescriptor;
bool mcas(McasDescriptor* desc);
// One target word of an MCAS operation: where it lives, the value it must
// currently hold, the value to store, and the operation it belongs to.
struct WordDescriptor {
  // location being updated
  std::atomic<uintptr_t>* address;
  // value the word must logically hold for the MCAS to succeed
  uintptr_t old;
  // value the word logically holds once the MCAS has succeeded
  uintptr_t new_;
  // owning operation; its status selects whether `old` or `new_` is current
  McasDescriptor* parent;
  static_assert(std::atomic<uintptr_t>::is_always_lock_free,
                "MCAS target words must be lock-free atomics");
};
enum StatusType { ACTIVE, SUCCESSFUL, FAILED }; |
struct McasDescriptor { |
std::atomic<StatusType> status; |
// NOTE: It's suggested to order the word descriptors by address. Then, |
// should two threads try to mutate the same words, one will always |
// succeed, potentially avoiding repeated collisions. |
std::vector<WordDescriptor> words; |
static_assert(decltype(status)::is_always_lock_free); |
}; |
// In-band descriptor encoding: a word whose top 16 bits equal the sentinel
// holds a tagged pointer to a WordDescriptor rather than a plain value.
// This relies on those top 16 bits being zero in real descriptor pointers.
// The masks below are 64-bit constants, so refuse to build where uintptr_t is
// narrower (they would silently truncate and every value would look like a
// descriptor).
static_assert(sizeof(uintptr_t) == 8,
              "descriptor tagging assumes a 64-bit uintptr_t");
constexpr uintptr_t DESCRIPTOR_MASK = 0xFFFF000000000000;
constexpr uintptr_t DESCRIPTOR_SENTINEL = 0xDCBA000000000000;
// (The approach of in-band descriptors seems naive. CAS locations may hold
// non-pointer values, so a value that exactly matches the descriptor pattern
// may be there by chance. mcas() can verify that expected and new values do
// not look like a descriptor, but the library and the application still have
// to coordinate to avoid conflicts.)

// Returns true iff `val` carries the descriptor sentinel in its tag bits.
// Such a value must be decoded as a descriptor instead of being used as a
// plain value.
inline bool isDescriptorValue(uintptr_t val) {
  return (val & DESCRIPTOR_MASK) == DESCRIPTOR_SENTINEL;
}
inline uintptr_t descriptorPtrToValue(const WordDescriptor* ptr) { |
auto value = reinterpret_cast<uintptr_t>(ptr); |
assert(!(value & DESCRIPTOR_MASK)); |
return value | DESCRIPTOR_SENTINEL; |
} |
// Inverse of descriptorPtrToValue(): clears the tag bits and reinterprets the
// remaining bits as a WordDescriptor pointer.
inline WordDescriptor* descriptorValueToPtr(uintptr_t value) {
  const uintptr_t untagged = value & ~DESCRIPTOR_MASK;
  return reinterpret_cast<WordDescriptor*>(untagged);
}
std::pair<uintptr_t, uintptr_t> |
readInternal(const std::atomic<uintptr_t>* addr, McasDescriptor* self) { |
while (true) { |
auto content = addr->load(); |
auto value = content; |
if (isDescriptorValue(content)) { |
auto desc = descriptorValueToPtr(content); |
auto parent = desc->parent; |
if (parent != self && parent->status == ACTIVE) { |
mcas(parent); |
continue; |
} |
value = parent->status == SUCCESSFUL ? desc->new_ : desc->old; |
} |
return std::make_pair(content, value); |
} |
} |
uintptr_t read(const std::atomic<uintptr_t>& addr) { |
return readInternal(&addr, nullptr).second; |
} |
// Drives the MCAS described by `desc` to completion and returns whether it
// succeeded. Any thread may call this on any descriptor; readInternal() does
// so to help operations it runs into.
//
// Phase 1 installs, for each word, an encoded pointer to its WordDescriptor.
// It stops early if a word's logical value differs from the expected one
// (outcome FAILED) or if `desc` has already been finalized.
// Phase 2 tries to move the status from ACTIVE to the phase-1 outcome. Only
// one thread can win that compare-exchange; every caller then reports the
// final status.
bool mcas(McasDescriptor* desc) {
  StatusType outcome = SUCCESSFUL;
  for (const WordDescriptor& word : desc->words) {
    // caller-supplied values must not be confused with encoded descriptors
    assert(!isDescriptorValue(word.old));
    assert(!isDescriptorValue(word.new_));
    const uintptr_t mine = descriptorPtrToValue(&word);
    bool stopInstalling = false;
    while (true) {
      auto [raw, logical] = readInternal(word.address, desc);
      if (raw == mine) {
        break;  // already installed (possibly by a helper): next word
      }
      if (logical != word.old) {
        outcome = FAILED;  // expected value mismatch: the MCAS fails
        stopInstalling = true;
        break;
      }
      if (desc->status != ACTIVE) {
        stopInstalling = true;  // another thread already finalized desc
        break;
      }
      // Publish our descriptor; a failed (or spurious) CAS means re-read.
      if (word.address->compare_exchange_weak(raw, mine)) {
        break;
      }
    }
    if (stopInstalling) {
      break;
    }
  }
  StatusType expected = ACTIVE;
  if (desc->status.compare_exchange_strong(expected, outcome)) {
    // This thread finalized `desc`; once reclamation exists, retire it here.
    //retireForCleanup(desc);
  }
  return desc->status == SUCCESSFUL;
}
bool cas2(std::atomic<uintptr_t>* a_addr, uintptr_t a_old, uintptr_t a_new, |
std::atomic<uintptr_t>* b_addr, uintptr_t b_old, uintptr_t b_new) { |
// TODO: sort entries by address |
auto desc = new McasDescriptor { |
.status = ACTIVE, |
.words = { |
{ |
.address = a_addr, |
.old = a_old, |
.new_ = a_new, |
}, |
{ |
.address = b_addr, |
.old = b_old, |
.new_ = b_new, |
}, |
}, |
}; |
for (auto& wordDesc : desc->words) { |
wordDesc.parent = desc; |
} |
return mcas(desc); |
} |
// |
// Simple tests and benchmarks follow |
// |
#include <iostream> |
#include <chrono> |
using namespace std::chrono; |
// Single-threaded sanity test. Two successive cas2 calls on the same pair of
// words must both succeed. A cas2 with a stale expected value must fail and
// leave both words unchanged. An untouched word must keep its value.
// cas2 is always called outside assert() so it still runs when NDEBUG is set.
void test_mcas() {
  printf("test_mcas\n");
  std::atomic<uintptr_t> arr[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
  [[maybe_unused]] const bool first = cas2(&arr[2], 2, 20, &arr[6], 6, 60);
  assert(first);
  assert(read(arr[0]) == 0);
  assert(read(arr[2]) == 20);
  assert(read(arr[6]) == 60);
  [[maybe_unused]] const bool second = cas2(&arr[2], 20, 30, &arr[6], 60, 70);
  assert(second);
  assert(read(arr[0]) == 0);
  assert(read(arr[2]) == 30);
  assert(read(arr[6]) == 70);
  // stale expected value for arr[2]: must fail without modifying anything
  [[maybe_unused]] const bool stale = cas2(&arr[2], 999, 1, &arr[6], 70, 80);
  assert(!stale);
  assert(read(arr[2]) == 30);
  assert(read(arr[6]) == 70);
}
// cas2 where one word's new value equals its old value. The call must
// succeed, update the other word, and leave the identity word unchanged.
// cas2 is called outside assert() so it still runs when NDEBUG is set.
void test_mcas_identity() {
  printf("test_mcas_identity\n");
  std::atomic<uintptr_t> a(5);
  std::atomic<uintptr_t> b(10);
  [[maybe_unused]] const bool ok = cas2(&a, 5, 7, &b, 10, 10);
  assert(ok);
  assert(read(a) == 7);
  assert(read(b) == 10);
}
// for n attempts, increment 2 positions in the array atomically |
void _mcas_list_do(int n, int inc, std::vector<std::atomic<uintptr_t>>* arr_, |
std::atomic<int>* success_count) { |
auto& arr = *arr_; |
auto size = arr.size(); |
assert(size % 2 == 0); |
auto half = size / 2; |
int local_success_count = 0; |
for (int i = 0; i < n; ++i) { |
int j = std::rand() % half; |
int k = half + std::rand() % half; |
auto v1 = read(arr[j]); |
auto v2 = read(arr[k]); |
local_success_count += cas2(&arr[j], v1, v1 + inc, &arr[k], v2, v2 + inc); |
} |
*success_count += local_success_count; |
} |
void benchmark_mcas_list(int array_size) { |
// parallel threads incrementing list elements |
int N_OPS = 10000000; |
int N_THREADS = std::thread::hardware_concurrency(); |
int INC = 3; |
printf("benchmark_mcas_list (array_size=%d, threads=%d)\n", array_size, N_THREADS); |
std::vector<std::atomic<uintptr_t>> arr(array_size); |
std::vector<std::thread> threads; |
std::atomic<int> success_count = 0; |
auto t0 = high_resolution_clock::now(); |
for (int i = 0; i < N_THREADS; ++i) { |
threads.push_back( |
std::thread(_mcas_list_do, N_OPS_PER_THREAD, INC, &arr, &success_count)); |
} |
for (auto &th : threads) { |
th.join(); |
} |
auto duration = duration_cast<microseconds>(high_resolution_clock::now() - t0); |
int sum = 0; |
for (const auto& v : arr) { |
sum += read(v); |
} |
assert(sum == 2 * INC * success_count); |
printf(" successful ops: %d (%.1f%%)\n", success_count.load(), |
100.0 * success_count / (N_THREADS * N_OPS_PER_THREAD)); |
printf(" duration (real): %.3f s\n", duration.count() * 1e-6); |
printf(" ops rate: %.2f M/s\n", success_count / (duration.count() * 1e-6) / 1e6); |
} |
// Runs the single-threaded correctness tests, then the concurrent benchmark
// at two array sizes.
int main() {
  test_mcas();
  test_mcas_identity();
  for (int array_size : {10, 100}) {
    benchmark_mcas_list(array_size);
  }
  return 0;
}