Created
May 1, 2020 20:04
-
-
Save icedraco/4db78076b9e8e63c55e91eaeb32404ea to your computer and use it in GitHub Desktop.
C++ blocking queue implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <mutex> | |
#include <condition_variable> | |
#include "circular_queue.hpp" | |
template <typename T, size_t N> | |
class bqueue | |
{ | |
private: | |
circular_queue<T, N> _cq; | |
std::mutex _mu_cq; | |
std::condition_variable _cv_nonempty; | |
std::condition_variable _cv_nonfull; | |
public: | |
bqueue() {} | |
~bqueue() {} | |
bool is_empty() { | |
std::unique_lock<std::mutex> ul(_mu_cq); | |
return _cq.is_empty(); | |
} | |
bool is_full() { | |
std::unique_lock<std::mutex> ul(_mu_cq); | |
return _cq.is_full(); | |
} | |
void put(T item) { | |
std::unique_lock<std::mutex> ul(_mu_cq); | |
_cv_nonfull.wait(ul, [&]{ return !_cq.is_full(); }); | |
_cq.put(item); | |
_cv_nonempty.notify_one(); | |
} | |
T take(void) { | |
std::unique_lock<std::mutex> ul(_mu_cq); | |
_cv_nonempty.wait(ul, [&]{ return !_cq.is_empty(); }); | |
T item = _cq.take(); | |
_cv_nonfull.notify_one(); | |
return item; | |
} | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Circular Queue | |
* | |
* Warning: | |
* This implementation is NOT THREAD-SAFE! | |
* | |
 * Example (bufsize is 3 here; items come out in FIFO order):
 *  circular_queue<int, bufsize> q;
 *  q.is_empty(); // true
 *  q.put(1);
 *  q.put(2);
 *  q.put(3);
 *  q.is_full(); // true
 *  q.put(4); // queue_full exception thrown
 *  q.take(); // 1 - the oldest item is returned first
 *  q.take(); // 2
 *  q.take(); // 3
 *  q.take(); // queue_empty exception thrown
* | |
* circular_queue<int, bufsize> q2 = q; // values are copied | |
* q2.put(10); | |
* q2.is_empty(); // false | |
* q.is_empty(); // true - q does not have a "10" in it either | |
* | |
* Author: | |
* IceDragon <[email protected]> | |
*/ | |
#pragma once | |
#include <array>
#include <cstddef>
#include <exception>
#include <utility>
/** | |
* Thrown when the underlying circular_queue is at capacity and cannot store | |
* any more items. | |
*/ | |
class queue_full : public std::exception {
public:
    /**
     * Describe the failure
     *
     * Declared public (class members are private by default) so the message
     * can be read directly from a queue_full object as well as through a
     * std::exception reference.
     *
     * Return:
     *  the string "queue is full"
     */
    const char* what() const noexcept override {
        return "queue is full";
    }
};
/** | |
* Thrown when the underlying circular_queue is empty and has no values to take | |
* out. | |
*/ | |
class queue_empty : public std::exception {
public:
    /**
     * Describe the failure
     *
     * Declared public (class members are private by default) so the message
     * can be read directly from a queue_empty object as well as through a
     * std::exception reference.
     *
     * Return:
     *  the string "queue is empty"
     */
    const char* what() const noexcept override {
        return "queue is empty";
    }
};
/** | |
* A queue based on a circular array data structure | |
* | |
* WARNING: This class is not thread-safe! | |
*/ | |
template <typename T, std::size_t N> | |
class circular_queue | |
{ | |
private: | |
std::array<T, N> _arr; | |
int _idx_insert = 0; | |
int _idx_head = 0; | |
int _idx_tail = 0; | |
public: | |
circular_queue(void) {} | |
~circular_queue() noexcept {} | |
/** | |
* Check if this queue is full | |
*/ | |
inline bool is_full(void) const noexcept { | |
return (_idx_tail + 1) % N == _idx_head; | |
} | |
/** | |
* Check if this queue is empty | |
*/ | |
inline bool is_empty(void) const noexcept { | |
return _idx_tail == _idx_insert && _idx_head == _idx_insert; | |
} | |
/** | |
* Clear this queue so that it's empty | |
*/ | |
inline void clear(void) noexcept { | |
_idx_insert = 0; | |
_idx_head = 0; | |
_idx_tail = 0; | |
} | |
/** | |
* Put an item onto the back of the queue | |
* | |
* If the queue is full, a queue_full exception will be thrown | |
*/ | |
inline void put(const T& item) { | |
if (is_full()) { | |
throw queue_full(); | |
} | |
_arr[_idx_insert] = item; | |
_idx_tail = _idx_insert; | |
_idx_insert = (_idx_insert + 1) % N; | |
} | |
/** | |
* Take the next item from the front of the queue | |
* | |
* If the queue is empty, a queue_empty exception will be thrown | |
* | |
* Return: | |
* next item | |
*/ | |
inline T take(void) { | |
if (is_empty()) { | |
throw queue_empty(); | |
} | |
T item = _arr[_idx_head]; | |
if (_idx_head == _idx_tail) { | |
_idx_tail = (_idx_tail + 1) % N; | |
_idx_head = _idx_tail; | |
} else { | |
_idx_head = (_idx_head + 1) % N; | |
} | |
return item; | |
} | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cstdio> | |
#include <thread> | |
#include <functional> | |
#include "bqueue.hpp" | |
// Size parameter used for every bqueue instantiated in this test program.
constexpr int bufsz = 4;
/**
 * Payload type for the queue test.
 *
 * Carries a single int and prints a line to stdout whenever an instance is
 * constructed, copied or destroyed, which makes object lifetimes visible in
 * the program output.
 */
class TestItem {
public:
    int value;

    // Store n and report the construction.
    TestItem(int n) : value{n} {
        std::printf(" * TestItem(%d) constructed\n", value);
    }

    // Duplicate other's value and report the copy.
    TestItem(const TestItem &other) : value{other.value} {
        std::printf(" * TestItem(%d) copied\n", value);
    }

    // Report the destruction.
    ~TestItem() {
        std::printf(" * TestItem(%d) destroyed\n", value);
    }
};
void run(std::string name, bqueue<std::shared_ptr<TestItem>, bufsz> &q) { | |
printf("[%s] Thread Started\n", name.c_str()); | |
do { | |
if (q.is_empty()) { | |
printf("[%s] queue empty\n", name.c_str()); | |
} | |
auto x = q.take(); | |
printf("[%s] in: %d\n", name.c_str(), x->value); | |
if (x->value < 0) { | |
break; | |
} | |
} while (true); | |
} | |
int main(void) { | |
bqueue<std::shared_ptr<TestItem>, bufsz> q; | |
std::thread t1(run, "T1", std::ref(q)); | |
std::thread t2(run, "T2", std::ref(q)); | |
for (int i = 0; i < 100; i++) { | |
if (q.is_full()) { | |
printf("[XX] queue full\n"); | |
} | |
q.put(std::shared_ptr<TestItem>(new TestItem(i))); | |
} | |
auto shutdown = std::shared_ptr<TestItem>(new TestItem(-1)); | |
q.put(shutdown); | |
q.put(shutdown); | |
t1.join(); | |
t2.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment