Skip to content

Instantly share code, notes, and snippets.

@icedraco
Created May 1, 2020 20:04
Show Gist options
  • Save icedraco/4db78076b9e8e63c55e91eaeb32404ea to your computer and use it in GitHub Desktop.
Save icedraco/4db78076b9e8e63c55e91eaeb32404ea to your computer and use it in GitHub Desktop.
C++ blocking queue implementation
#pragma once
#include <mutex>
#include <condition_variable>
#include "circular_queue.hpp"
template <typename T, size_t N>
class bqueue
{
private:
circular_queue<T, N> _cq;
std::mutex _mu_cq;
std::condition_variable _cv_nonempty;
std::condition_variable _cv_nonfull;
public:
bqueue() {}
~bqueue() {}
bool is_empty() {
std::unique_lock<std::mutex> ul(_mu_cq);
return _cq.is_empty();
}
bool is_full() {
std::unique_lock<std::mutex> ul(_mu_cq);
return _cq.is_full();
}
void put(T item) {
std::unique_lock<std::mutex> ul(_mu_cq);
_cv_nonfull.wait(ul, [&]{ return !_cq.is_full(); });
_cq.put(item);
_cv_nonempty.notify_one();
}
T take(void) {
std::unique_lock<std::mutex> ul(_mu_cq);
_cv_nonempty.wait(ul, [&]{ return !_cq.is_empty(); });
T item = _cq.take();
_cv_nonfull.notify_one();
return item;
}
};
/**
* Circular Queue
*
* Warning:
* This implementation is NOT THREAD-SAFE!
*
* Example:
* circular_queue<int, bufsize> q;
* q.is_empty(); // true
* q.put(1);
* q.put(2);
* q.put(3);
* q.is_full(); // true
* q.put(4); // queue_full exception thrown
* q.take(); // 3
* q.take(); // 2
* q.take(); // 1
* q.take(); // queue_empty exception thrown
*
* circular_queue<int, bufsize> q2 = q; // values are copied
* q2.put(10);
* q2.is_empty(); // false
* q.is_empty(); // true - q does not have a "10" in it either
*
* Author:
* IceDragon <[email protected]>
*/
#pragma once
#include <array>
#include <exception>
/**
* Thrown when the underlying circular_queue is at capacity and cannot store
* any more items.
*/
class queue_full : public std::exception {
const char* what() const noexcept {
return "queue is full";
}
};
/**
* Thrown when the underlying circular_queue is empty and has no values to take
* out.
*/
class queue_empty : public std::exception {
const char* what() const noexcept {
return "queue is empty";
}
};
/**
* A queue based on a circular array data structure
*
* WARNING: This class is not thread-safe!
*/
template <typename T, std::size_t N>
class circular_queue
{
private:
std::array<T, N> _arr;
int _idx_insert = 0;
int _idx_head = 0;
int _idx_tail = 0;
public:
circular_queue(void) {}
~circular_queue() noexcept {}
/**
* Check if this queue is full
*/
inline bool is_full(void) const noexcept {
return (_idx_tail + 1) % N == _idx_head;
}
/**
* Check if this queue is empty
*/
inline bool is_empty(void) const noexcept {
return _idx_tail == _idx_insert && _idx_head == _idx_insert;
}
/**
* Clear this queue so that it's empty
*/
inline void clear(void) noexcept {
_idx_insert = 0;
_idx_head = 0;
_idx_tail = 0;
}
/**
* Put an item onto the back of the queue
*
* If the queue is full, a queue_full exception will be thrown
*/
inline void put(const T& item) {
if (is_full()) {
throw queue_full();
}
_arr[_idx_insert] = item;
_idx_tail = _idx_insert;
_idx_insert = (_idx_insert + 1) % N;
}
/**
* Take the next item from the front of the queue
*
* If the queue is empty, a queue_empty exception will be thrown
*
* Return:
* next item
*/
inline T take(void) {
if (is_empty()) {
throw queue_empty();
}
T item = _arr[_idx_head];
if (_idx_head == _idx_tail) {
_idx_tail = (_idx_tail + 1) % N;
_idx_head = _idx_tail;
} else {
_idx_head = (_idx_head + 1) % N;
}
return item;
}
};
#include <cstdio>
#include <thread>
#include <functional>
#include "bqueue.hpp"
const int bufsz = 4;
class TestItem {
public:
int value;
TestItem(int n) : value(n) {
printf(" * TestItem(%d) constructed\n", value);
}
TestItem(const TestItem &other) : value(other.value) {
printf(" * TestItem(%d) copied\n", value);
}
~TestItem() {
printf(" * TestItem(%d) destroyed\n", value);
}
};
void run(std::string name, bqueue<std::shared_ptr<TestItem>, bufsz> &q) {
printf("[%s] Thread Started\n", name.c_str());
do {
if (q.is_empty()) {
printf("[%s] queue empty\n", name.c_str());
}
auto x = q.take();
printf("[%s] in: %d\n", name.c_str(), x->value);
if (x->value < 0) {
break;
}
} while (true);
}
int main(void) {
bqueue<std::shared_ptr<TestItem>, bufsz> q;
std::thread t1(run, "T1", std::ref(q));
std::thread t2(run, "T2", std::ref(q));
for (int i = 0; i < 100; i++) {
if (q.is_full()) {
printf("[XX] queue full\n");
}
q.put(std::shared_ptr<TestItem>(new TestItem(i)));
}
auto shutdown = std::shared_ptr<TestItem>(new TestItem(-1));
q.put(shutdown);
q.put(shutdown);
t1.join();
t2.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment