Skip to content

Instantly share code, notes, and snippets.

@jrade
Last active May 25, 2024 05:43
Show Gist options
  • Save jrade/730db3050c463250bc2f387319ef801d to your computer and use it in GitHub Desktop.
Save jrade/730db3050c463250bc2f387319ef801d to your computer and use it in GitHub Desktop.
Fast SPSC Queue implemented with C++20 atomics
// Copyright 2024 Johan Rade ([email protected])
// Distributed under the MIT license (https://opensource.org/licenses/MIT)
// Single producer single consumer queue
// Fast and simple implementation using C++20 std::atomic<T>::wait() and std::atomic<T>::notify_one()
#pragma once
#include <atomic>
#include <new>
#include <thread>
#include <utility>
#include <vector>
// Alignment used to keep the producer-written and consumer-written counters on
// separate cache lines (avoids false sharing). std::hardware_destructive_interference_size
// is not provided by every standard library (notably older libc++), so fall back to
// 64 bytes, a common cache-line size.
namespace SpscQueueDetail {
#if defined(__cpp_lib_hardware_interference_size)
inline constexpr size_t cacheLineSize = std::hardware_destructive_interference_size;
#else
inline constexpr size_t cacheLineSize = 64;
#endif
} // namespace SpscQueueDetail

/// Bounded single-producer / single-consumer FIFO queue.
///
/// At any time at most one thread may act as producer (push/tryPush) and at most
/// one thread may act as consumer (pop/tryPop). Multiple concurrent producers or
/// consumers are not supported.
///
/// Storage is a std::vector<T> of `capacity` default-constructed elements: pushing
/// assigns into a slot and popping move-assigns out of it, so T must be
/// default-constructible and assignable from the pushed value. A popped slot keeps
/// its moved-from value until it is overwritten by a later push.
///
/// With capacity 0 the queue is permanently both full and empty: tryPush/tryPop
/// always return false and push/pop block forever.
template<typename T>
class alignas(SpscQueueDetail::cacheLineSize) SpscQueue {
public:
    SpscQueue(size_t capacity)
        : capacity_{ capacity }
        , buffer_(capacity)
    {
    }

    /// Maximum number of elements the queue can hold.
    size_t capacity() const { return capacity_; }

    /// Consumer side. If an element is available, moves the oldest one into *t and
    /// returns true; otherwise returns false and leaves *t untouched. Never blocks.
    bool tryPop(T* t)
    {
        const size_t popCount = popCount_.load(std::memory_order_relaxed);
        // acquire pairs with the producer's release store so the element is visible.
        if (pushCount_.load(std::memory_order_acquire) == popCount)
            return false;
        popOne_(t, popCount);
        return true;
    }

    /// Consumer side. Blocks until an element is available, then moves the oldest
    /// one into *t.
    void pop(T* t)
    {
        const size_t popCount = popCount_.load(std::memory_order_relaxed);
        waitWhileEqual_(pushCount_, popCount);
        popOne_(t, popCount);
    }

    /// Producer side. Appends the argument if there is room and returns true;
    /// otherwise returns false without consuming the argument. Never blocks.
    bool tryPush(const T& t) { return tryPushImpl_(t); }
    bool tryPush(T&& t) { return tryPushImpl_(std::move(t)); }
    template<typename U>
    bool tryPush(U&& u)
    {
        return tryPushImpl_(std::forward<U>(u));
    }

    /// Producer side. Blocks until there is room, then appends the argument.
    void push(const T& t) { pushImpl_(t); }
    void push(T&& t) { pushImpl_(std::move(t)); }
    template<typename U>
    void push(U&& u)
    {
        pushImpl_(std::forward<U>(u));
    }

private:
    template<typename U>
    bool tryPushImpl_(U&& u)
    {
        const size_t pushCount = pushCount_.load(std::memory_order_relaxed);
        // Full when pushCount - popCount == capacity_; unsigned wrap-around keeps
        // this equality exact even after the counters overflow.
        if (popCount_.load(std::memory_order_acquire) == pushCount - capacity_)
            return false;
        pushOne_(std::forward<U>(u), pushCount);
        return true;
    }

    template<typename U>
    void pushImpl_(U&& u)
    {
        const size_t pushCount = pushCount_.load(std::memory_order_relaxed);
        waitWhileEqual_(popCount_, pushCount - capacity_);
        pushOne_(std::forward<U>(u), pushCount);
    }

    // Producer side: writes the next slot, then publishes it by bumping pushCount_.
    template<typename U>
    void pushOne_(U&& u, size_t pushCount)
    {
        // The slot index is tracked separately instead of computing
        // pushCount % capacity_: once a size_t counter wraps, that modulo skips or
        // repeats slots unless capacity_ is a power of two, which would let the
        // producer overwrite an element the consumer has not read yet.
        buffer_[pushSlot_] = std::forward<U>(u);
        pushSlot_ = nextSlot_(pushSlot_);
        pushCount_.store(pushCount + 1, std::memory_order_release);
        notify_(pushCount_);
    }

    // Consumer side: moves out of the next slot, then frees it by bumping popCount_.
    void popOne_(T* t, size_t popCount)
    {
        *t = std::move(buffer_[popSlot_]);
        popSlot_ = nextSlot_(popSlot_);
        popCount_.store(popCount + 1, std::memory_order_release);
        notify_(popCount_);
    }

    // Ring-buffer successor of a slot index.
    size_t nextSlot_(size_t slot) const
    {
        return slot + 1 == capacity_ ? 0 : slot + 1;
    }

    // Blocks until `counter` no longer holds `value`; the final load is an acquire.
    static void waitWhileEqual_(const std::atomic<size_t>& counter, size_t value)
    {
#if defined(__cpp_lib_atomic_wait)
        counter.wait(value, std::memory_order_acquire);
#else
        // No std::atomic::wait available: poll, yielding the CPU between attempts.
        while (counter.load(std::memory_order_acquire) == value)
            std::this_thread::yield();
#endif
    }

    // Wakes a thread blocked in waitWhileEqual_ on `counter` (no-op when polling).
    static void notify_(std::atomic<size_t>& counter)
    {
#if defined(__cpp_lib_atomic_wait)
        counter.notify_one();
#else
        (void)counter;
#endif
    }

private:
    const size_t capacity_;
    std::vector<T> buffer_;
    alignas(SpscQueueDetail::cacheLineSize) std::atomic<size_t> pushCount_ = 0;
    size_t pushSlot_ = 0; // touched by the producer only
    alignas(SpscQueueDetail::cacheLineSize) std::atomic<size_t> popCount_ = 0;
    size_t popSlot_ = 0; // touched by the consumer only
};
//------------------------------------------------------------------------------------------------------------
/*
// A simple test program
#include <iostream>
#include <thread>
const int n = 10000000;
void f(SpscQueue<int>& q, int& s1)
{
int k;
for (int i = 0; i != n; ++i) {
q.pop(&k);
s1 += k;
}
}
int main()
{
SpscQueue<int> q{ 100 };
int s0 = 0;
int s1 = 0;
int k = 1;
clock_t c = clock();
std::thread t{ [&]() {f(q, s1); } };
for (int i = 0; i != n; ++i) {
k = 3 * k + 1;
q.push(k);
s0 += k;
}
t.join();
c = clock() - c;
if (s0 == s1)
std::cout << "Pass\n";
else
std::cout << "Fail\n";
std::cout << (static_cast<float>(n) * CLOCKS_PER_SEC / c) << " elements / s\n";
}
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment