Last active
September 11, 2022 12:30
-
-
Save PolarNick239/f727c0cd923398dc397a05f515452123 to your computer and use it in GitHub Desktop.
C++ concurrent blocking queue with limited size (based on boost condition_variable)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
// Based on https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html | |
template<typename Data> | |
class BlockingQueue { | |
private: | |
std::queue<Data> queue; | |
mutable boost::mutex queue_mutex; | |
const size_t queue_limit; | |
bool is_closed = false; | |
boost::condition_variable new_item_or_closed_event; | |
boost::condition_variable item_removed_event; | |
#ifndef NDEBUG | |
size_t pushes_in_progress = 0; | |
#endif | |
public: | |
BlockingQueue(size_t size_limit=0) : queue_limit(size_limit) | |
{} | |
void push(const Data& data) | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
#ifndef NDEBUG | |
++pushes_in_progress; | |
#endif | |
if (queue_limit > 0) { | |
while (queue.size() >= queue_limit) { | |
item_removed_event.wait(lock); | |
} | |
} | |
assert (!is_closed); | |
queue.push(data); | |
#ifndef NDEBUG | |
--pushes_in_progress; | |
#endif | |
new_item_or_closed_event.notify_one(); | |
} | |
bool try_push(const Data& data) | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
if (queue_limit > 0) { | |
if (queue.size() >= queue_limit) { | |
return false; | |
} | |
} | |
assert (!is_closed); | |
queue.push(data); | |
new_item_or_closed_event.notify_one(); | |
return true; | |
} | |
void close() | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
assert (!is_closed); | |
#ifndef NDEBUG | |
assert (pushes_in_progress == 0); | |
#endif | |
is_closed = true; | |
new_item_or_closed_event.notify_all(); | |
} | |
bool pop(Data &popped_value) | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
while (queue.empty()) { | |
if (is_closed) { | |
return false; | |
} | |
new_item_or_closed_event.wait(lock); | |
} | |
popped_value = queue.front(); | |
queue.pop(); | |
item_removed_event.notify_one(); | |
return true; | |
} | |
bool try_pop(Data &popped_value) | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
if (queue.empty()) { | |
return false; | |
} | |
popped_value = queue.front(); | |
queue.pop(); | |
item_removed_event.notify_one(); | |
return true; | |
} | |
bool empty() const | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
return queue.empty(); | |
} | |
bool closed() const | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
return is_closed; | |
} | |
size_t limit() const | |
{ | |
return queue_limit; | |
} | |
size_t size() const | |
{ | |
boost::mutex::scoped_lock lock(queue_mutex); | |
return queue.size(); | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <gtest/gtest.h> | |
#include "blocking_queue.h" | |
#include <boost/thread/thread.hpp> | |
// Single-threaded check of try_pop(): it must fail on an empty queue and
// return exactly the value that was just pushed.
TEST(blocking_queue, simple_try_pop)
{
    const int iterations = 1000;
    BlockingQueue<int> q;
    ASSERT_TRUE(q.empty());

    for (int value = 0; value != iterations; ++value) {
        int popped;
        // The queue is empty at the start of every round.
        ASSERT_FALSE(q.try_pop(popped));
        q.push(value);
        ASSERT_TRUE(q.try_pop(popped));
        ASSERT_EQ(value, popped);
    }
    ASSERT_TRUE(q.empty());

    // After close() a blocking pop() on the empty queue returns false.
    q.close();
    int leftover;
    ASSERT_FALSE(q.pop(leftover));
    ASSERT_TRUE(q.closed());
    ASSERT_TRUE(q.empty());
}
// Single-threaded check of the blocking pop(): every pushed value comes
// straight back, and pop() returns false once the queue is closed and empty.
TEST(blocking_queue, simple_pop)
{
    const int iterations = 1000;
    BlockingQueue<int> q;
    ASSERT_TRUE(q.empty());

    for (int value = 0; value != iterations; ++value) {
        int popped;
        q.push(value);
        ASSERT_TRUE(q.pop(popped));
        ASSERT_EQ(value, popped);
    }
    ASSERT_TRUE(q.empty());

    q.close();
    int leftover;
    ASSERT_FALSE(q.pop(leftover));
    ASSERT_TRUE(q.closed());
    ASSERT_TRUE(q.empty());
}
// An item pushed before close() must still be retrievable afterwards; only
// the pop() after the queue has been drained reports failure.
TEST(blocking_queue, push_and_close)
{
    BlockingQueue<int> q;
    ASSERT_TRUE(q.empty());
    ASSERT_FALSE(q.closed());

    q.push(239);
    q.close();
    ASSERT_FALSE(q.empty());
    ASSERT_TRUE(q.closed());

    int popped;
    ASSERT_TRUE(q.pop(popped));
    ASSERT_EQ(239, popped);
    ASSERT_FALSE(q.pop(popped));
}
class QueuePopWorker { | |
BlockingQueue<int>& queue; | |
public: | |
std::vector<int> results; | |
QueuePopWorker(BlockingQueue<int>& queue) : queue(queue) {} | |
void start() { | |
int result; | |
while(queue.pop(result)) { | |
results.push_back(result); | |
} | |
ASSERT_TRUE(queue.empty()); | |
ASSERT_TRUE(queue.closed()); | |
} | |
}; | |
class QueuePushWorker { | |
BlockingQueue<int>& queue; | |
std::vector<int> data; | |
bool should_close; | |
public: | |
QueuePushWorker(BlockingQueue<int>& queue, std::vector<int> data, bool should_close) : queue(queue), data(data), should_close(should_close) {} | |
void start() { | |
int result; | |
for (auto value : data) { | |
if (queue.limit() != 0) { | |
ASSERT_LE(queue.size(), queue.limit()); | |
} | |
queue.push(value); | |
if (queue.limit() != 0) { | |
ASSERT_LE(queue.size(), queue.limit()); | |
} | |
} | |
if (should_close) { | |
queue.close(); | |
} | |
} | |
}; | |
// One producer and one consumer share a bounded queue. The consumer must
// receive every value exactly once and in the order it was pushed.
TEST(blocking_queue, single_producer_single_consumer)
{
    const int runs = 1000;
    const int n = 1000;
    const int limit = 100;

    // The input is identical for every run, so build it once.
    std::vector<int> data;
    for (int i = 0; i < n; ++i) {
        data.push_back(i);
    }

    for (int run = 0; run < runs; ++run) {
        BlockingQueue<int> queue(limit);
        ASSERT_TRUE(queue.empty());

        // The producer closes the queue after its last push, which lets the
        // consumer's pop() loop terminate.
        QueuePushWorker producer(queue, data, true);
        QueuePopWorker consumer(queue);
        boost::thread consumer_thread(&QueuePopWorker::start, &consumer);
        boost::thread producer_thread(&QueuePushWorker::start, &producer);
        producer_thread.join();
        consumer_thread.join();

        ASSERT_TRUE(queue.empty());
        ASSERT_TRUE(queue.closed());
        // Cast so both sides have the same signedness.
        ASSERT_EQ(consumer.results.size(), static_cast<size_t>(n));
        for (int i = 0; i < n; ++i) {
            ASSERT_EQ(consumer.results[static_cast<size_t>(i)], i);
        }
    }
}
// `units` producers and `units` consumers share one bounded queue. Producer i
// pushes the values [i * n, (i + 1) * n). After all producers have finished,
// the queue is closed, and the union of all consumer results must contain
// every value in [0, n * units) exactly once.
TEST(blocking_queue, multiple_producer_multilple_consumer)
{
    const int runs = 10;
    const int n = 10000;
    const int limit = 100;
    const int units = 10;
    for (int run = 0; run < runs; ++run) {
        BlockingQueue<int> queue(limit);
        ASSERT_TRUE(queue.empty());

        std::vector<QueuePushWorker> producers;
        std::vector<QueuePopWorker> consumers;
        for (int i = 0; i < units; ++i) {
            std::vector<int> data;
            for (int j = i * n; j < (i + 1) * n; ++j) {
                data.push_back(j);
            }
            // Producers do not close the queue themselves: it is closed below,
            // once all of them have been joined.
            producers.push_back(QueuePushWorker(queue, data, false));
            consumers.push_back(QueuePopWorker(queue));
        }

        // Both worker vectors are fully populated before any element address
        // is handed to a thread, so the pointers stay valid.
        std::vector<boost::thread> producers_threads(units);
        std::vector<boost::thread> consumers_threads(units);
        for (int i = 0; i < units; ++i) {
            producers_threads[i] = boost::thread(&QueuePushWorker::start, &producers[i]);
            consumers_threads[i] = boost::thread(&QueuePopWorker::start, &consumers[i]);
        }
        for (int i = 0; i < units; ++i) {
            producers_threads[i].join();
        }
        queue.close();
        for (int i = 0; i < units; ++i) {
            consumers_threads[i].join();
        }

        ASSERT_TRUE(queue.empty());
        ASSERT_TRUE(queue.closed());

        std::vector<bool> is_found(n * units, false);
        for (int i = 0; i < units; ++i) {
            for (int value : consumers[i].results) {
                // Validate the range first: indexing is_found with an
                // out-of-range value would be undefined behaviour.
                ASSERT_GE(value, 0);
                ASSERT_LT(value, n * units);
                ASSERT_FALSE(is_found[value]);
                is_found[value] = true;
            }
        }
        for (int i = 0; i < n * units; ++i) {
            ASSERT_TRUE(is_found[i]);
        }
    }
}
This is just unclean code - the order of each explicit unlock() call and its following notify_one()/notify_all() does not matter, because the notified thread has to re-acquire the same queue_mutex lock before it can continue executing.
Thanks, fixed - all explicit unlock() calls removed.
On the other hand, you may prefer to add an explicit unlock() before each notify() call for slightly better performance (so that the notified thread does not immediately block trying to acquire queue_mutex while it is still held by the notifying thread; see 'Be careful when you notify' in https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It confused me that you unlock before notify in push but not in pop. Why?