@Kuxe
Last active March 19, 2023 17:30
C++ implementation of a general semaphore and a thread-safe circular queue. I recommend refitting the queue class with the standard semaphore available in <semaphore> if you can use C++20.
#ifndef QUEUE_HPP
#define QUEUE_HPP
#include <cstddef>
#include <cstring>
#include <mutex>
#include "semaphore.hpp"
/** A thread-safe queue which supports one read-thread and one write-thread
manipulating the queue concurrently. In other words, the thread-safe queue can
be used in the consumer-producer problem with one consumer and one producer. **/
template<typename T>
class Queue {
const size_t size;
T* const arr;
int start = 0, end = 0;
Semaphore write, read;
mutable std::recursive_mutex writemtx, readmtx; //Recursive so a thread cannot block itself (offer() calls add()); mutable so the copy constructor can lock a const Queue
public:
/** Default constructor **/
explicit Queue(const size_t size = 1) : size(size), arr(new T[size]), write(size), read(0) { }
/** Copy constructor. Does not copy ownership or state of mutexes from q, so
the copy-constructed Queue should be regarded as a completely new queue
initialized with elements from queue q
NOTE: The copy constructor has NOT been tested so it may or may not introduce deadlocks if used **/
explicit Queue(const Queue& q) : size(q.size), arr(new T[size]), start(q.start), end(q.end), write(q.write), read(q.read) {
//Before copying from q, make sure this owns critical write section of q
std::lock_guard<std::recursive_mutex> lg(q.writemtx);
//Copy element-wise: a raw memory copy is only valid for trivially copyable T
for(size_t i = 0; i < size; i++) arr[i] = q.arr[i];
}
virtual ~Queue() {
delete[] arr;
}
/** Adds an element to the queue. If queue is full,
block the calling thread until queue is not full
and then add the object, unblocking the calling thread.
If more than one thread calls add or offer concurrently, behaviour is undefined. **/
void add(const T& obj) {
std::lock_guard<std::recursive_mutex> lg(writemtx);
write.acquire();
arr[end++] = obj;
end %= size;
read.release();
}
/** Add an element to the queue if the queue is not full, otherwise
do not add obj to the queue. The calling thread is never blocked.
If successfully added, returns true, otherwise returns false.
If more than one thread calls add or offer concurrently, behaviour is undefined **/
bool offer(const T& obj) {
std::lock_guard<std::recursive_mutex> lg(writemtx);
if(!full()) {
add(obj);
return true;
}
return false;
}
/** Read first element in queue. If there is no first element
in the queue, blocks the calling thread until there is an
element in the queue which is then returned **/
const T& front() {
std::lock_guard<std::recursive_mutex> lg(readmtx);
read.acquire();
read.release();
return arr[start%size];
}
/** Pops an element from the queue. If there is no element
to pop, block until there is an element to pop and then pop it,
unblocking calling thread.
If more than one thread calls pop concurrently, behaviour is undefined **/
void pop() {
std::lock_guard<std::recursive_mutex> lg(readmtx);
read.acquire();
start++;
start %= size;
write.release();
}
/** Copies first element in queue and erases it from queue.
If there is no element to copy, block until there is an
element to copy and then copy it, unblocking calling thread.
So take() is an atomic front() and pop() operation **/
T take() {
std::lock_guard<std::recursive_mutex> lg(readmtx);
read.acquire();
T retval = arr[start++];
start %= size;
write.release();
return retval;
}
/** Check if queue is empty or not **/
bool empty() const { return !read.available(); }
/** Check if queue is full or not **/
bool full() const { return !write.available(); }
};
#endif //QUEUE_HPP
#ifndef SEMAPHORE_HPP
#define SEMAPHORE_HPP
#include <cstddef>
#include <mutex>
#include <condition_variable>
/** General semaphore with N permissions **/
class Semaphore {
const size_t num_permissions;
size_t avail;
mutable std::mutex m; //mutable so that available() can lock it
std::condition_variable cv;
public:
/** Default constructor. Default semaphore is a binary semaphore **/
explicit Semaphore(const size_t& num_permissions = 1) : num_permissions(num_permissions), avail(num_permissions) { }
/** Copy constructor. Does not copy state of mutex or condition variable,
only the number of permissions and number of available permissions **/
Semaphore(const Semaphore& s) : num_permissions(s.num_permissions), avail(s.available()) { }
/** Blocks until a permission is available, then takes it **/
void acquire() {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [this] { return avail > 0; });
avail--;
}
/** Returns a permission and wakes one waiting thread **/
void release() {
{
std::lock_guard<std::mutex> lk(m);
avail++;
}
cv.notify_one();
}
/** Number of currently available permissions.
Locks m so the read does not race with acquire() and release() **/
size_t available() const {
std::lock_guard<std::mutex> lk(m);
return avail;
}
};
#endif //SEMAPHORE_HPP
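As a rough sketch of the C++20 refit recommended in the description, the queue could hold two `std::counting_semaphore` objects in place of the hand-written `Semaphore`. The names `SemQueue`, `CountingSem` and `pump` are made up for this example, the buffer is a `std::vector` rather than a raw array, and the small fallback class exists only so the snippet still builds on compilers without `<semaphore>`. Like the original, it assumes exactly one producer thread and one consumer thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>
#if defined(__has_include)
#  if __has_include(<semaphore>)
#    include <semaphore>  // C++20
#  endif
#endif

#ifdef __cpp_lib_semaphore
// C++20 path: use the standard counting semaphore directly
using CountingSem = std::counting_semaphore<>;
#else
// Assumption: pre-C++20 stand-in exposing the same acquire()/release() calls
class CountingSem {
    std::mutex m;
    std::condition_variable cv;
    std::ptrdiff_t count;
public:
    explicit CountingSem(std::ptrdiff_t initial) : count(initial) {}
    void acquire() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return count > 0; });
        --count;
    }
    void release() {
        { std::lock_guard<std::mutex> lk(m); ++count; }
        cv.notify_one();
    }
};
#endif

// Hypothetical bounded queue for ONE producer and ONE consumer.
// T must be default-constructible and copy-assignable.
template <typename T>
class SemQueue {
    std::vector<T> buf;
    std::size_t head = 0, tail = 0;  // head: consumer only, tail: producer only
    CountingSem free_slots, used_slots;
public:
    explicit SemQueue(std::size_t capacity)
        : buf(capacity),
          free_slots(static_cast<std::ptrdiff_t>(capacity)),
          used_slots(0) {
        assert(capacity > 0);
    }
    // Blocks while the queue is full
    void add(const T& obj) {
        free_slots.acquire();
        buf[tail] = obj;
        tail = (tail + 1) % buf.size();
        used_slots.release();
    }
    // Blocks while the queue is empty, then removes and returns the front
    T take() {
        used_slots.acquire();
        T out = buf[head];
        head = (head + 1) % buf.size();
        free_slots.release();
        return out;
    }
};

// Hypothetical helper: push 0..n-1 through a queue and return what the consumer saw
std::vector<int> pump(int n, std::size_t capacity) {
    SemQueue<int> q(capacity);
    std::vector<int> seen;
    std::thread producer([&] { for (int i = 0; i < n; ++i) q.add(i); });
    std::thread consumer([&] { for (int i = 0; i < n; ++i) seen.push_back(q.take()); });
    producer.join();
    consumer.join();
    return seen;
}
```

Calling `pump(1000, 4)` should hand back 0 through 999 in order, since a single producer and a single consumer preserve FIFO order. The recursive mutexes from the original are left out here because, with one thread per side, `head` and `tail` are each touched by only one thread.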

Kuxe commented Sep 6, 2022

> num_permissions variable is not used

It is used in the copy constructor of the semaphore, but yes, you are right: it is never used afterwards. The copy constructor should probably be deleted, since copying avail does not make sense when the state of the mutex is NOT carried over by the copy constructor.
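A minimal sketch of what deleting the copy constructor could look like, assuming the unused `num_permissions` member is dropped at the same time. `NonCopyableSemaphore` is a hypothetical name chosen to keep it apart from the `Semaphore` in the gist.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <type_traits>

// Hypothetical variant of the gist's Semaphore with copying disabled
class NonCopyableSemaphore {
    std::size_t avail;
    mutable std::mutex m;  // mutable so available() can stay const
    std::condition_variable cv;
public:
    explicit NonCopyableSemaphore(std::size_t permits = 1) : avail(permits) {}

    // A copy could not carry over the mutex or the waiting threads, so forbid it
    NonCopyableSemaphore(const NonCopyableSemaphore&) = delete;
    NonCopyableSemaphore& operator=(const NonCopyableSemaphore&) = delete;

    void acquire() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return avail > 0; });
        --avail;
    }
    void release() {
        { std::lock_guard<std::mutex> lk(m); ++avail; }
        cv.notify_one();
    }
    std::size_t available() const {
        std::lock_guard<std::mutex> lk(m);
        return avail;
    }
};

// Compile-time check that the type really cannot be copied
static_assert(!std::is_copy_constructible<NonCopyableSemaphore>::value,
              "semaphore must not be copyable");
```

Note that the copy constructor of `Queue` copies its two semaphores, so it would have to be deleted or rewritten as well if this change were made.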
