Last active
May 25, 2024 05:43
-
-
Save jrade/730db3050c463250bc2f387319ef801d to your computer and use it in GitHub Desktop.
Fast SPSC Queue implemented with C++20 atomics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2024 Johan Rade ([email protected])
// Distributed under the MIT license (https://opensource.org/licenses/MIT)
// Single-producer, single-consumer (SPSC) queue.
// Fast and simple implementation using C++20 std::atomic<T>::wait() and std::atomic<T>::notify_one().
#pragma once
#include <atomic>
#include <cstddef>
#include <new>
#include <utility>
#include <vector>
template<typename T> | |
class alignas(std::hardware_destructive_interference_size) SpscQueue { | |
public: | |
SpscQueue(size_t capacity) | |
: capacity_{ capacity } | |
, buffer_(capacity) | |
{ | |
} | |
size_t capacity() const { return capacity_; } | |
bool tryPop(T* t) | |
{ | |
const size_t popCount = popCount_.load(std::memory_order_relaxed); | |
if (pushCount_.load(std::memory_order_acquire) == popCount) | |
return false; | |
*t = std::move(buffer_[popCount % capacity_]); | |
popCount_.store(popCount + 1, std::memory_order_release); | |
popCount_.notify_one(); | |
return true; | |
} | |
void pop(T* t) | |
{ | |
const size_t popCount = popCount_.load(std::memory_order_relaxed); | |
pushCount_.wait(popCount, std::memory_order_acquire); | |
*t = std::move(buffer_[popCount % capacity_]); | |
popCount_.store(popCount + 1, std::memory_order_release); | |
popCount_.notify_one(); | |
} | |
bool tryPush(const T& t) { return tryPushImpl_(t); } | |
bool tryPush(T&& t) { return tryPushImpl_(std::move(t)); } | |
template<typename U> | |
bool tryPush(U&& u) | |
{ | |
return tryPushImpl_(std::forward<U>(u)); | |
} | |
void push(const T& t) { pushImpl_(t); } | |
void push(T&& t) { pushImpl_(std::move(t)); } | |
template<typename U> | |
void push(U&& u) | |
{ | |
pushImpl_(std::forward<U>(u)); | |
} | |
private: | |
template<typename U> | |
bool tryPushImpl_(U&& u) | |
{ | |
const size_t pushCount = pushCount_.load(std::memory_order_relaxed); | |
if (popCount_.load(std::memory_order_acquire) == pushCount - capacity_) | |
return false; | |
buffer_[pushCount % capacity_] = std::forward<U>(u); | |
pushCount_.store(pushCount + 1, std::memory_order_release); | |
pushCount_.notify_one(); | |
return true; | |
} | |
template<typename U> | |
void pushImpl_(U&& u) | |
{ | |
const size_t pushCount = pushCount_.load(std::memory_order_relaxed); | |
popCount_.wait(pushCount - capacity_, std::memory_order_acquire); | |
buffer_[pushCount % capacity_] = std::forward<U>(u); | |
pushCount_.store(pushCount + 1, std::memory_order_release); | |
pushCount_.notify_one(); | |
} | |
private: | |
const size_t capacity_; | |
std::vector<T> buffer_; | |
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> pushCount_ = 0; | |
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> popCount_ = 0; | |
}; | |
//------------------------------------------------------------------------------------------------------------ | |
/* | |
// A simple test program | |
#include <iostream> | |
#include <thread> | |
const int n = 10000000; | |
void f(SpscQueue<int>& q, int& s1) | |
{ | |
int k; | |
for (int i = 0; i != n; ++i) { | |
q.pop(&k); | |
s1 += k; | |
} | |
} | |
int main() | |
{ | |
SpscQueue<int> q{ 100 }; | |
int s0 = 0; | |
int s1 = 0; | |
int k = 1; | |
clock_t c = clock(); | |
std::thread t{ [&]() {f(q, s1); } }; | |
for (int i = 0; i != n; ++i) { | |
k = 3 * k + 1; | |
q.push(k); | |
s0 += k; | |
} | |
t.join(); | |
c = clock() - c; | |
if (s0 == s1) | |
std::cout << "Pass\n"; | |
else | |
std::cout << "Fail\n"; | |
std::cout << (static_cast<float>(n) * CLOCKS_PER_SEC / c) << " elements / s\n"; | |
} | |
*/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment