Created
October 7, 2025 16:52
-
-
Save jacky860226/c8edb8eb24d2afb540186ff0bc02cb91 to your computer and use it in GitHub Desktop.
Concurrent Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #pragma once | |
| #include <algorithm> | |
| #include <atomic> | |
| #include <cassert> | |
| #include <cstddef> | |
| #include <iterator> | |
| #include <memory> | |
| #include <thread> | |
// Thread-safe multi-producer / multi-consumer ring-buffer queue with batch
// push/pop and on-demand growth.
//
// Design:
//  * Two cursor pairs implement a reserve-then-commit protocol. A producer
//    first claims slots by advancing `tail_end`, writes its elements, and then
//    publishes them by advancing `tail_start` (in reservation order).
//    Consumers do the same with `head_end` / `head_start`.
//  * Every reservation is bracketed by a registration in
//    `num_thread_assigning`. Growing the buffer sets `being_expanded` and then
//    waits for that counter to drop to zero, so the buffer is never swapped
//    while a producer or consumer is touching it.
//  * `expansion_version` is bumped on every growth so that readers can detect
//    that a snapshot of the cursors straddled an expansion (which rebases all
//    cursors so that head == 0).
//
// NOTE: the implementation spins with std::this_thread::yield() while waiting
// for other threads; it avoids mutexes but is not lock-free in the strict
// progress-guarantee sense.
template <typename T> class concurrent_queue {
private:
  // Dequeue cursors: [head_start, head_end) is reserved by consumers that have
  // not committed yet.
  std::atomic<size_t> head_start{0};
  std::atomic<size_t> head_end{0};
  // Enqueue cursors: [tail_start, tail_end) is reserved by producers that have
  // not committed yet.
  std::atomic<size_t> tail_start{0};
  std::atomic<size_t> tail_end{0};
  // Number of producers/consumers currently holding a reservation.
  std::atomic<size_t> num_thread_assigning{0};
  // Ring storage; indexed with `cursor & (capacity - 1)`.
  std::unique_ptr<T[]> buffer;
  // Current number of slots in `buffer`; always a power of two.
  std::atomic<size_t> capacity{0};
  // True while one thread is growing the buffer (acquired via CAS).
  std::atomic<bool> being_expanded{false};
  // Incremented once per successful growth.
  std::atomic<size_t> expansion_version{0};

  inline static constexpr size_t INITIAL_CAPACITY = 512;

  // Scope guard that tries to take the expansion flag (false -> true) on
  // construction and releases it on destruction if it was taken.
  class expansion_guard {
    std::atomic<bool> &flag;
    bool acquired;

  public:
    explicit expansion_guard(std::atomic<bool> &f) : flag(f), acquired(false) {
      bool expected = false;
      // seq_cst: the expander stores the flag and then loads
      // num_thread_assigning, while workers store the counter and then load
      // the flag. That store->load handshake is only sound when both sides are
      // sequentially consistent; acquire/release alone permits both sides to
      // read stale values.
      acquired = flag.compare_exchange_strong(expected, true,
                                              std::memory_order_seq_cst);
    }

    ~expansion_guard() {
      if (acquired) {
        flag.store(false, std::memory_order_release);
      }
    }

    // True when this guard owns the expansion flag.
    [[nodiscard]] bool is_acquired() const { return acquired; }

    expansion_guard(const expansion_guard &) = delete;
    expansion_guard &operator=(const expansion_guard &) = delete;
    expansion_guard(expansion_guard &&) = delete;
    expansion_guard &operator=(expansion_guard &&) = delete;
  };

  // Scope guard for an enqueue reservation. The constructor claims `size`
  // slots at the tail (growing the buffer if necessary); the destructor
  // publishes them and unregisters the thread.
  class push_reservation_guard {
    concurrent_queue *queue_ptr;
    size_t reserved_start;
    size_t reserved_size;
    bool committed;

  public:
    push_reservation_guard(concurrent_queue *queue, size_t size)
        : queue_ptr(queue), reserved_start(0), reserved_size(size),
          committed(false) {
      while (true) {
        // Step 1: take a snapshot of the cursors that is known not to
        // straddle an expansion.
        const size_t version_before = queue_ptr->get_expansion_version();
        const size_t current_tail =
            queue_ptr->tail_end.load(std::memory_order_acquire);
        const size_t current_capacity =
            queue_ptr->capacity.load(std::memory_order_acquire);
        const size_t current_head =
            queue_ptr->head_end.load(std::memory_order_acquire);
        const bool snapshot_ok =
            version_before == queue_ptr->get_expansion_version() &&
            current_tail >= current_head;
        if (!snapshot_ok) {
          std::this_thread::yield();
          continue;
        }

        // Step 2: grow first if the request does not fit. try_expand must be
        // called while this thread is NOT registered, otherwise the expander
        // would wait for itself.
        const size_t current_size = current_tail - current_head;
        if (current_size + size > current_capacity) {
          queue_ptr->try_expand(current_size + size);
          std::this_thread::yield();
          continue;
        }

        // Step 3: register as active. On success no expansion can get past
        // its wait loop until this guard is destroyed.
        if (!queue_ptr->try_register_thread()) {
          std::this_thread::yield();
          continue;
        }

        // Step 4: an expansion may have completed between the snapshot and the
        // registration; it rebases the cursors, so the snapshot (and the
        // capacity check above) would be meaningless. A plain load is used on
        // purpose: get_expansion_version() spins while being_expanded is set,
        // and an expander may now be waiting for *this* thread.
        if (queue_ptr->expansion_version.load(std::memory_order_acquire) !=
            version_before) {
          queue_ptr->unregister_thread();
          std::this_thread::yield();
          continue;
        }

        // Step 5: claim the slots. Once the CAS succeeds the reservation is
        // kept unconditionally: the thread is registered, so an expander that
        // raised its flag in the meantime simply waits for the commit. (Rolling
        // tail_end back here is unsafe, because another producer may already
        // have reserved on top of it.)
        size_t expected_tail = current_tail;
        if (queue_ptr->tail_end.compare_exchange_weak(
                expected_tail, current_tail + size,
                std::memory_order_acq_rel)) {
          reserved_start = current_tail;
          return;
        }

        // Lost the race for tail_end (or spurious failure): start over.
        queue_ptr->unregister_thread();
        std::this_thread::yield();
      }
    }

    ~push_reservation_guard() {
      // Publish the slots (if not already done) before letting an expander in.
      commit();
      queue_ptr->unregister_thread();
    }

    // First cursor value of the reserved range.
    size_t get_start() const { return reserved_start; }
    // Number of reserved slots.
    size_t get_size() const { return reserved_size; }

    // Publishes the reserved range by advancing tail_start past it. Spins
    // until all earlier reservations have been published. Idempotent.
    void commit() {
      if (committed)
        return;
      const size_t published_end = reserved_start + reserved_size;
      size_t expected = reserved_start;
      while (!queue_ptr->tail_start.compare_exchange_weak(
          expected, published_end, std::memory_order_acq_rel)) {
        expected = reserved_start; // CAS overwrote it with the observed value
        std::this_thread::yield();
      }
      committed = true;
    }

    push_reservation_guard(const push_reservation_guard &) = delete;
    push_reservation_guard &operator=(const push_reservation_guard &) = delete;
    push_reservation_guard(push_reservation_guard &&) = delete;
    push_reservation_guard &operator=(push_reservation_guard &&) = delete;
  };

  // Scope guard for a dequeue reservation. The constructor claims up to
  // `requested_size` slots at the head (possibly zero); the destructor
  // releases them for reuse and unregisters the thread.
  class pop_reservation_guard {
    concurrent_queue *queue_ptr;
    size_t reserved_start;
    size_t reserved_size;
    bool committed;

  public:
    pop_reservation_guard(concurrent_queue *queue, size_t requested_size)
        : queue_ptr(queue), reserved_start(0), reserved_size(0),
          committed(false) {
      // Register as active; retried while an expansion is in progress.
      while (!queue_ptr->try_register_thread()) {
        std::this_thread::yield();
      }

      size_t current_head = queue_ptr->head_end.load(std::memory_order_acquire);
      while (true) {
        const size_t current_tail =
            queue_ptr->tail_end.load(std::memory_order_acquire);
        if (current_tail < current_head) {
          // Inconsistent snapshot: refresh BOTH cursors before retrying so the
          // loop cannot spin on a stale head value.
          std::this_thread::yield();
          current_head = queue_ptr->head_end.load(std::memory_order_acquire);
          continue;
        }

        // Reserve min(requested, reserved-or-published element count).
        const size_t actual_size =
            std::min(requested_size, current_tail - current_head);
        // On failure compare_exchange_weak stores the observed head_end into
        // current_head, which is exactly the value to retry with.
        if (queue_ptr->head_end.compare_exchange_weak(
                current_head, current_head + actual_size,
                std::memory_order_acq_rel)) {
          reserved_start = current_head;
          reserved_size = actual_size;
          return;
        }
      }
    }

    ~pop_reservation_guard() {
      commit();
      queue_ptr->unregister_thread();
    }

    // First cursor value of the reserved range.
    size_t get_start() const { return reserved_start; }
    // Number of reserved slots (may be smaller than requested, or zero).
    size_t get_size() const { return reserved_size; }

    // Releases the reserved range by advancing head_start past it. Spins until
    // all earlier reservations have been released. Idempotent; an empty
    // reservation never touches head_start.
    void commit() {
      if (committed)
        return;
      if (reserved_size != 0) {
        const size_t released_end = reserved_start + reserved_size;
        size_t expected = reserved_start;
        while (!queue_ptr->head_start.compare_exchange_weak(
            expected, released_end, std::memory_order_acq_rel)) {
          expected = reserved_start; // CAS overwrote it with the observed value
          std::this_thread::yield();
        }
      }
      committed = true;
    }

    pop_reservation_guard(const pop_reservation_guard &) = delete;
    pop_reservation_guard &operator=(const pop_reservation_guard &) = delete;
    pop_reservation_guard(pop_reservation_guard &&) = delete;
    pop_reservation_guard &operator=(pop_reservation_guard &&) = delete;
  };

  // Doubles `cap` until it is at least `required`.
  static size_t grow_pow2(size_t cap, size_t required) {
    while (cap < required) {
      assert(cap <= static_cast<size_t>(-1) / 2 && "queue capacity overflow");
      cap <<= 1;
    }
    return cap;
  }

  // Tries to register the calling thread as an active producer/consumer.
  // Returns false (leaving the counter unchanged) if an expansion holds the
  // flag. On success the caller must later call unregister_thread().
  bool try_register_thread() {
    // Cheap early-out so that waiting threads do not bounce the counter.
    if (being_expanded.load(std::memory_order_acquire))
      return false;
    // seq_cst on both operations: see expansion_guard for the rationale.
    num_thread_assigning.fetch_add(1, std::memory_order_seq_cst);
    if (being_expanded.load(std::memory_order_seq_cst)) {
      unregister_thread();
      return false;
    }
    return true;
  }

  // Undoes a successful try_register_thread().
  void unregister_thread() {
    num_thread_assigning.fetch_sub(1, std::memory_order_acq_rel);
  }

  // Resets every cursor and flag; both tail cursors start at `tail_value`
  // (the number of elements already stored in the buffer).
  void initialize_atomics(size_t tail_value = 0) {
    head_start.store(0, std::memory_order_relaxed);
    head_end.store(0, std::memory_order_relaxed);
    tail_start.store(tail_value, std::memory_order_relaxed);
    tail_end.store(tail_value, std::memory_order_relaxed);
    num_thread_assigning.store(0, std::memory_order_relaxed);
    being_expanded.store(false, std::memory_order_relaxed);
    expansion_version.store(0, std::memory_order_relaxed);
  }

  // Allocates the initial buffer with the smallest power-of-two capacity that
  // is >= required_capacity (minimum 1).
  void setup_capacity(size_t required_capacity) {
    const size_t cap = grow_pow2(1, required_capacity);
    capacity.store(cap, std::memory_order_relaxed);
    buffer = std::make_unique<T[]>(cap);
  }

  // Waits until no expansion is in progress, then returns the current
  // expansion version. Must not be called by a registered thread (an expander
  // could be waiting for that thread).
  size_t get_expansion_version() const {
    while (being_expanded.load(std::memory_order_acquire)) {
      std::this_thread::yield();
    }
    return expansion_version.load(std::memory_order_acquire);
  }

  // Grows the buffer so that it can hold at least `required_capacity`
  // elements. Returns true if this call performed the growth; false if another
  // thread holds the expansion flag or the capacity is already sufficient.
  // Must not be called by a registered thread.
  bool try_expand(size_t required_capacity) {
    // Step 1: become the single expander.
    expansion_guard expand_guard(being_expanded);
    if (!expand_guard.is_acquired()) {
      return false;
    }

    // Step 2: somebody else may already have grown the buffer.
    const size_t old_capacity = capacity.load(std::memory_order_acquire);
    if (required_capacity <= old_capacity) {
      return false;
    }

    // Step 3: wait until no producer/consumer holds a reservation and the
    // cursors are quiescent (everything reserved has been committed).
    size_t current_head = 0;
    size_t current_tail = 0;
    while (true) {
      // seq_cst pairs with try_register_thread(); see expansion_guard.
      while (num_thread_assigning.load(std::memory_order_seq_cst) > 0) {
        std::this_thread::yield();
      }
      current_head = head_start.load(std::memory_order_acquire);
      current_tail = tail_start.load(std::memory_order_acquire);
      const bool quiescent =
          current_head == head_end.load(std::memory_order_acquire) &&
          current_tail == tail_end.load(std::memory_order_acquire) &&
          current_head <= current_tail;
      if (quiescent) {
        break;
      }
      std::this_thread::yield(); // back off instead of hot-spinning
    }

    // Step 4: allocate the larger buffer and move the live elements to its
    // front, unwrapping the ring as we go.
    const size_t current_size = current_tail - current_head;
    const size_t new_capacity = grow_pow2(old_capacity, required_capacity);
    const size_t old_mask = old_capacity - 1;
    T *old_buf = buffer.get();
    auto new_buffer = std::make_unique<T[]>(new_capacity);
    T *new_buf = new_buffer.get();
    for (size_t i = 0; i < current_size; ++i) {
      new_buf[i] = std::move(old_buf[(current_head + i) & old_mask]);
    }

    // Step 5: install the new buffer and rebase the cursors so head == 0.
    // No other thread can observe the intermediate states: workers cannot
    // register, and version-checking readers retry after the bump below.
    buffer = std::move(new_buffer);
    capacity.store(new_capacity, std::memory_order_release);
    head_start.store(0, std::memory_order_release);
    head_end.store(0, std::memory_order_release);
    tail_start.store(current_size, std::memory_order_release);
    tail_end.store(current_size, std::memory_order_release);

    // Step 6: announce the growth; the flag is released by expand_guard.
    expansion_version.fetch_add(1, std::memory_order_release);
    return true;
  }

public:
  // Creates an empty queue whose capacity is `initial_capacity` rounded up to
  // a power of two (minimum 1).
  explicit concurrent_queue(size_t initial_capacity = INITIAL_CAPACITY) {
    setup_capacity(initial_capacity);
    initialize_atomics();
  }

  // Creates a queue pre-filled with a copy of [begin, end). The range is
  // traversed twice (distance + copy), so it must be multi-pass.
  template <typename Iterator> concurrent_queue(Iterator begin, Iterator end) {
    const size_t item_count = static_cast<size_t>(std::distance(begin, end));
    setup_capacity(item_count);
    std::copy(begin, end, buffer.get());
    initialize_atomics(item_count);
  }

  ~concurrent_queue() = default;

  // Neither copyable nor movable: other threads hold pointers into the queue.
  concurrent_queue(const concurrent_queue &) = delete;
  concurrent_queue &operator=(const concurrent_queue &) = delete;
  concurrent_queue(concurrent_queue &&) = delete;
  concurrent_queue &operator=(concurrent_queue &&) = delete;

  // Appends a copy of [start, end) to the back of the queue as one contiguous
  // batch, growing the buffer if needed. The range is traversed twice
  // (distance + copy), so it must be multi-pass. Safe to call concurrently
  // with other push_back / pop_front calls.
  template <typename Iterator> void push_back(Iterator start, Iterator end) {
    if (start == end)
      return;
    const size_t items_size = static_cast<size_t>(std::distance(start, end));

    // Reserve the slots; published automatically when `reservation` dies.
    push_reservation_guard reservation(this, items_size);
    const size_t reserved_start = reservation.get_start();

    // The reservation was checked against head_end; consumers between
    // head_start and head_end may still be reading the slots we are about to
    // overwrite, so wait until they have released them.
    const size_t current_capacity = capacity.load(std::memory_order_acquire);
    while (reserved_start + items_size -
               head_start.load(std::memory_order_acquire) >
           current_capacity) {
      std::this_thread::yield();
    }

    T *buf = buffer.get();
    const size_t mask = current_capacity - 1;
    size_t cursor = reserved_start;
    for (auto it = start; it != end; ++it, ++cursor) {
      buf[cursor & mask] = *it;
    }
  }

  // Convenience overload: appends every element of `items`.
  template <typename Container> void push_back(const Container &items) {
    push_back(std::begin(items), std::end(items));
  }

  // Removes up to `num` elements from the front of the queue, moving them to
  // `out` in FIFO order. Returns the number of elements actually removed
  // (0 if the queue is empty). Safe to call concurrently with other
  // push_back / pop_front calls.
  template <typename OutputIterator>
  size_t pop_front(size_t num, OutputIterator out) {
    if (num == 0)
      return 0;

    // Reserve the slots; released automatically when `reservation` dies.
    pop_reservation_guard reservation(this, num);
    const size_t reserved_start = reservation.get_start();
    const size_t actual_num = reservation.get_size();

    // The reservation was made against tail_end, i.e. it may include slots a
    // producer has claimed but not finished writing. Wait for publication.
    const size_t reserved_end = reserved_start + actual_num;
    while (tail_start.load(std::memory_order_acquire) < reserved_end) {
      std::this_thread::yield();
    }

    const size_t mask = capacity.load(std::memory_order_acquire) - 1;
    T *buf = buffer.get();
    for (size_t cursor = reserved_start; cursor != reserved_end; ++cursor) {
      *out++ = std::move(buf[cursor & mask]);
    }
    return actual_num;
  }

  // Returns tail_end - head_end: the number of elements that are stored or
  // currently being written, minus those already reserved by consumers. The
  // value is a snapshot and may be stale by the time it is returned.
  size_t ongoing_size() const {
    while (true) {
      // Retry if an expansion rebased the cursors while we were reading them.
      const size_t version_before = get_expansion_version();
      const size_t current_tail = tail_end.load(std::memory_order_acquire);
      const size_t current_head = head_end.load(std::memory_order_acquire);
      const bool snapshot_ok = version_before == get_expansion_version() &&
                               current_tail >= current_head;
      if (snapshot_ok) {
        return current_tail - current_head;
      }
      std::this_thread::yield();
    }
  }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment