@Unbinilium
Last active June 4, 2025 09:18
SPSC RingBuffer
#pragma once
#ifndef RING_BUFFER_HPP
#define RING_BUFFER_HPP
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <new>
#include <type_traits>
namespace core {
#if defined(VERBOSE) || defined(DEBUG) || defined(INFO) || defined(WARNING) || defined(ERROR) || defined(NONE)
#warning "Log level macros may conflict with existing definitions. Please ensure they are unique."
#endif
#define VERBOSE 5
#define DEBUG 4
#define INFO 3
#define WARNING 2
#define ERROR 1
#define NONE 0
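// A message is printed when its level is <= LOG_LEVEL; the default level is DEBUG.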
#ifndef LOG_LEVEL
#define LOG_LEVEL DEBUG
#elif LOG_LEVEL < NONE || LOG_LEVEL > VERBOSE
#error "LOG_LEVEL must be between NONE (0) and VERBOSE (5)"
#endif
#define LOG(level, ...) \
do \
{ \
if constexpr (level <= LOG_LEVEL) \
{ \
printf("[%c] ", #level[0]); \
if constexpr (LOG_LEVEL >= DEBUG) \
{ \
printf("[%llu] ", std::chrono::duration_cast<std::chrono::milliseconds>( \
std::chrono::system_clock::now().time_since_epoch()) \
.count()); \
} \
if constexpr (LOG_LEVEL >= VERBOSE) \
{ \
printf("%s:%d ", __FILE__, __LINE__); \
} \
if constexpr (LOG_LEVEL >= DEBUG) \
{ \
printf("%s: ", __FUNCTION__); \
} \
printf(__VA_ARGS__); \
printf("\n"); \
} \
} \
while (0)
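// Example (sketch): with the default LOG_LEVEL (DEBUG), LOG(INFO, "value = %d", 42)
// prints something like "[I] [<ms since epoch>] <function>: value = 42".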
} // namespace core
namespace core {
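// Single-producer/single-consumer ring buffer for trivially copyable, trivially
// destructible element types. One slot is kept as a sentinel, so the usable
// capacity is one less than the number of slots allocated. When the buffer is
// full, put()/write() drop the oldest data instead of failing.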
template<typename T, std::enable_if_t<std::is_trivially_copyable_v<T> && std::is_trivially_destructible_v<T>, int> = 0>
class RingBuffer final
{
public:
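// Factory: returns a smart pointer (std::unique_ptr by default) or nullptr on
// invalid arguments. If no external buffer is supplied, one is heap-allocated
// and owned by the RingBuffer; otherwise `size` (if given) must cover
// `capacity + 1` elements.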
template<typename P = std::unique_ptr<RingBuffer<T>>>
static P create(size_t capacity, void *buffer = nullptr, size_t size = 0)
{
if (capacity < 16)
{
LOG(ERROR, "RingBuffer capacity must be at least 16");
return nullptr;
}
if (size && !buffer)
{
LOG(ERROR, "Buffer cannot be null if size is provided");
return nullptr;
}
if (capacity > std::numeric_limits<size_t>::max() / sizeof(T) - 1)
{
LOG(ERROR, "RingBuffer capacity is too large");
return nullptr;
}
capacity += 1;
const size_t bytes_needed = capacity * sizeof(T);
if (size && size < bytes_needed)
{
LOG(ERROR, "Provided size (%zu) is less than required size (%zu)", size, bytes_needed);
return nullptr;
}
bool internal_buffer = false;
if (!buffer)
{
buffer = new (std::nothrow) std::byte[bytes_needed];
if (!buffer)
{
LOG(ERROR, "Failed to allocate memory for RingBuffer, size: %zu bytes", bytes_needed);
return nullptr;
}
internal_buffer = true;
}
return P(new RingBuffer<T>(internal_buffer, buffer, capacity));
}
~RingBuffer()
{
if (_internal_buffer && _buffer)
{
delete[] reinterpret_cast<std::byte *>(_buffer);
}
_buffer = nullptr;
}
RingBuffer(const RingBuffer &) = delete;
RingBuffer &operator=(const RingBuffer &) = delete;
RingBuffer(RingBuffer &&other) = delete;
RingBuffer &operator=(RingBuffer &&other) = delete;
inline size_t size() const noexcept
{
const size_t head = _head.load(std::memory_order_acquire);
const size_t tail = _tail.load(std::memory_order_relaxed);
if (head >= tail)
{
return head - tail;
}
return _capacity - (tail - head);
}
inline size_t capacity() const noexcept
{
return _capacity - 1;
}
inline bool empty() const noexcept
{
const size_t head = _head.load(std::memory_order_acquire);
const size_t tail = _tail.load(std::memory_order_relaxed);
return head == tail;
}
const T *data() const noexcept
{
return _buffer;
}
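// Appends one element at the head. If the buffer is full, the oldest element is
// dropped (the tail is advanced) to make room; the shared lock serializes this
// overwrite path against a concurrent get()/read()/peek().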
inline size_t put(const T &value) noexcept
{
size_t head = _head.load(std::memory_order_relaxed);
size_t tail = _tail.load(std::memory_order_acquire);
const size_t next_head = (head + 1) % _capacity;
size_t next_tail = tail;
const bool overflow = next_head == tail;
T *ptr = _buffer + head;
if (overflow) [[unlikely]]
{
next_tail = (tail + 1) % _capacity;
void *expected_lock = nullptr;
while (1)
{
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire,
std::memory_order_relaxed)) [[likely]]
{
break;
}
}
}
*ptr = value;
_head.compare_exchange_strong(head, next_head, std::memory_order_release, std::memory_order_relaxed);
if (overflow) [[unlikely]]
{
_lock.store(nullptr, std::memory_order_release);
_tail.compare_exchange_strong(tail, next_tail, std::memory_order_release, std::memory_order_relaxed);
}
return 1;
}
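// Pops one element from the tail into `value`. Returns 1 on success, 0 if the
// buffer is empty. The slot lock guards against a concurrent overwrite by put().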
inline size_t get(T &value) noexcept
{
const size_t head = _head.load(std::memory_order_acquire);
size_t tail = _tail.load(std::memory_order_relaxed);
if (head == tail) [[unlikely]]
{
return 0;
}
T *ptr = _buffer + tail;
void *expected_lock = nullptr;
while (1)
{
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire, std::memory_order_relaxed))
[[likely]]
{
break;
}
}
value = *ptr;
_lock.store(nullptr, std::memory_order_release);
const size_t next_tail = (tail + 1) % _capacity;
_tail.compare_exchange_strong(tail, next_tail, std::memory_order_release, std::memory_order_relaxed);
_pos = 0;
return 1;
}
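// Bulk write of `count` elements. If `count` exceeds the capacity, only the
// last capacity() elements of the input are kept. The free region is filled
// with at most two memcpy calls (to handle wrap-around); anything beyond the
// free space falls back to put(), which drops the oldest data.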
inline size_t write(const T *buffer, size_t count) noexcept
{
if (!buffer || !count) [[unlikely]]
{
return 0;
}
const size_t rb_capacity = capacity();
if (count > rb_capacity) [[unlikely]]
{
buffer += count - rb_capacity;
count = rb_capacity;
}
size_t head = _head.load(std::memory_order_relaxed);
size_t tail = _tail.load(std::memory_order_acquire);
size_t available = head >= tail ? rb_capacity - head + tail : tail - head - 1;
const size_t safe_count = std::min(count, available);
const size_t part_1_start = head;
const size_t part_1_end = std::min(part_1_start + safe_count, rb_capacity);
const size_t part_1_count = part_1_end - part_1_start;
if (part_1_count)
{
std::memcpy(_buffer + part_1_start, buffer, part_1_count * sizeof(T));
}
const size_t part_2_count = safe_count - part_1_count;
if (part_2_count)
{
std::memcpy(_buffer, buffer + part_1_count, part_2_count * sizeof(T));
}
size_t written = safe_count;
const size_t next_head = (head + safe_count) % _capacity;
_head.compare_exchange_strong(head, next_head, std::memory_order_release, std::memory_order_relaxed);
while (written < count)
{
written += put(*(buffer + written));
}
return written;
}
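// Bulk read of up to `count` elements into `buffer`. A null `buffer` discards
// the elements instead of copying them. Returns the number of elements consumed
// and resets the peek cursor.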
inline size_t read(T *buffer, size_t count) noexcept
{
const size_t head = _head.load(std::memory_order_acquire);
const size_t tail = _tail.load(std::memory_order_relaxed);
if (head == tail || !count) [[unlikely]]
{
return 0;
}
size_t copied = 0;
size_t next_tail = tail;
if (buffer)
{
while (head != next_tail && copied < count)
{
T *ptr = _buffer + next_tail;
void *expected_lock = nullptr;
while (1)
{
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire,
std::memory_order_relaxed)) [[likely]]
{
break;
}
}
buffer[copied++] = *ptr;
_lock.store(nullptr, std::memory_order_release);
next_tail = (next_tail + 1) % _capacity;
}
}
else
{
while (head != next_tail && copied < count)
{
++copied;
next_tail = (next_tail + 1) % _capacity;
}
}
size_t expected = tail;
while (expected != next_tail)
{
if (_tail.compare_exchange_weak(expected, next_tail, std::memory_order_release, std::memory_order_relaxed))
{
break;
}
expected = (expected + 1) % _capacity;
}
_pos = 0;
return copied;
}
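// tellg()/seekg()/peek() implement a non-destructive read cursor: peek() reads
// the element `_pos` slots past the tail and advances the cursor, while get(),
// read() and clear() reset it to zero.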
inline size_t tellg() const noexcept
{
return _pos;
}
inline bool seekg(size_t pos) const noexcept
{
if (pos >= size()) [[unlikely]]
{
return false;
}
_pos = pos;
return true;
}
inline bool peek(T &value) const noexcept
{
const size_t head = _head.load(std::memory_order_acquire);
const size_t tail = _tail.load(std::memory_order_relaxed);
const size_t size = head >= tail ? head - tail : _capacity - tail + head;
if (_pos >= size) [[unlikely]]
{
return false;
}
const size_t pos = (tail + _pos++) % _capacity;
T *ptr = _buffer + pos;
void *expected_lock = nullptr;
while (1)
{
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire, std::memory_order_relaxed))
[[likely]]
{
break;
}
}
value = *ptr;
_lock.store(nullptr, std::memory_order_release);
return true;
}
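// Resets the buffer to the empty state and rewinds the peek cursor.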
void clear() noexcept
{
_head.store(0, std::memory_order_release);
_tail.store(0, std::memory_order_release);
_pos = 0;
}
protected:
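// The constructor is protected: instances are created through create(). It also
// warns at construction time if the atomics used for head/tail/lock are not
// lock-free on the target platform.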
RingBuffer(bool internal_buffer, void *buffer, size_t capacity)
: _internal_buffer(internal_buffer), _buffer(static_cast<T *>(buffer)), _capacity(capacity), _head(0), _tail(0),
_lock(nullptr), _pos(0)
{
#ifdef __cpp_lib_atomic_is_always_lock_free
if (!_head.is_always_lock_free || !_tail.is_always_lock_free)
{
LOG(WARNING, "RingBuffer head or tail is not lock-free, performance may be affected");
}
if (!_lock.is_always_lock_free)
{
LOG(WARNING, "RingBuffer lock is not lock-free, performance may be affected");
}
#endif
}
private:
const bool _internal_buffer;
T *_buffer;
const size_t _capacity;
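// Head, tail, slot lock and peek cursor are each aligned to a separate cache
// line to avoid false sharing between the producer and consumer threads.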
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> _head;
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> _tail;
alignas(std::hardware_destructive_interference_size) std::atomic<void *> mutable _lock;
alignas(std::hardware_destructive_interference_size) mutable size_t _pos;
};
} // namespace core
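// Example usage (a minimal sketch, not part of the original gist; the element
// type and sizes are illustrative):
//
//   auto rb = core::RingBuffer<int>::create(64);
//   if (rb)
//   {
//       rb->put(1);                  // single-element write (producer thread)
//       int chunk[8] = {0, 1, 2, 3, 4, 5, 6, 7};
//       rb->write(chunk, 8);         // bulk write, wraps around as needed
//       int value;
//       while (rb->get(value))       // single-element reads (consumer thread)
//       {
//           LOG(INFO, "got %d", value);
//       }
//   }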
#endif // RING_BUFFER_HPP