Last active
          August 29, 2015 14:23 
        
      - 
      
- 
        Save carlchen0928/a12a0a7be29764abcb19 to your computer and use it in GitHub Desktop. 
    Thread-safe queue built on a linked list, with two locks: one for the head and one for the tail.
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #include <mutex> | |
| #include <memory> | |
| #include <stdio.h> | |
| #include <condition_variable> | |
| #define DEBUG1 | |
/// Unbounded FIFO queue that may be used by several producer and consumer
/// threads at once.
///
/// The elements live in a singly linked list that always ends in an empty
/// "dummy" node, so `head.get() == tail` means the queue is empty.  Producers
/// only touch the tail (guarded by `tail_mtx`) and consumers only touch the
/// head (guarded by `head_mtx`), which lets a push and a pop proceed in
/// parallel.  Whenever both mutexes are needed they are taken in the order
/// head -> tail.
///
/// The queue is neither copyable nor assignable.
template <class T>
class threadsafe_subtle_queue
{
private:
    /// One list cell.  The last cell of the list is the dummy: its `data`
    /// is null and its `next` is null.
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mtx;            // guards `head`
    std::mutex tail_mtx;            // guards `tail` and the dummy node's fields
    std::condition_variable cv;     // signalled by push(), waited on with head_mtx
    std::unique_ptr<node> head;     // owns the whole list
    node* tail;                     // non-owning pointer to the dummy node

    /// Reads `tail` under `tail_mtx`.
    node* get_tail()
    {
        std::lock_guard<std::mutex> lg(tail_mtx);
        return tail;
    }

    /// Detaches and returns the first node.
    /// Caller must hold `head_mtx` and must have checked the queue is not empty.
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> ptr = std::move(head);
        head = std::move(ptr->next);
        return ptr;
    }

    /// Non-blocking pop; returns a null pointer when the queue is empty.
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> lg(head_mtx);
        // tail_mtx is held only for the duration of get_tail().  A push()
        // that lands right after the comparison is simply not observed by
        // this call; the list stays consistent.
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    /// Non-blocking pop that moves the element into `value`; returns a null
    /// pointer (and leaves `value` untouched) when the queue is empty.
    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> lg(head_mtx);
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        // Assign before unlinking: if the assignment throws, the element
        // is still in the queue.
        value = std::move(*head->data);
        return pop_head();
    }

    /// Blocks until the queue is non-empty and returns the still-held lock
    /// on `head_mtx`.
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> ul(head_mtx);
        cv.wait(ul, [this] {
            return head.get() != get_tail();
        });
        return ul;
    }

    /// Blocking pop of the first node.
    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> ul = wait_for_data();
        return pop_head();
    }

    /// Blocking pop that moves the element into `value`.
    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> ul = wait_for_data();
        value = std::move(*head->data);
        return pop_head();
    }

public:
    /// Creates an empty queue consisting of the dummy node only.
    threadsafe_subtle_queue() :
        head(new node), tail(head.get())
    {}

    /// Releases the remaining nodes one at a time.  The implicit destructor
    /// would destroy the `unique_ptr` chain recursively and can exhaust the
    /// stack when many elements are still queued.
    /// No other thread may be using the queue while it is destroyed.
    ~threadsafe_subtle_queue()
    {
        while (head)
        {
            std::unique_ptr<node> rest = std::move(head->next);
            head = std::move(rest);
        }
    }

    threadsafe_subtle_queue(const threadsafe_subtle_queue&) = delete;
    threadsafe_subtle_queue& operator=(const threadsafe_subtle_queue&) = delete;

    /// Appends `value` to the back of the queue and wakes one waiting consumer.
    void push(T value)
    {
#ifdef DEBUG
        // Debug trace; the format string assumes T is int.
        printf("pushing value: %d\n", value);
#endif
        // Allocate outside the lock to keep the critical section short.
        std::shared_ptr<T> data(
            std::make_shared<T>(std::move(value)));
        std::unique_ptr<node> p(new node);
        node* const new_node = p.get();
        {
            std::lock_guard<std::mutex> lg(tail_mtx);
            // The current dummy receives the payload; the fresh node
            // becomes the new dummy.
            tail->next = std::move(p);
            tail->data = std::move(data);
            tail = new_node;
        }
        // Consumers wait on `cv` with head_mtx, not tail_mtx.  Passing
        // through head_mtx here guarantees that a consumer which has just
        // seen the queue empty is already blocked in cv.wait() before we
        // notify; otherwise the notification could be lost.
        {
            std::lock_guard<std::mutex> lg(head_mtx);
        }
        cv.notify_one();
    }

    /// Non-blocking pop.
    /// @return the front element, or a null pointer if the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> ptr = try_pop_head();
        return ptr ? ptr->data : std::shared_ptr<T>();
    }

    /// Non-blocking pop into `value`.
    /// @return true if an element was moved into `value`, false if the
    ///         queue was empty.
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const ptr = try_pop_head(value);
        return ptr.get() != nullptr;
    }

    /// Blocks until an element is available and returns it.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> ptr = wait_pop_head();
#ifdef DEBUG
        printf("poping value %d\n", *ptr->data);
#endif
        return ptr->data;
    }

    /// Blocks until an element is available and moves it into `value`.
    void wait_and_pop(T& value)
    {
        // Delegate to the private helper; calling wait_and_pop(value) here
        // would recurse forever.
        wait_pop_head(value);
    }

    /// @return true if the queue held no elements at the moment of the check.
    bool empty()
    {
        std::lock_guard<std::mutex> lg(head_mtx);
        return head.get() == get_tail();
    }
};
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #include "threadsafe_subtle_queue.cpp" | |
| #include "threadsafe_exceptionsafe_queue.cpp" | |
| #include <thread> | |
| #include <iostream> | |
| using namespace std; | |
| #define P_NUM 16 | | // number of producer threads started by main()
| #define C_NUM 16 | | // number of consumer threads started by main()
| #define P_ITEM_NUM 100000 | | // number of values each producer pushes in p_fun()
| thread p_thread[P_NUM]; | | // producer thread handles, joined in main()
| thread c_thread[C_NUM]; | | // consumer thread handles, joined in main()
| // threadsafe_subtle_queue<int> q; | |
| threadsafe_exceptionsafe_queue<int> q; | | // queue shared by every producer and consumer thread
| void p_fun() | |
| { | |
| for (int i = 0; i < P_ITEM_NUM; i++) | |
| { | |
| q.push(i); | |
| } | |
| } | |
| void c_fun() | |
| { | |
| int t = P_ITEM_NUM * P_NUM / C_NUM; | |
| for (int i = 0; i < t; i++) | |
| { | |
| q.wait_and_pop(); | |
| } | |
| } | |
| int main() | |
| { | |
| for (int i = 0; i < P_NUM; i++) | |
| { | |
| p_thread[i] = thread(p_fun); | |
| } | |
| for (int i = 0; i < C_NUM; i++) | |
| { | |
| c_thread[i] = thread(c_fun); | |
| } | |
| for (int i = 0; i < P_NUM; i++) | |
| { | |
| p_thread[i].join(); | |
| } | |
| for (int i = 0; i < C_NUM; i++) | |
| { | |
| c_thread[i].join(); | |
| } | |
| } | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
            
Benchmark && Comparison
A thread-safe queue built on std::queue vs. a thread-safe queue built on a self-implemented linked list. The second queue uses two mutexes, one protecting the head and one protecting the tail of the linked list.
Same number of Consumer and Producer
4 Consumer threads, 4 Producer threads, every thread handles 100000 elements.
with two mutex:
with std::queue:
4 Consumer threads, 4 Producer threads, every thread handles 1000000 elements.
with two mutex
with std::queue
More Producer threads than Consumer threads
2 Consumer threads, 6 Producer threads, every thread handles 100000 elements.
with two mutex
with std::queue
2 Consumer threads, 6 Producer threads, every thread handles 1000000 elements.
with two mutex
with std::queue
High contention for the mutexes, many more threads
16 Consumer threads, 16 Producer threads, every thread handles 100000 elements.
with two mutex
with std::queue
Conclusion
Separate mutexes for the head and the tail give better performance than locking the entire queue. This is most apparent when there are more producer threads than consumer threads, because a producer and a consumer operate on different nodes, which are protected by different mutexes.
Mutexes spend a lot of time on context switching: the measurements show much more time spent in system (kernel) space than in user space. So we could get better performance by using a spinlock.