Skip to content

Instantly share code, notes, and snippets.

@jacky860226
Created October 7, 2025 16:52
Show Gist options
  • Save jacky860226/c8edb8eb24d2afb540186ff0bc02cb91 to your computer and use it in GitHub Desktop.
Save jacky860226/c8edb8eb24d2afb540186ff0bc02cb91 to your computer and use it in GitHub Desktop.
Concurrent Queue
#pragma once
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <memory>
#include <thread>
// 線程安全的無鎖環形隊列,支持動態擴容
// 核心特性:雙指針系統 + RAII守護類 + 原子操作 + 版本控制
template <typename T> class concurrent_queue {
private:
// 雙指針系統:分離預約和提交操作,實現無鎖並發
std::atomic<size_t> head_start, head_end; // 出隊:[start,end)為正在處理區間
std::atomic<size_t> tail_start, tail_end; // 入隊:[start,end)為正在處理區間
std::atomic<size_t> num_thread_assigning; // 當前活躍線程數(用於擴容同步)
std::unique_ptr<T[]> buffer; // 環形緩衝區
std::atomic<size_t> capacity; // 當前容量(2的冪)
std::atomic<bool> being_expanded; // 擴容互斥鎖(CAS實現)
std::atomic<size_t> expansion_version; // 擴容版本號(檢測並發擴容)
inline static constexpr size_t INITIAL_CAPACITY = 512;
// RAII擴容鎖守護:確保同一時間只有一個線程執行擴容操作
class expansion_guard {
std::atomic<bool> &flag;
bool acquired;
public:
explicit expansion_guard(std::atomic<bool> &f) : flag(f), acquired(false) {
// 使用CAS原子操作嘗試獲取擴容鎖(false->true)
bool expected = false;
acquired = flag.compare_exchange_strong(expected, true,
std::memory_order_acq_rel);
// 只有一個線程能成功,其他線程的acquired將為false
}
~expansion_guard() {
// 自動釋放擴容鎖(如果成功獲取了的話)
if (acquired) {
flag.store(false, std::memory_order_release);
}
}
bool is_acquired() const { return acquired; }
expansion_guard(const expansion_guard &) = delete;
expansion_guard &operator=(const expansion_guard &) = delete;
expansion_guard(expansion_guard &&) = delete;
expansion_guard &operator=(expansion_guard &&) = delete;
};
// RAII入隊預約守護:管理tail指針的兩階段提交
class push_reservation_guard {
concurrent_queue *queue_ptr;
size_t reserved_start;
size_t reserved_size;
bool committed;
public:
push_reservation_guard(concurrent_queue *queue, size_t size)
: queue_ptr(queue), reserved_size(size), committed(false) {
// 預約階段:原子性預約tail空間
while (true) {
// 步驟1:版本號一致性檢查,防止讀取過程中發生擴容
size_t version_before = queue->get_expansion_version();
size_t current_tail =
queue_ptr->tail_end.load(std::memory_order_acquire);
size_t current_capacity =
queue->capacity.load(std::memory_order_acquire);
size_t current_head = queue->head_end.load(std::memory_order_acquire);
size_t version_after = queue->get_expansion_version();
if (version_before != version_after) {
std::this_thread::yield();
continue;
}
// 步驟2:狀態一致性檢查
if (current_tail < current_head) {
std::this_thread::yield();
continue;
}
size_t current_size = current_tail - current_head;
// 步驟3:容量檢查,如需要則觸發擴容
if (current_size + size > current_capacity) {
queue->try_expand(current_size + size);
std::this_thread::yield();
continue;
}
// 步驟4:註冊為活躍線程,防止擴容開始
queue_ptr->num_thread_assigning.fetch_add(1, std::memory_order_acq_rel);
if (queue_ptr->being_expanded.load(std::memory_order_acquire)) {
queue_ptr->num_thread_assigning.fetch_sub(1,
std::memory_order_acq_rel);
std::this_thread::yield();
continue;
}
// 步驟5:CAS預約tail空間
size_t expected_tail = current_tail;
size_t new_tail = current_tail + size;
if (queue_ptr->tail_end.compare_exchange_weak(
expected_tail, new_tail, std::memory_order_acq_rel)) {
// 預約成功後再次檢查擴容(避免TOCTOU問題)
if (queue_ptr->being_expanded.load(std::memory_order_acquire)) {
// 擴容開始了,回滾tail_end並等待擴容完成
size_t rollback_expected = new_tail;
queue_ptr->tail_end.compare_exchange_strong(
rollback_expected, expected_tail, std::memory_order_acq_rel);
queue_ptr->num_thread_assigning.fetch_sub(
1, std::memory_order_acq_rel);
std::this_thread::yield();
continue;
}
reserved_start = expected_tail;
break;
}
queue_ptr->num_thread_assigning.fetch_sub(1, std::memory_order_acq_rel);
std::this_thread::yield();
}
}
~push_reservation_guard() {
// 析構時自動提交,確保兩階段提交完整性
commit();
queue_ptr->num_thread_assigning.fetch_sub(1, std::memory_order_acq_rel);
}
size_t get_start() const { return reserved_start; }
size_t get_size() const { return reserved_size; }
void commit() {
if (committed)
return;
// 提交階段:等待輪到自己提交(按預約順序)
while (true) {
size_t expected_start = reserved_start;
if (queue_ptr->tail_start.compare_exchange_weak(
expected_start, reserved_start + reserved_size,
std::memory_order_acq_rel)) {
committed = true;
break;
}
std::this_thread::yield();
}
}
push_reservation_guard(const push_reservation_guard &) = delete;
push_reservation_guard &operator=(const push_reservation_guard &) = delete;
push_reservation_guard(push_reservation_guard &&) = delete;
push_reservation_guard &operator=(push_reservation_guard &&) = delete;
};
// RAII出隊預約守護:管理head指針的兩階段提交
class pop_reservation_guard {
concurrent_queue *queue_ptr;
size_t reserved_start;
size_t reserved_size;
bool committed;
public:
pop_reservation_guard(concurrent_queue *queue, size_t requested_size)
: queue_ptr(queue), committed(false) {
// 註冊為活躍線程(與push_reservation_guard相同邏輯)
while (true) {
if (queue_ptr->being_expanded.load(std::memory_order_acquire)) {
std::this_thread::yield();
continue;
}
queue_ptr->num_thread_assigning.fetch_add(1, std::memory_order_acq_rel);
if (queue_ptr->being_expanded.load(std::memory_order_acquire)) {
queue_ptr->num_thread_assigning.fetch_sub(1,
std::memory_order_acq_rel);
std::this_thread::yield();
continue;
}
break;
}
// 預約head空間用於出隊
size_t current_head = queue_ptr->head_end.load(std::memory_order_acquire);
while (true) {
size_t current_tail =
queue_ptr->tail_end.load(std::memory_order_acquire);
size_t expected_head = current_head;
if (current_tail < current_head) {
std::this_thread::yield();
continue;
}
size_t current_size = current_tail - current_head;
// 實際預約大小 = min(請求大小, 可用數據量)
size_t actual_size = std::min(requested_size, current_size);
size_t new_head = current_head + actual_size;
// CAS預約head空間
if (queue_ptr->head_end.compare_exchange_weak(
expected_head, new_head, std::memory_order_acq_rel)) {
reserved_start = expected_head;
reserved_size = actual_size;
break;
}
current_head = expected_head;
}
}
~pop_reservation_guard() {
commit();
queue_ptr->num_thread_assigning.fetch_sub(1, std::memory_order_acq_rel);
}
size_t get_start() const { return reserved_start; }
size_t get_size() const { return reserved_size; }
void commit() {
if (committed)
return;
if (reserved_size == 0) {
committed = true;
return;
}
// 提交階段:等待輪到自己提交(按預約順序)
while (true) {
size_t expected_start = reserved_start;
if (queue_ptr->head_start.compare_exchange_weak(
expected_start, reserved_start + reserved_size,
std::memory_order_acq_rel)) {
committed = true;
break;
}
std::this_thread::yield();
}
}
pop_reservation_guard(const pop_reservation_guard &) = delete;
pop_reservation_guard &operator=(const pop_reservation_guard &) = delete;
pop_reservation_guard(pop_reservation_guard &&) = delete;
pop_reservation_guard &operator=(pop_reservation_guard &&) = delete;
};
// 初始化所有原子變量
void initialize_atomics(size_t tail_value = 0) {
head_start.store(0, std::memory_order_relaxed);
head_end.store(0, std::memory_order_relaxed);
tail_start.store(tail_value, std::memory_order_relaxed);
tail_end.store(tail_value, std::memory_order_relaxed);
num_thread_assigning.store(0, std::memory_order_relaxed);
being_expanded.store(false, std::memory_order_relaxed);
expansion_version.store(0, std::memory_order_relaxed);
}
// 設置容量為2的冪
void setup_capacity(size_t required_capacity) {
size_t cap = 1;
while (cap < required_capacity) {
cap <<= 1;
}
capacity.store(cap, std::memory_order_relaxed);
buffer = std::make_unique<T[]>(cap);
}
// 等待擴容完成並獲取當前版本號
size_t get_expansion_version() const {
while (being_expanded.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
return expansion_version.load(std::memory_order_acquire);
}
// 擴容函數:同一時間只有一個線程能執行
bool try_expand(size_t required_capacity) {
// 步驟1:嘗試獲取擴容鎖(CAS操作),確保只有一個線程執行擴容
expansion_guard expand_guard(being_expanded);
if (!expand_guard.is_acquired()) {
return false; // 其他線程正在擴容
}
// 步驟2:雙重檢查容量需求
const size_t old_capacity = capacity.load(std::memory_order_acquire);
if (required_capacity <= old_capacity) {
return false; // 可能其他線程已完成擴容
}
// 步驟3:等待所有活躍線程完成操作,確保狀態一致
size_t head_start_val = 0;
size_t head_end_val = 0;
size_t tail_start_val = 0;
size_t tail_end_val = 0;
while (true) {
// 等待所有線程完成當前操作
while (num_thread_assigning.load(std::memory_order_acquire) > 0) {
std::this_thread::yield();
}
// 讀取所有指針狀態
head_start_val = head_start.load(std::memory_order_acquire);
head_end_val = head_end.load(std::memory_order_acquire);
tail_start_val = tail_start.load(std::memory_order_acquire);
tail_end_val = tail_end.load(std::memory_order_acquire);
// 檢查狀態一致性:所有操作都應該已完成
if (head_start_val != head_end_val) {
continue; // 還有出隊操作未完成
}
if (tail_start_val != tail_end_val) {
continue; // 還有入隊操作未完成
}
if (tail_start_val < head_start_val) {
continue; // 狀態不一致
}
break; // 狀態一致,可以安全擴容
}
// 步驟4:計算擴容參數
const size_t current_head = head_start_val;
const size_t current_tail = tail_start_val;
const size_t current_size = current_tail - current_head;
size_t new_capacity = old_capacity;
while (new_capacity < required_capacity) {
new_capacity <<= 1; // 擴容到2的冪
}
const size_t mask = old_capacity - 1;
// 步驟5:分配新緩衝區並複製數據
T *old_buffer = buffer.get();
auto new_buffer = std::make_unique<T[]>(new_capacity);
T *new_buf = new_buffer.get();
const size_t start_idx = current_head & mask;
// 處理環形緩衝區的數據複製
if (start_idx + current_size <= old_capacity) {
// 數據連續,直接複製
for (size_t i = 0; i < current_size; ++i) {
new_buf[i] = std::move(old_buffer[start_idx + i]);
}
} else {
// 數據跨越邊界,分兩段複製
const size_t first_part = old_capacity - start_idx;
const size_t second_part = current_size - first_part;
for (size_t i = 0; i < first_part; ++i) {
new_buf[i] = std::move(old_buffer[start_idx + i]);
}
for (size_t i = 0; i < second_part; ++i) {
new_buf[first_part + i] = std::move(old_buffer[i]);
}
}
// 步驟6:原子性更新所有狀態
buffer = std::move(new_buffer);
capacity.store(new_capacity, std::memory_order_release);
head_start.store(0, std::memory_order_release);
head_end.store(0, std::memory_order_release);
tail_start.store(current_size, std::memory_order_release);
tail_end.store(current_size, std::memory_order_release);
// 步驟7:增加版本號,通知其他線程擴容完成
expansion_version.fetch_add(1, std::memory_order_release);
return true; // expand_guard析構時自動釋放擴容鎖
}
public:
// 構造函數:創建指定初始容量的空隊列
explicit concurrent_queue(size_t initial_capacity = INITIAL_CAPACITY) {
setup_capacity(initial_capacity);
initialize_atomics();
}
// 構造函數:從迭代器範圍創建隊列
template <typename Iterator> concurrent_queue(Iterator begin, Iterator end) {
size_t item_count = std::distance(begin, end);
setup_capacity(item_count);
std::copy(begin, end, buffer.get());
initialize_atomics(item_count);
}
~concurrent_queue() = default;
// 禁止拷貝和移動,確保唯一性
concurrent_queue(const concurrent_queue &) = delete;
concurrent_queue &operator=(const concurrent_queue &) = delete;
concurrent_queue(concurrent_queue &&) = delete;
concurrent_queue &operator=(concurrent_queue &&) = delete;
// 批量入隊操作:線程安全的尾部插入
template <typename Iterator> void push_back(Iterator start, Iterator end) {
if (start == end)
return;
const size_t items_size = std::distance(start, end);
// 使用RAII守護類進行兩階段提交
push_reservation_guard reservation(this, items_size);
size_t reserved_start = reservation.get_start();
// 等待 pop_front 完成資料轉移,確保不會覆蓋未讀數據
size_t current_capacity = capacity.load(std::memory_order_acquire);
while (reserved_start + items_size -
head_start.load(std::memory_order_acquire) >
current_capacity) {
std::this_thread::yield();
}
// 將數據寫入預約的環形緩衝區空間
T *buf = buffer.get();
const size_t mask = current_capacity - 1;
size_t i = 0;
for (auto it = start; it != end; ++it, ++i) {
size_t idx = (reserved_start + i) & mask; // 環形索引
buf[idx] = *it;
}
// reservation析構時自動提交
}
// 容器入隊的便利方法
template <typename Container> void push_back(const Container &items) {
push_back(std::begin(items), std::end(items));
}
// 批量出隊操作:線程安全的頭部移除
template <typename OutputIterator>
size_t pop_front(size_t num, OutputIterator out) {
if (num == 0)
return 0;
// 使用RAII守護類進行兩階段提交
pop_reservation_guard reservation(this, num);
size_t reserved_start = reservation.get_start();
size_t actual_num = reservation.get_size();
// 等待數據寫入完成(tail_start推進到預約範圍)
while (tail_start.load(std::memory_order_acquire) <
reserved_start + actual_num) {
std::this_thread::yield();
}
// 從環形緩衝區讀取數據
size_t current_capacity = capacity.load(std::memory_order_acquire);
T *buf = buffer.get();
const size_t mask = current_capacity - 1;
for (size_t i = 0; i < actual_num; ++i) {
size_t idx = (reserved_start + i) & mask; // 環形索引
*out++ = std::move(buf[idx]);
}
return actual_num; // 返回實際出隊的元素數量
}
// 獲取隊列當前活躍範圍大小(包含正在處理的數據)
size_t ongoing_size() const {
while (true) {
// 版本號一致性檢查,確保讀取期間沒有擴容
size_t version_before = get_expansion_version();
size_t current_tail = tail_end.load(std::memory_order_acquire);
size_t current_head = head_end.load(std::memory_order_acquire);
size_t version_after = get_expansion_version();
if (version_before != version_after) {
std::this_thread::yield();
continue;
}
// 狀態一致性檢查
if (current_tail < current_head) {
std::this_thread::yield();
continue;
}
return current_tail - current_head; // 返回活躍數據範圍大小
}
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment