Skip to content

Instantly share code, notes, and snippets.

@xkikeg
Created February 6, 2012 05:48
Show Gist options
  • Save xkikeg/1749985 to your computer and use it in GitHub Desktop.
Save xkikeg/1749985 to your computer and use it in GitHub Desktop.
Sequentialized Multithreaded Ring Buffer Implementation
#include <ostream>
#include <array>
#include <mutex>
#include <condition_variable>
#define DEBUG_PRINT
#ifdef DEBUG_PRINT
#include <iostream>
#endif
/**
* @note
* start: in=0, out=0
* o,i
* v
* buffer [ | | | | | | | | | | ]
*
* o i
* v v
* buffer [@|@| | | | | | | | | ]
*
* o i
* v v
* buffer [#|@| | | | | | | | | ]
*
* In this situation, pop should block until at least one push.
* o,i
* v
* buffer [#|#| | | | | | | | | ]
*
* push many items.
* i o
* v v
* buffer [@|#|@|@|@|@|@|@|@|@|@]
*
* Now the buffer has become full, and push should either wait or lose old data.
* i,o
* v
* buffer [@|@|@|@|@|@|@|@|@|@|@]
*
* pop consumes some items, and push becomes available again.
* o i
* v v
* buffer [@|@|#|#|@|@|@|@|@|@|@]
*
* As a result of this consideration, we can say that
* pop should wait until out != in, and
* push should wait until the buffer has a free slot.
* The latter is a bit difficult: in == out holds both when the buffer is
* empty and when it is full, so we have to distinguish between in overtaking
* out (full) and in following out (empty).
*/
/**
 * Fixed-size blocking FIFO ring buffer protected by a single mutex.
 *
 * push() blocks while there is no free slot and pop() blocks while there is
 * nothing to read. Both wait on the same condition variable, and every read
 * or write of the slots and of the bookkeeping counters happens while the
 * mutex is held.
 *
 * @tparam T element type; must be default-constructible and move-assignable.
 *           When DEBUG_PRINT is defined it must also be streamable.
 * @tparam N number of slots; must be greater than zero.
 */
template <class T, size_t N>
class SequentialRingBuffer
{
    // The index arithmetic below uses "% N", which is undefined for N == 0.
    static_assert(N > 0, "SequentialRingBuffer requires at least one slot");
private:
    std::array<T, N> buffer;
    // in       : index of the next slot to be written by push()
    // out      : index of the next slot to be read by pop()
    // capacity : number of free slots (N when empty, 0 when full)
    // These are only accessed with lkm held, so plain integers are enough;
    // volatile provides no inter-thread synchronization.
    size_t in, out, capacity;
    std::mutex lkm;
    std::condition_variable cv;
public:
    /// Creates an empty buffer. The slots are value-initialized so that
    /// operator<< never prints indeterminate values.
    SequentialRingBuffer() : buffer(), in(0), out(0), capacity(N) {}

    /**
     * Appends val, blocking until at least one slot is free.
     *
     * @param val value to store; it is moved into the slot.
     */
    void push(T val)
    {
        {
            std::unique_lock<std::mutex> lk(lkm);
#ifdef DEBUG_PRINT
            std::cerr << "==PUSH LOCK ENTERED\n";
#endif
            cv.wait(lk, [this]{ return capacity > 0; });
#ifdef DEBUG_PRINT
            std::cerr << "==PUSH COND WAIT FINISHED\n";
#endif
            const size_t slot = in;
            // Store first, update the counters afterwards: if the assignment
            // throws, the bookkeeping is left untouched.
            buffer[slot] = std::move(val);
            in = (slot + 1) % N;
            --capacity;
#ifdef DEBUG_PRINT
            // val has been moved from, so report the stored element instead.
            std::cerr << "==PUSHED " << buffer[slot]
                      << " (i=" << in
                      << ",o=" << out
                      << ",c=" << capacity << ")"
                      << " " << *this << "\n";
#endif
        }
        // Notify after releasing the lock so woken threads can proceed at once.
        cv.notify_all();
    }

    /**
     * Removes and returns the oldest element, blocking until one is stored.
     *
     * @return the element that was pushed earliest among those still stored.
     */
    T pop()
    {
        std::unique_lock<std::mutex> lk(lkm);
#ifdef DEBUG_PRINT
        std::cerr << "==POP LOCK ENTERED\n";
#endif
        // capacity < N means at least one slot is occupied; this also covers
        // the full case, where in == out.
        cv.wait(lk, [this]{ return capacity < N; });
#ifdef DEBUG_PRINT
        std::cerr << "==POP COND WAIT FINISHED\n";
#endif
        // The element must be taken out while the lock is still held: once
        // capacity is incremented and the lock released, a waiting push() is
        // free to overwrite this very slot.
        T ret = std::move(buffer[out]);
        out = (out + 1) % N;
        ++capacity;
#ifdef DEBUG_PRINT
        std::cerr << "==POPPED " << ret
                  << " (i=" << in
                  << ",o=" << out
                  << ",c=" << capacity << ")"
                  << " " << *this << "\n";
#endif
        lk.unlock();
        cv.notify_all();
        return ret;
    }

    /**
     * Writes every slot as "[a,b,...]". The slot at index `in` is preceded
     * by '>' and the slot at index `out` is followed by '>'.
     *
     * This function does not take the mutex itself (the debug output in
     * push()/pop() calls it with the mutex already held); other callers must
     * make sure no push()/pop() runs concurrently.
     */
    friend std::ostream & operator<<(std::ostream & ost,
                                     const SequentialRingBuffer<T, N> & buffer)
    {
        ost << '[';
        for(size_t i=0; i < N; ++i)
        {
            if(i != 0) { ost << ','; }
            if(i == buffer.in) { ost << '>'; }
            ost << buffer.buffer[i];
            if(i == buffer.out) { ost << '>'; }
        }
        ost << ']';
        return ost;
    }
};
// BEGIN TEST CODES.
#include <algorithm>
#include <future>
#include <iostream>
#include <random>
#include <thread>
#include <boost/format.hpp>
/**
 * Pushes every character of str into buffer, one at a time, and finishes by
 * pushing the terminating '\0' as well.
 *
 * @param buffer any object providing push(char)
 * @param str    NUL-terminated string to send
 */
template <class T>
void producer(T & buffer, const char * str)
{
    const char * cursor = str;
    while(true)
    {
        const char ch = *cursor;
        buffer.push(ch);
        // The terminator itself is sent before stopping.
        if(ch == '\0') { return; }
        ++cursor;
    }
}
/**
 * Pops characters from buffer and echoes them to std::cout until
 * nullcharcount '\0' characters have been received, then ends the line.
 *
 * @param buffer        any object providing pop() whose result converts to char
 * @param isFlush       when true, std::cout is flushed after every character
 * @param nullcharcount number of '\0' characters to wait for; with 0 the
 *                      function returns immediately without printing anything
 */
template <class T>
void consumer(T & buffer, bool isFlush, size_t nullcharcount=1)
{
    if(nullcharcount == 0) { return; }
    while(nullcharcount > 0)
    {
        const char ch = buffer.pop();
        if(ch == '\0')
        {
            // A terminator is counted, never printed.
            --nullcharcount;
            continue;
        }
        std::cout << ch;
        if(isFlush) { std::cout << std::flush; }
    }
    std::cout << std::endl;
}
template<size_t BufferSize>
void single_test(const char * str)
{
typedef SequentialRingBuffer<char, BufferSize> simple_buf_t;
simple_buf_t buf;
std::future<void> fc =
std::async(std::launch::async,
consumer<simple_buf_t>, std::ref(buf), true, 1);
std::future<void> fp =
std::async(std::launch::async,
producer<simple_buf_t>, std::ref(buf), str);
fp.wait();
fc.wait();
}
template<size_t BufferSize>
void double_test(const char * str1, const char * str2)
{
typedef SequentialRingBuffer<char, BufferSize> simple_buf_t;
simple_buf_t buf;
std::future<void> fc =
std::async(std::launch::async,
consumer<simple_buf_t>, std::ref(buf), true, 2);
std::future<void> fp1 =
std::async(std::launch::async,
producer<simple_buf_t>, std::ref(buf), str1);
std::future<void> fp2 =
std::async(std::launch::async,
producer<simple_buf_t>, std::ref(buf), str2);
fp1.wait();
fp2.wait();
fc.wait();
}
/**
 * Pushes `size` generated integers through a ring buffer from one thread,
 * pops the same number from another thread, and checks that the values
 * arrive unchanged and in the same order. A per-element report is written to
 * std::cerr; "FAILED!!" is written to std::cout on any mismatch.
 *
 * @tparam BufferSize number of slots in the ring buffer under test
 * @tparam Generator  callable returning a value convertible to int
 * @param  size       number of elements to transfer
 * @param  genFunc    generator used to fill the input sequence
 * @return true when every popped value equals the pushed one at that index
 */
template <size_t BufferSize, class Generator>
bool equal_test(size_t size, Generator genFunc)
{
    // Use the requested size; it used to be hard-coded to 32, which silently
    // ignored the template argument.
    SequentialRingBuffer<int, BufferSize> buffer;
    std::vector<int> test1(size), test2(size);
    std::generate(test1.begin(), test1.end(), genFunc);
    // The lambdas capture by reference; both futures are waited on before
    // any captured local goes out of scope.
    std::future<void> fproducer =
        std::async(std::launch::async,
                   [&]{
                       for(const auto val : test1)
                       { buffer.push(val); }
                   });
    std::future<void> fconsumer =
        std::async(std::launch::async,
                   [&]{
                       for(size_t i=0; i<size; ++i)
                       { test2[i] = buffer.pop(); }
                   });
    fproducer.wait();
    fconsumer.wait();
    bool flag=true;
    for(size_t i=0; i < size; ++i)
    {
        const bool same = (test1[i] == test2[i]);
        std::cerr
            << boost::format("test1[%1$03d] : %2$3d test2[%1$03d] : %3$3d %4$s\n")
            % i % test1[i] % test2[i] % (same ? "OK" : "###BAD###");
        flag = flag && same;
    }
    if(!flag) { std::cout << "FAILED!!\n"; }
    return flag;
}
/**
 * Runs the demo transfers and both equality tests.
 *
 * @return 0 when both equality tests pass, 1 otherwise.
 */
int main()
{
    single_test<8>("HELLO, WORLD!");
    double_test<16>("0123456789abcdefghijklmnopqrstuvwxyz",
                    ".,!?=-+*/@ABCDEFGHIJKLMNOPQRSTUVWXYZ");
    std::cout << "EQUALITY TEST : SIMPLE\n";
    // Keep the result: a failure here used to be ignored by the exit status.
    const bool simpleOk = equal_test<8>(20, []{ return 101; });
    std::cout << "EQUALITY TEST : RANDOM\n";
    // Seed the Mersenne Twister with ten words taken from the random device.
    std::random_device rndev;
    std::vector< std::uint_least32_t> vs(10);
    std::generate(vs.begin(), vs.end(), std::ref(rndev));
    std::seed_seq seed(vs.begin(), vs.end());
    auto rnd=std::bind(std::uniform_int_distribution<int>(0, 100),
                       std::mt19937(seed));
    const bool randomOk = equal_test<32>(200, [&rnd]{ return rnd(); });
    // Both tests always run; either failure makes the process exit non-zero.
    return (simpleOk && randomOk) ? 0 : 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment