Skip to content

Instantly share code, notes, and snippets.

@StephanDollberg
Created January 18, 2023 10:43
Show Gist options
  • Save StephanDollberg/1720ea2b5b516999a19d1e44426966cf to your computer and use it in GitHub Desktop.
Save StephanDollberg/1720ea2b5b516999a19d1e44426966cf to your computer and use it in GitHub Desktop.
#include <cstddef>
#include <utility>
#include <atomic>
#include <thread>
#include <iostream>
namespace aeron
{
namespace util
{
/**
 * Build a freshly allocated array containing the first `oldLength` elements of `oldArray`
 * followed by `element`.
 *
 * The input array is only read; it is neither modified nor freed here.
 *
 * @param oldArray  source elements; not dereferenced when `oldLength` is 0.
 * @param oldLength number of elements copied from `oldArray`.
 * @param element   value placed in the last slot of the new array.
 * @return pair of (pointer obtained from `new E[]`, `oldLength + 1`).
 */
template<typename E>
std::pair<E *, std::size_t> addToArray(E *oldArray, std::size_t oldLength, E element)
{
    const std::size_t grownLength = oldLength + 1;
    E *const grown = new E[grownLength];

    // Copy the existing elements front to back; `copied` ends up equal to oldLength.
    std::size_t copied = 0;
    while (copied != oldLength)
    {
        grown[copied] = oldArray[copied];
        ++copied;
    }

    // The appended element occupies the one extra slot.
    grown[copied] = element;

    return std::pair<E *, std::size_t>(grown, grownLength);
}
/**
 * Build a freshly allocated array containing the elements of `oldArray` with the one at
 * `index` left out.
 *
 * The input array is only read; it is neither modified nor freed here.
 *
 * When `index` does not address an element (`index >= oldLength`, which includes the empty
 * array) there is nothing to remove, so the result is a same-length copy. Without this guard
 * the allocation would be one slot too small for the copy loop (or, for an empty input, the
 * length would wrap around).
 *
 * @param oldArray  source elements; not dereferenced when `oldLength` is 0.
 * @param oldLength number of elements in `oldArray`.
 * @param index     position of the element to omit.
 * @return pair of (pointer obtained from `new E[]`, resulting length). The length is
 *         `oldLength - 1` when `index` is in range and `oldLength` otherwise.
 */
template<typename E>
std::pair<E *, std::size_t> removeFromArray(E *oldArray, std::size_t oldLength, std::size_t index)
{
    const bool indexInRange = index < oldLength;
    const std::size_t newLength = indexInRange ? oldLength - 1 : oldLength;
    E *newArray = new E[newLength];

    // `j` trails `i` by one once the removed slot has been passed.
    for (std::size_t i = 0, j = 0; i < oldLength; i++)
    {
        if (i != index)
        {
            newArray[j++] = oldArray[i];
        }
    }

    return { newArray, newLength };
}
}
namespace concurrent
{
/**
 * Publishes an (array pointer, length) pair that writers replace wholesale and readers snapshot
 * without taking a lock.
 *
 * Protocol: every mutation is bracketed by two counters. A writer claims the update by advancing
 * m_beginChange with a compare-and-swap, writes the new pointer and length, and then stores the
 * same number into m_endChange. A snapshot is accepted only if the m_endChange value read before
 * it equals the m_beginChange value read after it; otherwise the operation retries.
 *
 * The pointer and length live in std::atomic objects accessed with relaxed ordering. Readers may
 * load them while a writer is storing them, and with plain (non-atomic) fields that overlap is a
 * data race; the ordering itself still comes from the counters and the fences.
 *
 * This class never deletes an array. Each mutator returns the array it replaced and leaves its
 * disposal to the caller.
 *
 * @tparam E element type of the published array.
 */
template<typename E>
class AtomicArrayUpdater
{
public:
    AtomicArrayUpdater() = default;
    ~AtomicArrayUpdater() = default;

    /**
     * Retry until a consistent snapshot is observed, then return it.
     *
     * @return (array, length) as last published; (nullptr, 0) if nothing has been published yet.
     */
    __attribute__((noinline))
    inline std::pair<E *, std::size_t> load() const
    {
        while (true)
        {
            std::int64_t changeNumber = m_endChange.load(std::memory_order_acquire);

            E *array = m_array.load(std::memory_order_relaxed);
            std::size_t length = m_length.load(std::memory_order_relaxed);

            // The `acquire` fence is linked to the `release` fence in the mutator methods and is needed to
            // prevent the older GCC compilers (<=7) from generating the wrong code.
            std::atomic_thread_fence(std::memory_order_acquire);

            if (changeNumber == m_beginChange.load(std::memory_order_acquire))
            {
                return { array, length };
            }
        }
    }

    /**
     * Replace the published pair with (`newArray`, `newLength`).
     *
     * @return the (array, length) pair that was published before this call.
     */
    __attribute__((noinline))
    inline std::pair<E *, std::size_t> store(E *newArray, std::size_t newLength)
    {
        while (true)
        {
            std::int64_t changeNumber = m_endChange.load(std::memory_order_acquire);

            E *array = m_array.load(std::memory_order_relaxed);
            std::size_t length = m_length.load(std::memory_order_relaxed);

            // Pairs with the `release` fence in publish(); see load().
            std::atomic_thread_fence(std::memory_order_acquire);

            // Winning the CAS both claims the update and confirms that no other writer had started
            // since m_endChange was read, so `array`/`length` above are a consistent pair.
            if (m_beginChange.compare_exchange_strong(changeNumber, changeNumber + 1, std::memory_order_acq_rel))
            {
                publish(changeNumber, newArray, newLength);
                return { array, length };
            }
        }
    }

    /**
     * Publish a new array consisting of the current elements followed by `element`.
     *
     * The new array is allocated by aeron::util::addToArray after the update has been claimed.
     * NOTE(review): if that allocation throws, m_endChange is never advanced, so later calls to
     * load() and to the mutators would keep retrying.
     *
     * @return the (array, length) pair that was published before this call.
     */
    __attribute__((noinline))
    std::pair<E *, std::size_t> addElement(E element)
    {
        while (true)
        {
            std::int64_t changeNumber = m_endChange.load(std::memory_order_acquire);

            E *array = m_array.load(std::memory_order_relaxed);
            std::size_t length = m_length.load(std::memory_order_relaxed);

            // Pairs with the `release` fence in publish(); see load().
            std::atomic_thread_fence(std::memory_order_acquire);

            if (m_beginChange.compare_exchange_strong(changeNumber, changeNumber + 1, std::memory_order_acq_rel))
            {
                std::pair<E *, std::size_t> newArray = aeron::util::addToArray(array, length, element);
                publish(changeNumber, newArray.first, newArray.second);
                return { array, length };
            }
        }
    }

    /**
     * Remove the first element for which `func` returns true and publish the shortened array.
     *
     * `func` is evaluated on a validated snapshot before the update is claimed; if another writer
     * gets in first, the whole operation (including the search) starts over.
     *
     * @param func predicate invoked as `func(array[i])` for increasing `i`.
     * @return (previous array, index of the removed element) on success. Note that the second
     *         member is an index, not a length. Returns (nullptr, 0) when no element matches.
     */
    template<typename F>
    __attribute__((noinline))
    std::pair<E *, std::size_t> removeElement(F &&func)
    {
        while (true)
        {
            std::int64_t changeNumber = m_endChange.load(std::memory_order_acquire);

            E *array = m_array.load(std::memory_order_relaxed);
            std::size_t length = m_length.load(std::memory_order_relaxed);

            // Pairs with the `release` fence in publish(); see load().
            std::atomic_thread_fence(std::memory_order_acquire);

            if (changeNumber != m_beginChange.load(std::memory_order_acquire))
            {
                // A writer is mid-update; the snapshot cannot be trusted.
                continue;
            }

            std::size_t index = 0;
            for (; index < length; index++)
            {
                if (func(array[index]))
                {
                    break;
                }
            }

            if (index == length)
            {
                return { nullptr, 0 };
            }

            if (m_beginChange.compare_exchange_strong(changeNumber, changeNumber + 1, std::memory_order_acq_rel))
            {
                std::pair<E *, std::size_t> newArray = aeron::util::removeFromArray(array, length, index);
                publish(changeNumber, newArray.first, newArray.second);
                return { array, index };
            }

            // Lost the CAS to another writer: fall through and retry from a fresh snapshot.
        }
    }

private:
    /**
     * Second half of a mutation: write the new pair and close the change window.
     *
     * Must only be called by the writer whose CAS moved m_beginChange from `changeNumber` to
     * `changeNumber + 1`.
     */
    inline void publish(std::int64_t changeNumber, E *newArray, std::size_t newLength)
    {
        // The `release` fence is added here to prevent the older GCC compilers (<=7) from generating the wrong
        // code. It also keeps the m_beginChange update ordered before the relaxed stores below.
        std::atomic_thread_fence(std::memory_order_release);

        m_array.store(newArray, std::memory_order_relaxed);
        m_length.store(newLength, std::memory_order_relaxed);

        m_endChange.store(changeNumber + 1, std::memory_order_release);
    }

    // Advanced first by the writer that claims an update.
    std::atomic<std::int64_t> m_beginChange = { -1 };
    // Advanced last, once the new pointer and length have been written.
    std::atomic<std::int64_t> m_endChange = { -1 };
    // Currently published array and its length; consistent as a pair only when validated
    // through the two counters above.
    std::atomic<E *> m_array{ nullptr };
    std::atomic<std::size_t> m_length{ 0 };
};
}}
/**
 * Stress driver for AtomicArrayUpdater.
 *
 * Seeds the updater with the values 1..5, then runs two threads concurrently:
 *  - a writer that, for each i in 2..max, removes the element equal to i and appends i + 5;
 *  - a reader that takes max * 100 snapshots and adds the last element of each non-null
 *    snapshot to `counter`.
 * After both threads are joined the accumulated counter is printed.
 *
 * The values returned by the mutators are discarded; nothing in this program deletes the
 * arrays that get replaced.
 */
int main()
{
    aeron::concurrent::AtomicArrayUpdater<std::int64_t> aau;
    for (std::int64_t seed = 1; seed <= 5; ++seed)
    {
        aau.addElement(seed);
    }

    const std::int64_t max = 100000000;
    std::int64_t counter = 0;

    std::thread writer(
        [&]()
        {
            for (std::int64_t value = 2; value <= max; ++value)
            {
                aau.removeElement([&](auto ele) { return ele == value; });
                aau.addElement(value + 5);
            }
        });

    std::thread reader(
        [&]()
        {
            const std::int64_t rounds = max * 100;
            for (std::int64_t round = 0; round < rounds; ++round)
            {
                const auto snapshot = aau.load();
                if (snapshot.first != nullptr)
                {
                    // Only the reader thread touches `counter` until it has been joined.
                    counter += snapshot.first[snapshot.second - 1];
                }
            }
        });

    writer.join();
    reader.join();

    std::cout << counter << std::endl;
}
// Build: clang++ -g -fno-omit-frame-pointer -Og -fsanitize=address -std=c++2b test.cpp -o test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment