Last active
March 18, 2022 03:18
-
-
Save simonlynen/efba97276c8b09502a20 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef MULTIAGENT_MAPPING_COMMON_THREADSAFE_QUEUE_H_ | |
#define MULTIAGENT_MAPPING_COMMON_THREADSAFE_QUEUE_H_ | |
#include <atomic> | |
#include <memory> | |
#include <pthread.h> | |
#include <queue> | |
#include <string> | |
#include <sys/time.h> | |
#include <glog/logging.h> | |
// Declares the smart-pointer aliases Ptr, ConstPtr and UniquePtr for TypeName.
// Intended to be used inside a class body and terminated with a semicolon:
//
//   class Foo {
//    public:
//     MULTIAGENT_MAPPING_POINTER_TYPEDEFS(Foo);
//   };
//
// The trailing static_assert exists only to consume that semicolon. It
// replaces a dummy member-function declaration whose pasted name
// (`definePointerTypedefs##__FILE__##__LINE__`) was never expanded by the
// preprocessor and contained a reserved double underscore.
#define MULTIAGENT_MAPPING_POINTER_TYPEDEFS(TypeName) \
  using Ptr = std::shared_ptr<TypeName>; \
  using ConstPtr = std::shared_ptr<const TypeName>; \
  using UniquePtr = std::unique_ptr<TypeName>; \
  static_assert(true, "consumes the semicolon after the macro invocation")
namespace common { | |
class ThreadSafeQueueBase { | |
public: | |
MULTIAGENT_MAPPING_POINTER_TYPEDEFS(ThreadSafeQueueBase); | |
ThreadSafeQueueBase() = default; | |
virtual ~ThreadSafeQueueBase() {} | |
virtual void NotifyAll() const = 0; | |
virtual void Shutdown() = 0; | |
virtual void Resume() = 0; | |
virtual size_t Size() const = 0; | |
virtual bool Empty() const = 0; | |
}; | |
template <typename QueueType> | |
class ThreadSafeQueue : public ThreadSafeQueueBase { | |
friend bool test_funcs(void* (*)(void*), void* (*)(void*), // NOLINT | |
const std::string&, bool); | |
public: | |
MULTIAGENT_MAPPING_POINTER_TYPEDEFS(ThreadSafeQueue); | |
virtual void NotifyAll() const final { | |
pthread_cond_broadcast(&condition_empty_); | |
pthread_cond_broadcast(&condition_full_); | |
} | |
ThreadSafeQueue() { | |
shutdown_ = false; | |
pthread_mutex_init(&mutex_, NULL); | |
pthread_cond_init(&condition_empty_, NULL); | |
pthread_cond_init(&condition_full_, NULL); | |
} | |
virtual ~ThreadSafeQueue() { | |
shutdown_ = true; | |
NotifyAll(); | |
pthread_mutex_destroy(&mutex_); | |
pthread_cond_destroy(&condition_empty_); | |
pthread_cond_destroy(&condition_full_); | |
} | |
virtual void Shutdown() final { | |
shutdown_ = true; | |
NotifyAll(); | |
} | |
virtual void Resume() final { | |
shutdown_ = false; | |
NotifyAll(); | |
} | |
// Push to the queue. | |
void Push(const QueueType& value) { PushNonBlocking(value); } | |
// Push to the queue. | |
void PushNonBlocking(const QueueType& value) { | |
pthread_mutex_lock(&mutex_); | |
queue_.push(value); | |
pthread_cond_signal(&condition_empty_); // Signal that data is available. | |
pthread_mutex_unlock(&mutex_); | |
} | |
virtual size_t Size() const final { | |
pthread_mutex_lock(&mutex_); | |
size_t size = queue_.size(); | |
pthread_mutex_unlock(&mutex_); | |
return size; | |
} | |
virtual bool Empty() const final { | |
pthread_mutex_lock(&mutex_); | |
bool empty = queue_.empty(); | |
pthread_mutex_unlock(&mutex_); | |
return empty; | |
} | |
// Push to the queue if the size is less than max_queue_size, else block. | |
bool PushBlockingIfFull(const QueueType& value, size_t max_queue_size) { | |
while (!shutdown_) { | |
pthread_mutex_lock(&mutex_); | |
size_t size = queue_.size(); | |
if (size >= max_queue_size) { | |
pthread_cond_wait(&condition_full_, &mutex_); | |
} | |
if (size >= max_queue_size) { | |
pthread_mutex_unlock(&mutex_); | |
continue; | |
} | |
queue_.push(value); | |
pthread_cond_signal(&condition_empty_); // Signal that data is available. | |
pthread_mutex_unlock(&mutex_); | |
return true; | |
} | |
return false; | |
} | |
// Returns true if oldest was dropped because queue was full. | |
bool PushNonBlockingDroppingIfFull(const QueueType& value, | |
size_t max_queue_size) { | |
pthread_mutex_lock(&mutex_); | |
bool result = false; | |
if (queue_.size() >= max_queue_size) { | |
queue_.pop(); | |
result = true; | |
} | |
queue_.push(value); | |
pthread_cond_signal(&condition_empty_); // Signal that data is available. | |
pthread_mutex_unlock(&mutex_); | |
return result; | |
} | |
// Pops from the queue blocking if queue is empty. | |
bool Pop(QueueType* value) { return PopBlocking(value); } | |
// Pops from the queue blocking if queue is empty. | |
bool PopBlocking(QueueType* value) { | |
CHECK_NOTNULL(value); | |
while (!shutdown_) { | |
pthread_mutex_lock(&mutex_); | |
if (queue_.empty()) { | |
pthread_cond_wait(&condition_empty_, &mutex_); | |
} | |
if (queue_.empty()) { | |
pthread_mutex_unlock(&mutex_); | |
continue; | |
} | |
QueueType _value = queue_.front(); | |
queue_.pop(); | |
pthread_cond_signal(&condition_full_); // Notify that space is available. | |
pthread_mutex_unlock(&mutex_); | |
*value = _value; | |
return true; | |
} | |
return false; | |
} | |
// Check queue is empty, if yes return false, not altering value. If queue not | |
// empty update value and return true. | |
bool PopNonBlocking(QueueType* value) { | |
CHECK_NOTNULL(value); | |
pthread_mutex_lock(&mutex_); | |
if (queue_.empty()) { | |
pthread_mutex_unlock(&mutex_); | |
return false; | |
} | |
*value = queue_.front(); | |
queue_.pop(); | |
pthread_mutex_unlock(&mutex_); | |
return true; | |
} | |
// Check queue is empty, if yes return false, not altering value. If queue not | |
// empty update value and return true. | |
bool PopTimeout(QueueType* value, int64_t timeout_nanoseconds) { | |
CHECK_NOTNULL(value); | |
pthread_mutex_lock(&mutex_); | |
if (queue_.empty()) { | |
struct timeval tv; | |
struct timespec ts; | |
gettimeofday(&tv, NULL); | |
ts.tv_sec = tv.tv_sec; | |
ts.tv_nsec = tv.tv_usec * 1e3 + timeout_nanoseconds; | |
pthread_cond_timedwait(&condition_empty_, &mutex_, &ts); | |
} | |
if (queue_.empty()) { | |
pthread_mutex_unlock(&mutex_); | |
return false; | |
} | |
QueueType _value = queue_.front(); | |
queue_.pop(); | |
pthread_cond_signal(&condition_full_); // Notify that space is available. | |
pthread_mutex_unlock(&mutex_); | |
*value = _value; | |
return true; | |
} | |
// Get a copy of the front element of the queue. Returns false if empty. | |
bool getCopyOfFront(QueueType* value) { | |
CHECK_NOTNULL(value); | |
pthread_mutex_lock(&mutex_); | |
if (queue_.empty()) { | |
pthread_mutex_unlock(&mutex_); | |
return false; | |
} | |
// COPY the value. | |
*value = queue_.front(); | |
pthread_mutex_unlock(&mutex_); | |
return true; | |
} | |
// Get a copy of the front element of the queue. Returns false if the queue is shut down. | |
bool getCopyOfFrontBlocking(QueueType* value) { | |
CHECK_NOTNULL(value); | |
while (!shutdown_) { | |
pthread_mutex_lock(&mutex_); | |
if (queue_.empty()) { | |
pthread_cond_wait(&condition_empty_, &mutex_); | |
} | |
if (queue_.empty()) { | |
pthread_mutex_unlock(&mutex_); | |
continue; | |
} | |
*value = queue_.front(); | |
pthread_mutex_unlock(&mutex_); | |
return true; | |
} | |
return false; | |
} | |
// Get a copy of the back element of the queue. Returns false if empty. | |
bool getCopyOfBack(QueueType* value) { | |
CHECK_NOTNULL(value); | |
pthread_mutex_lock(&mutex_); | |
if (queue_.empty()) { | |
pthread_mutex_unlock(&mutex_); | |
return false; | |
} | |
// COPY the value. | |
*value = queue_.back(); | |
pthread_mutex_unlock(&mutex_); | |
return true; | |
} | |
mutable pthread_mutex_t mutex_; | |
mutable pthread_cond_t condition_empty_; | |
mutable pthread_cond_t condition_full_; | |
std::queue<QueueType> queue_; | |
std::atomic_bool shutdown_; | |
}; | |
} // namespace common | |
#endif // MULTIAGENT_MAPPING_COMMON_THREADSAFE_QUEUE_H_ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment