Created
February 6, 2012 05:48
-
-
Save xkikeg/1749985 to your computer and use it in GitHub Desktop.
Sequentialized Multithreaded Ring Buffer Implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <ostream> | |
#include <array> | |
#include <mutex> | |
#include <condition_variable> | |
#define DEBUG_PRINT | |
#ifdef DEBUG_PRINT | |
#include <iostream> | |
#endif | |
/** | |
* @note | |
* start: in=0, out=0 | |
* o,i | |
* v | |
* buffer [ | | | | | | | | | | ] | |
* | |
* o i | |
* v v | |
* buffer [@|@| | | | | | | | | ] | |
* | |
* o i | |
* v v | |
* buffer [#|@| | | | | | | | | ] | |
* | |
* In this situation, pop should block until at least one push. | |
* o,i | |
* v | |
* buffer [#|#| | | | | | | | | ] | |
* | |
* push many items. | |
* i o | |
* v v | |
* buffer [@|#|@|@|@|@|@|@|@|@|@] | |
* | |
* Now the buffer becomes full, and push should wait or lose old data.
* i,o | |
* v | |
* buffer [@|@|@|@|@|@|@|@|@|@|@] | |
* | |
* pop consumes some items, and push becomes available again.
* o i | |
* v v | |
* buffer [@|@|#|#|@|@|@|@|@|@|@] | |
* | |
* As a result of this consideration, we can say that
* pop should wait until out != in (or the buffer is completely full), and
* push should wait until the buffer has a free slot.
* The latter is a bit difficult: when in == out we have to distinguish
* between overtaking (the buffer is full) and following (the buffer is empty).
*/ | |
/**
 * Fixed-capacity blocking FIFO queue backed by a ring of N slots.
 *
 * push() blocks while the ring is full and pop() blocks while it is empty.
 * All bookkeeping (in, out, capacity) and every access to the slots made by
 * push()/pop() happens while holding the internal mutex.
 *
 * @tparam T element type; must be default-constructible and assignable.
 *           When DEBUG_PRINT is defined it must also be streamable.
 * @tparam N number of slots; must be greater than zero.
 */
template <class T, size_t N>
class SequentialRingBuffer
{
  static_assert(N > 0, "SequentialRingBuffer needs at least one slot");

private:
  std::array<T, N> buffer;
  // Guarded by lkm. Plain integers are enough: the mutex provides the
  // synchronization, `volatile` would not add any.
  size_t in;        // index of the slot the next push() writes to
  size_t out;       // index of the slot the next pop() reads from
  size_t capacity;  // number of free slots (N when empty, 0 when full)
  std::mutex lkm;
  std::condition_variable cv;

public:
  /** Creates an empty ring; slots are value-initialized so that printing
   *  the buffer never reads indeterminate values. */
  SequentialRingBuffer() : buffer(), in(0), out(0), capacity(N) {}

  /**
   * Appends val to the queue, blocking while no slot is free.
   * @param val value to store; it is moved into the ring.
   */
  void push(T val)
  {
    {
      std::unique_lock<std::mutex> lk(lkm);
#ifdef DEBUG_PRINT
      std::cerr << "==PUSH LOCK ENTERED\n";
#endif
      cv.wait(lk, [this]{ return capacity > 0; });
#ifdef DEBUG_PRINT
      std::cerr << "==PUSH COND WAIT FINISHED\n";
#endif
      const size_t slot = in;
      buffer[slot] = std::move(val);
      in = (in + 1) % N;
      --capacity;
#ifdef DEBUG_PRINT
      std::cerr << "==PUSHED " << buffer[slot]
                << " (i=" << in
                << ",o=" << out
                << ",c=" << capacity << ")"
                << " " << *this << "\n";
#endif
    }
    // Wake waiters after the lock is released so they can proceed at once.
    cv.notify_all();
  }

  /**
   * Removes and returns the oldest element, blocking while the queue is
   * empty.
   * @return the element that was pushed earliest and not yet popped.
   */
  T pop()
  {
    std::unique_lock<std::mutex> lk(lkm);
#ifdef DEBUG_PRINT
    std::cerr << "==POP LOCK ENTERED\n";
#endif
    // capacity < N means at least one slot is occupied; this covers both
    // "in != out" and the completely full case where in == out.
    cv.wait(lk, [this]{ return capacity < N; });
#ifdef DEBUG_PRINT
    std::cerr << "==POP COND WAIT FINISHED\n";
#endif
    // The value must be taken out while the lock is still held: once the
    // slot is marked free a producer is allowed to overwrite it.
    T ret = std::move(buffer[out]);
    out = (out + 1) % N;
    ++capacity;
#ifdef DEBUG_PRINT
    std::cerr << "==POPPED " << ret
              << " (i=" << in
              << ",o=" << out
              << ",c=" << capacity << ")"
              << " " << *this << "\n";
#endif
    lk.unlock();
    cv.notify_all();
    return ret;
  }

  /**
   * Writes all N slots as "[a,b,...]"; a '>' is emitted before the slot at
   * index `in` and after the slot at index `out`.
   * This function does not take the mutex itself (push()/pop() call it
   * while already holding it), so other callers must make sure no
   * push()/pop() is running concurrently.
   */
  friend std::ostream & operator<<(std::ostream & ost,
                                   const SequentialRingBuffer<T, N> & buffer)
  {
    ost << '[';
    for(size_t i=0; i < N; ++i)
    {
      if(i != 0) { ost << ','; }
      if(i == buffer.in) { ost << '>'; }
      ost << buffer.buffer[i];
      if(i == buffer.out) { ost << '>'; }
    }
    ost << ']';
    return ost;
  }
};
// BEGIN TEST CODES. | |
#include <algorithm> | |
#include <future> | |
#include <iostream> | |
#include <random> | |
#include <thread> | |
#include <boost/format.hpp> | |
/**
 * Feeds a C string into buffer one character at a time.
 * The terminating '\0' is pushed as well, as the last element.
 *
 * @param buffer any object offering push(char)
 * @param str    null-terminated string to send
 */
template <class T>
void producer(T & buffer, const char * str)
{
  const char * cursor = str;
  char current;
  do
  {
    current = *cursor;
    ++cursor;
    buffer.push(current);
  } while(current != '\0');
}
/**
 * Pops characters from buffer and echoes them to std::cout until
 * nullcharcount '\0' characters have been seen, then ends the line.
 * The '\0' characters themselves are not printed.
 *
 * @param buffer        any object offering pop() convertible to char
 * @param isFlush       flush std::cout after every printed character
 * @param nullcharcount number of '\0' terminators to wait for; when 0 the
 *                      function returns immediately without output
 */
template <class T>
void consumer(T & buffer, bool isFlush, size_t nullcharcount=1)
{
  if(nullcharcount == 0) { return; }
  while(nullcharcount != 0)
  {
    const char ch = buffer.pop();
    if(ch == '\0')
    {
      --nullcharcount;
      continue;
    }
    std::cout << ch;
    if(isFlush) { std::cout << std::flush; }
  }
  std::cout << std::endl;
}
template<size_t BufferSize> | |
void single_test(const char * str) | |
{ | |
typedef SequentialRingBuffer<char, BufferSize> simple_buf_t; | |
simple_buf_t buf; | |
std::future<void> fc = | |
std::async(std::launch::async, | |
consumer<simple_buf_t>, std::ref(buf), true, 1); | |
std::future<void> fp = | |
std::async(std::launch::async, | |
producer<simple_buf_t>, std::ref(buf), str); | |
fp.wait(); | |
fc.wait(); | |
} | |
template<size_t BufferSize> | |
void double_test(const char * str1, const char * str2) | |
{ | |
typedef SequentialRingBuffer<char, BufferSize> simple_buf_t; | |
simple_buf_t buf; | |
std::future<void> fc = | |
std::async(std::launch::async, | |
consumer<simple_buf_t>, std::ref(buf), true, 2); | |
std::future<void> fp1 = | |
std::async(std::launch::async, | |
producer<simple_buf_t>, std::ref(buf), str1); | |
std::future<void> fp2 = | |
std::async(std::launch::async, | |
producer<simple_buf_t>, std::ref(buf), str2); | |
fp1.wait(); | |
fp2.wait(); | |
fc.wait(); | |
} | |
template <size_t BufferSize, class Generator> | |
bool equal_test(size_t size, Generator genFunc) | |
{ | |
SequentialRingBuffer<int, 32> buffer; | |
std::vector<int> test1(size), test2(size); | |
std::generate(test1.begin(), test1.end(), genFunc); | |
std::future<void> fconsumer = | |
std::async(std::launch::async, | |
[&]{ | |
for(const auto val : test1) | |
{ buffer.push(val); } | |
}); | |
std::future<void> fproducer = | |
std::async(std::launch::async, | |
[&]{ | |
for(size_t i=0; i<size; ++i) | |
{ test2[i] = buffer.pop(); } | |
}); | |
fconsumer.wait(), fproducer.wait(); | |
bool flag=true; | |
for(size_t i=0; i < size; ++i) | |
{ | |
std::cerr | |
<< boost::format("test1[%1$03d] : %2$3d test2[%1$03d] : %3$3d %4$s\n") | |
% i % test1[i] % test2[i] % (test1[i] == test2[i] ? "OK" : "###BAD###"); | |
flag = flag && (test1[i] == test2[i]); | |
} | |
if(!flag) { std::cout << "FAILED!!\n"; } | |
return flag; | |
} | |
int main() | |
{ | |
single_test<8>("HELLO, WORLD!"); | |
double_test<16>("0123456789abcdefghijklmnopqrstuvwxyz", | |
".,!?=-+*/@ABCDEFGHIJKLMNOPQRSTUVWXYZ"); | |
std::cout << "EQUALITY TEST : SIMPLE\n"; | |
equal_test<8>(20, []{ return 101; }); | |
std::cout << "EQUALITY TEST : RANDOM\n"; | |
std::random_device rndev; | |
std::vector< std::uint_least32_t> vs(10); | |
std::generate(vs.begin(), vs.end(), std::ref(rndev)); | |
std::seed_seq seed(vs.begin(), vs.end()); | |
auto rnd=std::bind(std::uniform_int_distribution<int>(0, 100), | |
std::mt19937(seed)); | |
if(!equal_test<32>(200, [&rnd]{ return rnd(); })) | |
{ | |
return 1; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment