Thread-safe queue built on a singly linked list, with two locks (one for the head, one for the tail) so that push and pop can proceed concurrently.
#include <mutex>
#include <memory>
#include <stdio.h>
#include <condition_variable>
#define DEBUG1  // rename to DEBUG to enable the debug printfs below
template <class T>
class threadsafe_subtle_queue
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    std::mutex head_mtx;
    std::mutex tail_mtx;
    std::condition_variable cv;
    std::unique_ptr<node> head;
    node* tail;
    node* get_tail()
    {
        std::lock_guard<std::mutex> lg(tail_mtx);
        return tail;
    }
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> ptr = std::move(head);
        head = std::move(ptr->next);
        return ptr;
    }
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> lg(head_mtx);
        // get_tail() releases tail_mtx before the comparison. A push()
        // may slip in after the snapshot, so this check can report
        // "empty" spuriously, but it never pops the dummy node: we hold
        // head_mtx, so head cannot move, and head never passes tail.
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }
    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> lg(head_mtx);
        // Same reasoning as above: the relaxed emptiness check is safe.
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    }
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> ul(head_mtx);
        cv.wait(ul, [&]{
            return head.get() != get_tail();
        });
        return ul;
    }
    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> ul = wait_for_data();
        return pop_head();
    }
    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> ul = wait_for_data();
        value = std::move(*head->data);
        return pop_head();
    }
public:
    threadsafe_subtle_queue() :
        head(new node), tail(head.get())
    {}
    // Non-copyable: copying would require holding both mutexes coherently.
    threadsafe_subtle_queue(const threadsafe_subtle_queue&) = delete;
    threadsafe_subtle_queue& operator=(const threadsafe_subtle_queue&) = delete;
    void push(T value)
    {
#ifdef DEBUG
        // not thread safe; %d assumes T is int
        // std::cout << "pushing value: " << value << std::endl;
        printf("pushing value: %d\n", value);
#endif
        std::shared_ptr<T> data(
            std::make_shared<T>(std::move(value)));
        std::unique_ptr<node> p(new node);
        node* const new_node = p.get();
        {
            std::lock_guard<std::mutex> lg(tail_mtx);
            // The current tail is a dummy node: fill it with data and
            // append a fresh dummy, so tail always points at a dummy.
            tail->next = std::move(p);
            tail->data = data;
            tail = new_node;
        }
        // notify after unlocking tail_mtx
        cv.notify_one();
    }
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> ptr = try_pop_head();
        return ptr ? ptr->data : std::shared_ptr<T>();
    }
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const ptr = try_pop_head(value);
        return ptr.get() != nullptr;
    }
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> ptr = wait_pop_head();
#ifdef DEBUG
        printf("poping value %d\n", *ptr->data);
#endif
        return ptr->data;
    }
    void wait_and_pop(T& value)
    {
        // was "wait_and_pop(value)", which recurses forever
        wait_pop_head(value);
    }
    bool empty()
    {
        std::lock_guard<std::mutex> lg(head_mtx);
        return head.get() == get_tail();
    }
};
#include "threadsafe_subtle_queue.cpp"
#include "threadsafe_exceptionsafe_queue.cpp"
#include <thread>
#include <iostream>
using namespace std;
#define P_NUM 16
#define C_NUM 16
#define P_ITEM_NUM 100000
thread p_thread[P_NUM];
thread c_thread[C_NUM];
// threadsafe_subtle_queue<int> q;
threadsafe_exceptionsafe_queue<int> q;
void p_fun()
{
    for (int i = 0; i < P_ITEM_NUM; i++)
    {
        q.push(i);
    }
}
void c_fun()
{
    // Each consumer pops its share, so total pops equal total pushes
    // (P_ITEM_NUM * P_NUM items) and every wait_and_pop() returns.
    int t = P_ITEM_NUM * P_NUM / C_NUM;
    for (int i = 0; i < t; i++)
    {
        q.wait_and_pop();
    }
}
int main()
{
    for (int i = 0; i < P_NUM; i++)
    {
        p_thread[i] = thread(p_fun);
    }
    for (int i = 0; i < C_NUM; i++)
    {
        c_thread[i] = thread(c_fun);
    }
    for (int i = 0; i < P_NUM; i++)
    {
        p_thread[i].join();
    }
    for (int i = 0; i < C_NUM; i++)
    {
        c_thread[i].join();
    }
}
Benchmark && Comparison

Thread-safe queue built on std::queue vs. a thread-safe queue built on a self-implemented linked list. The second queue uses two mutexes to protect the head and the tail of the linked list.
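
The real/user/sys figures below look like output of the shell's time utility. A plausible way to reproduce a run (the compiler flags and file name here are assumptions, not part of the gist):

g++ -std=c++11 -O2 -pthread test_queue.cpp -o test_queue && time ./test_queue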

Same number of consumer and producer threads

4 consumer threads, 4 producer threads, each thread handles 100000 elements.

with two mutexes:

real 0m4.263s
user 0m1.129s
sys 0m4.410s

with std::queue:

real 0m5.261s
user 0m0.928s
sys 0m4.732s

4 consumer threads, 4 producer threads, each thread handles 1000000 elements.

with two mutexes:

real 0m48.150s
user 0m12.644s
sys 0m48.854s

with std::queue:

real 1m3.244s
user 0m10.914s
sys 0m56.927s

More producer threads than consumer threads

2 consumer threads, 6 producer threads, each thread handles 100000 elements.

with two mutexes:

real 0m6.273s
user 0m1.670s
sys 0m6.591s

with std::queue:

real 0m7.857s
user 0m1.572s
sys 0m7.239s

2 consumer threads, 6 producer threads, each thread handles 1000000 elements.

with two mutexes:

real 1m7.240s
user 0m17.667s
sys 1m10.450s

with std::queue:

real 1m41.113s
user 0m19.558s
sys 1m33.964s

Heavy mutex contention: many more threads

16 consumer threads, 16 producer threads, each thread handles 100000 elements.

with two mutexes:

real 0m20.233s
user 0m5.107s
sys 0m19.134s

with std::queue:

real 0m24.036s
user 0m4.337s
sys 0m22.078s

Conclusion

Separate mutexes for the head and the tail perform better than locking the entire queue. This is most apparent when there are more producer threads than consumer threads, because producers and consumers operate on different nodes protected by different mutexes.
Mutexes spend a lot of time on context switches: we spend much more time in the system (sys) than in user space. So we could get better performance from a spinlock.
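
A minimal sketch of such a spinlock (not part of the gist): a test-and-set loop on std::atomic_flag. It satisfies the BasicLockable requirements, so std::lock_guard<spinlock> would work in place of std::lock_guard<std::mutex> for head_mtx and tail_mtx; the condition variable, however, would have to become std::condition_variable_any, because std::condition_variable only accepts std::unique_lock<std::mutex>.

#include <atomic>
class spinlock
{
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
    void lock()
    {
        // Busy-wait in user space instead of sleeping in the kernel.
        while (flag.test_and_set(std::memory_order_acquire))
            ;
    }
    void unlock()
    {
        flag.clear(std::memory_order_release);
    }
};

Spinning pays off when critical sections are short and there are spare cores; with more threads than cores (as in the 16x16 run) it can easily be slower than a mutex, so this is worth measuring rather than assuming.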
