Last active
June 4, 2025 09:18
-
-
Save Unbinilium/6e3fe85bd3ed39afd4e212d58964961c to your computer and use it in GitHub Desktop.
SPSC RingBuffer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#ifndef RING_BUFFER_HPP | |
#define RING_BUFFER_HPP | |
#include <atomic> | |
#include <cmath> | |
#include <cstddef> | |
#include <cstdint> | |
#include <cstring> | |
#include <limits> | |
#include <memory> | |
#include <new> | |
#include <type_traits> | |
#include <cstdio> | |
#include <chrono> | |
namespace core { | |
#if defined(VERBOSE) || defined(DEBUG) || defined(INFO) || defined(WARNING) || defined(ERROR) || defined(NONE) | |
#warning "Log level macros may conflict with existing definitions. Please ensure they are unique." | |
#endif | |
#define VERBOSE 5 | |
#define DEBUG 4 | |
#define INFO 3 | |
#define WARNING 2 | |
#define ERROR 1 | |
#define NONE 0 | |
#ifndef LOG_LEVEL | |
#define LOG_LEVEL DEBUG | |
#elif LOG_LEVEL < NONE || LOG_LEVEL > VERBOSE | |
#error "LOG_LEVEL must be between NONE (0) and VERBOSE (5)" | |
#endif | |
#define LOG(level, messages...) \ | |
do \ | |
{ \ | |
if constexpr (level <= LOG_LEVEL) \ | |
{ \ | |
printf("[%c] ", #level[0]); \ | |
if constexpr (LOG_LEVEL >= DEBUG) \ | |
{ \ | |
printf("[%llu] ", std::chrono::duration_cast<std::chrono::milliseconds>( \ | |
std::chrono::system_clock::now().time_since_epoch()) \ | |
.count()); \ | |
} \ | |
if constexpr (LOG_LEVEL >= VERBOSE) \ | |
{ \ | |
printf("%s:%d ", __FILE__, __LINE__); \ | |
} \ | |
if constexpr (LOG_LEVEL >= DEBUG) \ | |
{ \ | |
printf("%s: ", __FUNCTION__); \ | |
} \ | |
printf(messages); \ | |
printf("\n"); \ | |
} \ | |
} \ | |
while (0) | |
} // namespace core | |
namespace core { | |
namespace core { | |
template<typename T, std::enable_if_t<std::is_trivially_copyable_v<T> && std::is_trivially_destructible_v<T>, int> = 0> | |
class RingBuffer final | |
{ | |
public: | |
template<typename P = std::unique_ptr<RingBuffer<T>>> | |
static P create(size_t capacity, void *buffer = nullptr, size_t size = 0) | |
{ | |
if (capacity < 16) | |
{ | |
LOG(ERROR, "RingBuffer capacity must be at least 16"); | |
return nullptr; | |
} | |
if (size && !buffer) | |
{ | |
LOG(ERROR, "Buffer cannot be null if size is provided"); | |
return nullptr; | |
} | |
if (capacity > std::numeric_limits<size_t>::max() / sizeof(T) - 1) | |
{ | |
LOG(ERROR, "RingBuffer capacity is too large"); | |
return nullptr; | |
} | |
capacity += 1; | |
const size_t bytes_needed = capacity * sizeof(T); | |
if (size && size < bytes_needed) | |
{ | |
LOG(ERROR, "Provided size (%zu) is less than required size (%zu)", size, bytes_needed); | |
return nullptr; | |
} | |
bool internal_buffer = false; | |
if (!buffer) | |
{ | |
buffer = reinterpret_cast<T *>(new std::byte[bytes_needed]); | |
if (!buffer) | |
{ | |
LOG(ERROR, "Failed to allocate memory for RingBuffer, size: %zu bytes", bytes_needed); | |
return nullptr; | |
} | |
internal_buffer = true; | |
} | |
return P(new RingBuffer<T>(internal_buffer, buffer, capacity)); | |
} | |
~RingBuffer() | |
{ | |
if (_internal_buffer && _buffer) | |
{ | |
delete[] reinterpret_cast<std::byte *>(_buffer); | |
} | |
_buffer = nullptr; | |
} | |
RingBuffer(const RingBuffer &) = delete; | |
RingBuffer &operator=(const RingBuffer &) = delete; | |
RingBuffer(RingBuffer &&other) = delete; | |
RingBuffer &operator=(RingBuffer &&other) = delete; | |
inline size_t size() const noexcept | |
{ | |
const size_t head = _head.load(std::memory_order_acquire); | |
const size_t tail = _tail.load(std::memory_order_relaxed); | |
if (head >= tail) | |
{ | |
return head - tail; | |
} | |
return _capacity - (tail - head); | |
} | |
inline size_t capacity() const noexcept | |
{ | |
return _capacity - 1; | |
} | |
inline bool empty() const noexcept | |
{ | |
const size_t head = _head.load(std::memory_order_acquire); | |
const size_t tail = _tail.load(std::memory_order_relaxed); | |
return head == tail; | |
} | |
const T *data() const noexcept | |
{ | |
return _buffer; | |
} | |
inline size_t put(const T &value) noexcept | |
{ | |
size_t head = _head.load(std::memory_order_relaxed); | |
size_t tail = _tail.load(std::memory_order_acquire); | |
const size_t next_head = (head + 1) % _capacity; | |
size_t next_tail = tail; | |
const bool overflow = next_head == tail; | |
T *ptr = _buffer + head; | |
if (overflow) [[unlikely]] | |
{ | |
next_tail = (tail + 1) % _capacity; | |
void *expected_lock = nullptr; | |
while (1) | |
{ | |
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire, | |
std::memory_order_relaxed)) [[likely]] | |
{ | |
break; | |
} | |
} | |
} | |
*ptr = value; | |
_head.compare_exchange_strong(head, next_head, std::memory_order_release, std::memory_order_relaxed); | |
if (overflow) [[unlikely]] | |
{ | |
_lock.store(nullptr, std::memory_order_release); | |
_tail.compare_exchange_strong(tail, next_tail, std::memory_order_release, std::memory_order_relaxed); | |
} | |
return 1; | |
} | |
inline size_t get(T &value) noexcept | |
{ | |
const size_t head = _head.load(std::memory_order_acquire); | |
size_t tail = _tail.load(std::memory_order_relaxed); | |
if (head == tail) [[unlikely]] | |
{ | |
return 0; | |
} | |
T *ptr = _buffer + tail; | |
void *expected_lock = nullptr; | |
while (1) | |
{ | |
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire, std::memory_order_relaxed)) | |
[[likely]] | |
{ | |
break; | |
} | |
} | |
value = *ptr; | |
_lock.store(nullptr, std::memory_order_release); | |
const size_t next_tail = (tail + 1) % _capacity; | |
_tail.compare_exchange_strong(tail, next_tail, std::memory_order_release, std::memory_order_relaxed); | |
_pos = 0; | |
return 1; | |
} | |
inline size_t write(const T *buffer, size_t count) noexcept | |
{ | |
if (!buffer || !count) [[unlikely]] | |
{ | |
return 0; | |
} | |
const size_t rb_capacity = capacity(); | |
if (count > rb_capacity) [[unlikely]] | |
{ | |
buffer += count - rb_capacity; | |
count = rb_capacity; | |
} | |
size_t head = _head.load(std::memory_order_relaxed); | |
size_t tail = _tail.load(std::memory_order_acquire); | |
size_t available = head >= tail ? rb_capacity - head + tail : tail - head - 1; | |
const size_t safe_count = std::min(count, available); | |
const size_t part_1_start = head; | |
const size_t part_1_end = std::min(part_1_start + safe_count, rb_capacity); | |
const size_t part_1_count = part_1_end - part_1_start; | |
if (part_1_count) | |
{ | |
std::memcpy(_buffer + part_1_start, buffer, part_1_count * sizeof(T)); | |
} | |
const size_t part_2_count = safe_count - part_1_count; | |
if (part_2_count) | |
{ | |
std::memcpy(_buffer, buffer + part_1_count, part_2_count * sizeof(T)); | |
} | |
size_t written = safe_count; | |
const size_t next_head = (head + safe_count) % _capacity; | |
_head.compare_exchange_strong(head, next_head, std::memory_order_release, std::memory_order_relaxed); | |
while (written < count) | |
{ | |
written += put(*(buffer + written)); | |
} | |
return written; | |
} | |
inline size_t read(T *buffer, size_t count) noexcept | |
{ | |
const size_t head = _head.load(std::memory_order_acquire); | |
const size_t tail = _tail.load(std::memory_order_relaxed); | |
if (head == tail || !count) [[unlikely]] | |
{ | |
return 0; | |
} | |
size_t copied = 0; | |
size_t next_tail = tail; | |
if (buffer) | |
{ | |
while (head != next_tail && copied < count) | |
{ | |
T *ptr = _buffer + next_tail; | |
void *expected_lock = nullptr; | |
while (1) | |
{ | |
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire, | |
std::memory_order_relaxed)) [[likely]] | |
{ | |
break; | |
} | |
} | |
buffer[copied++] = *ptr; | |
_lock.store(nullptr, std::memory_order_release); | |
next_tail = (next_tail + 1) % _capacity; | |
} | |
} | |
else | |
{ | |
while (head != next_tail && copied < count) | |
{ | |
++copied; | |
next_tail = (next_tail + 1) % _capacity; | |
} | |
} | |
size_t expected = tail; | |
while (expected != next_tail) | |
{ | |
if (_tail.compare_exchange_weak(expected, next_tail, std::memory_order_release, std::memory_order_relaxed)) | |
{ | |
break; | |
} | |
expected = (expected + 1) % _capacity; | |
} | |
_pos = 0; | |
return copied; | |
} | |
inline size_t tellg() const noexcept | |
{ | |
return _pos; | |
} | |
inline bool seekg(size_t pos) const noexcept | |
{ | |
if (pos >= size()) [[unlikely]] | |
{ | |
return false; | |
} | |
_pos = pos; | |
return true; | |
} | |
inline bool peek(T &value) const noexcept | |
{ | |
const size_t head = _head.load(std::memory_order_acquire); | |
const size_t tail = _tail.load(std::memory_order_relaxed); | |
const size_t size = head >= tail ? head - tail : _capacity - tail + head; | |
if (_pos >= size) [[unlikely]] | |
{ | |
return false; | |
} | |
const size_t pos = (tail + _pos++) % _capacity; | |
T *ptr = _buffer + pos; | |
void *expected_lock = nullptr; | |
while (1) | |
{ | |
if (_lock.compare_exchange_weak(expected_lock, ptr, std::memory_order_acquire, std::memory_order_relaxed)) | |
[[likely]] | |
{ | |
break; | |
} | |
} | |
value = *ptr; | |
_lock.store(nullptr, std::memory_order_release); | |
return true; | |
} | |
void clear() noexcept | |
{ | |
_head.store(0, std::memory_order_release); | |
_tail.store(0, std::memory_order_release); | |
_pos = 0; | |
} | |
protected: | |
RingBuffer(bool internal_buffer, void *buffer, size_t capacity) | |
: _internal_buffer(internal_buffer), _buffer(static_cast<T *>(buffer)), _capacity(capacity), _head(0), _tail(0), | |
_lock(nullptr), _pos(0) | |
{ | |
#ifdef __cpp_lib_atomic_is_always_lock_free | |
if (!_head.is_always_lock_free || !_tail.is_always_lock_free) | |
{ | |
LOG(WARNING, "RingBuffer head or tail is not lock-free, performance may be affected"); | |
} | |
if (!_lock.is_always_lock_free) | |
{ | |
LOG(WARNING, "RingBuffer lock is not lock-free, performance may be affected"); | |
} | |
#endif | |
} | |
private: | |
const bool _internal_buffer; | |
T *_buffer; | |
const size_t _capacity; | |
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> _head; | |
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> _tail; | |
alignas(std::hardware_destructive_interference_size) std::atomic<void *> mutable _lock; | |
alignas(std::hardware_destructive_interference_size) mutable size_t _pos; | |
}; | |
} // namespace core | |
#endif // RING_BUFFER_HPP |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment