Created
May 22, 2015 07:59
-
-
Save brugeman/30070d4c309a17f21597 to your computer and use it in GitHub Desktop.
SMR pointer storage checked with relacy race detector.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cstdio> | |
#include "relacy/relacy_std.hpp" | |
namespace smrp_test | |
{ | |
// Single Writer Multiple Readers pointer storage, | |
// both writers and readers are lock-free, | |
// writers wait for all readers to finish | |
// with previous value so it can be deleted. | |
// Thus, write throughput is limited by slowest | |
// reader, and writes are meant to happen very rarely. | |
template<typename T, size_t READER_THREAD_NUM> | |
class swmr_pointer_store_t | |
{ | |
static const size_t CACHE_LINE_SIZE = 64; | |
typedef std::atomic<T *> aptr_t; | |
// no-false-sharing atomic pointer | |
class nfs_aptr_t | |
{ | |
public: | |
nfs_aptr_t () : p (0) {} | |
aptr_t p; | |
private: | |
nfs_aptr_t (const nfs_aptr_t & other); | |
nfs_aptr_t & operator= (const nfs_aptr_t & other); | |
char pad_[CACHE_LINE_SIZE - sizeof (aptr_t)]; | |
}; | |
// value owned by writer | |
nfs_aptr_t main_ptr_; | |
// values owned by readers | |
mutable nfs_aptr_t hazard_ptrs_[READER_THREAD_NUM]; | |
// Implements wait strategy for writers in case | |
// the old value is used by readers. | |
void wait () | |
{ | |
// std::this_thread::yield (); | |
} | |
// no copies! moving is ok | |
swmr_pointer_store_t (const swmr_pointer_store_t & other); | |
swmr_pointer_store_t & operator= (const swmr_pointer_store_t && other); | |
public: | |
swmr_pointer_store_t () | |
{} | |
// Must only be called by writer thread, | |
// wait-free, 1 atomic load. | |
T * get () const | |
{ | |
return main_ptr_.p.load (rl::memory_order_relaxed); | |
} | |
// Writer thread calls this to replace previous value, | |
// the previous value is returned as soon as | |
// no readers are using it. That is, this will work as slow | |
// as the slowest reader works (and not faster than 'wait' above). | |
// This call's complexity is O(N) w/ N=READER_THREAD_NUM, and | |
// is intended to be used rarely. | |
T * set (T * ptr) | |
{ | |
// replace old value with new value | |
rl::var<T *> old; | |
// seq_cst is required to make contents pointed by ptr to get | |
// visible to readers. | |
VAR(old) = main_ptr_.p.exchange (ptr, rl::memory_order_seq_cst); | |
// do we actually need to check readers? | |
if (VAR(old) != 0 && ptr != VAR(old)) | |
{ | |
// now make sure old value is not in use | |
bool locked = false; | |
do | |
{ | |
rl::var<T *> v = 0; | |
for (size_t i = 0; | |
VAR(v) != VAR(old) && i < READER_THREAD_NUM; | |
++i) | |
VAR(v) = hazard_ptrs_[i].p.load (rl::memory_order_acquire); | |
// the old value is locked if it sits in one | |
// of reader threads' slots | |
locked = VAR(v) == VAR(old); | |
// some reader is using our value? | |
// ok, lets wait to let him unlock it. | |
if (locked) | |
wait (); | |
} while (locked); | |
} | |
// ok, no readers use old value now | |
return VAR(old); | |
} | |
// Readers use 'acquire' to get current value of pointer, | |
// and notify writer that the value is in use until 'release'. | |
// This call might loop while in contention with writer, | |
// but since writes are to happen rarely, looping is | |
// very unlikely. | |
T * acquire (const size_t thread_index) const | |
{ | |
RL_ASSERT (thread_index < READER_THREAD_NUM); | |
rl::var<T *> v; | |
rl::var<T *> v1; | |
do | |
{ | |
// load the pointer | |
VAR(v) = main_ptr_.p.load (rl::memory_order_acquire); | |
// save it to our thread's hazard pointer slot | |
hazard_ptrs_[thread_index].p | |
.store (VAR(v), rl::memory_order_release); | |
// now - check main pointer again, to make sure | |
// writer hasn't managed to perform 'set' btw previous 2 operations | |
// NOTE: why is seq_cst memory ordering required? Tests | |
// fail with other orderings, and my guess is that compiler | |
// memory model allows it to completely eliminate this | |
// v1 reading from same pointer as v and their comparison, | |
// as those are meaningless in single threaded code. Seq_cst | |
// forces compiler to avoid that optimizations. | |
VAR(v1) = main_ptr_.p.load (rl::memory_order_seq_cst); | |
// if writer didn't change anything, | |
// return our 'locked' value | |
} | |
while (VAR(v1) != VAR(v)); | |
// we don't wait on retry as writer | |
// has already completed (or we wouldn't need to retry) | |
return VAR(v); | |
} | |
// Readers use 'release' to notify writers that the last | |
// read value is no longer in use. It is wait-free - | |
// 1 atomic store. | |
void release (const size_t thread_index) const | |
{ | |
RL_ASSERT (thread_index < READER_THREAD_NUM); | |
// notify writer that we're done | |
hazard_ptrs_[thread_index].p.store (0, rl::memory_order_release); | |
} | |
}; | |
static const size_t READERS_NUM = 1; | |
struct race_test : rl::test_suite<race_test, READERS_NUM + 1> | |
{ | |
swmr_pointer_store_t<size_t, READERS_NUM> pointer; | |
// executed in single thread before main thread function | |
void before () | |
{ | |
// printf ("pointer.get () %p\n", pointer.get ()); | |
// fflush (stdout); | |
RL_ASSERT (pointer.get () == 0); | |
} | |
// main thread function | |
void thread (const unsigned int thread_index) | |
{ | |
if (0 == thread_index) | |
{ | |
const size_t WRITES = 100; | |
rl::var<size_t *> value = 0; | |
for (size_t i = 0; i < WRITES; ++i) | |
{ | |
VAR(value) = new size_t; | |
*VAR(value) = i; | |
// printf ("%lu new value %p %lu\n", | |
// i, (size_t *)VAR(value), *VAR(value)); | |
// fflush (stdout); | |
rl::var<size_t *> old_value; | |
VAR(old_value) = pointer.set (VAR(value)); | |
// printf ("%lu old value %p %lu\n", | |
// i, (size_t *)VAR(old_value), | |
// VAR(old_value) ? *VAR(old_value) : 0); | |
// fflush (stdout); | |
delete VAR(old_value); | |
} | |
delete pointer.set (0); | |
} | |
else | |
{ | |
const size_t reader_index = thread_index - 1; | |
RL_ASSERT (reader_index < thread_index); | |
const size_t READS = 100; | |
rl::var<size_t> last_value = 0; | |
for (size_t i = 0; i < READS; ++i) | |
{ | |
rl::var<size_t *> p; | |
VAR(p) = pointer.acquire (reader_index); | |
// printf ("%lu %lu %p %lu %lu\n", | |
// reader_index, i, (size_t *)VAR(p), VAR(p) ? *VAR(p) : 0, | |
// (size_t)VAR(last_value)); | |
// fflush (stdout); | |
if (VAR(p)) | |
{ | |
// we must read non-decreasing sequence | |
RL_ASSERT (*VAR(p) >= VAR(last_value)); | |
VAR(last_value) = *VAR(p); | |
} | |
pointer.release (reader_index); | |
} | |
} | |
} | |
// executed in single thread after main thread function | |
void after() | |
{ | |
RL_ASSERT (pointer.get () == 0); | |
} | |
// executed in single thread after every 'visible' action in main threads | |
// disallowed to modify any state | |
void invariant() | |
{ | |
} | |
}; | |
void test () | |
{ | |
rl::simulate<race_test> (); | |
} | |
}; /* namespace smrp_test */ | |
int | |
main () | |
{ | |
smrp_test::test (); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment