Created
March 13, 2018 12:39
-
-
Save TurpentineDistillery/cba204646e631a3eeda5b06cac595fde to your computer and use it in GitHub Desktop.
Concurrent-queue benchmarks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <queue> | |
#include <algorithm> | |
#include <functional> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
#include <atomic> | |
#include <future> | |
#include <chrono> | |
#include <cassert> | |
#include <iostream> | |
#include <iomanip> | |
#include <sys/utsname.h> | |
// outside of diagnostic push because occur in blockingconcurrentqueue's instantiations | |
#pragma GCC diagnostic ignored "-Wshadow" | |
#pragma GCC diagnostic ignored "-Wextra" | |
#pragma GCC diagnostic push | |
#pragma GCC diagnostic ignored "-Wsign-conversion" | |
#pragma GCC diagnostic ignored "-Wconversion" | |
#ifdef WITH_BOOST_FIBER | |
#include <boost/fiber/bounded_channel.hpp> | |
#endif | |
#include <boost/thread/sync_bounded_queue.hpp> | |
#include <boost/lockfree/queue.hpp> | |
#include <tbb/concurrent_queue.h> | |
#include "concurrentqueue/blockingconcurrentqueue.h" // moodycamel | |
#include <queues/include/mpmc-bounded-queue.hpp> // https://github.com/mstump/queues | |
// https://int08h.com/post/ode-to-a-vyukov-queue/ | |
// required by MPMCQueue.h; missing in earlier stdlib | |
// Minimal drop-in for std::align, required by MPMCQueue.h and missing from
// some earlier stdlib releases.
// NOTE(review): adding declarations to namespace std is technically undefined
// behavior, and this will clash with toolchains that DO ship std::align
// (same signature -> redefinition). Confirm this shim is still needed, or
// guard it behind a feature/version check.
namespace std
{
    inline void *align( std::size_t alignment, std::size_t size,
                        void *&ptr, std::size_t &space ) {
        std::uintptr_t pn = reinterpret_cast< std::uintptr_t >( ptr );
        // Round pn up to the next multiple of alignment.
        // Assumes alignment is a power of two (as the standard requires),
        // so -alignment is the corresponding all-ones-above mask.
        std::uintptr_t aligned = ( pn + alignment - 1 ) & - alignment;
        std::size_t padding = aligned - pn;
        // Not enough room for the aligned object: leave ptr/space untouched.
        if ( space < size + padding ) return nullptr;
        // Per the standard's contract: decrease space by the padding consumed
        // and advance ptr to the aligned address.
        space -= padding;
        return ptr = reinterpret_cast< void * >( aligned );
    }
}
#include "MPMCQueue.h" //https://github.com/rigtorp/MPMCQueue.git | |
#pragma GCC diagnostic pop | |
///////////////////////////////////////////////////////////////////////////// | |
///////////////////////////////////////////////////////////////////////////// | |
///////////////////////////////////////////////////////////////////////////// | |
// synchronized-queue capacity
// NB: mpmc-bounded-queue requires power of two capacity
static const size_t g_capacity = 4096;
// will pump this many elements through the queue
// NOTE(review): evaluated during static initialization;
// hardware_concurrency() may legally return 0 on some platforms,
// which would make this 0 -- confirm that is acceptable here.
static const size_t g_num_elements = g_capacity * std::thread::hardware_concurrency();
namespace mt | |
{ | |
///////////////////////////////////////////////////////////////////////////// | |
/// Simple stopwatch: measures wall-clock seconds elapsed since construction.
/// Reset by assigning a freshly constructed instance: `my_timer = mt::timer{}`.
struct timer
{
    /// Implicit conversion yields the elapsed time, in seconds.
    operator double() const
    {
        const auto elapsed = clock_t::now() - start_timepoint;
        const auto ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed);
        return static_cast<double>(ns.count()) / 1e9;
    }
private:
    // steady_clock: monotonic, immune to wall-clock adjustments
    using clock_t = std::chrono::steady_clock;
    clock_t::time_point start_timepoint = clock_t::now();
};
/////////////////////////////////////////////////////////////////////////////
/// \brief Spin-lock over std::atomic_flag; usable as a drop-in for std::mutex.
/// Naps between failed acquisition attempts instead of busy-spinning, so it
/// is typically faster than std::mutex without aggressively maxing-out CPUs.
/// NB: provides no fairness -- may cause thread starvation in some scenarios.
template<uint64_t SleepMicrosec = 50>
class atomic_lock
{
    std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
public:
    atomic_lock() = default;

    // non-copyable and non-movable, mirroring std::mutex
    atomic_lock(const atomic_lock&) = delete;
    atomic_lock& operator=(const atomic_lock&) = delete;
    atomic_lock(atomic_lock&&) = delete;
    atomic_lock& operator=(atomic_lock&&) = delete;

    /// Attempt to acquire without blocking; true iff acquired.
    bool try_lock() noexcept
    {
        return !m_flag.test_and_set(std::memory_order_acquire);
    }

    /// Acquire, sleeping SleepMicrosec between failed attempts.
    void lock() noexcept
    {
        for(;;) {
            if(!m_flag.test_and_set(std::memory_order_acquire)) {
                return;
            }
            std::this_thread::sleep_for(
                std::chrono::microseconds(SleepMicrosec));
        }
    }

    /// Release the lock.
    void unlock() noexcept
    {
        m_flag.clear(std::memory_order_release);
    }
};
/////////////////////////////////////////////////////////////////////////////
/// A minimalistic blocking, optionally-bounded synchronized-queue.
/// For testing/comparison/benchmarking; not for production use.
/// Only requires MoveAssignable from value_type.
template <typename T, class BasicLockable = std::mutex /*or mt::atomic_lock*/>
class naive_synchronized_queue
{
public:
    using value_type = T;

    /// A capacity of 0 is promoted to 1; the default is effectively unbounded.
    naive_synchronized_queue(size_t capacity_ = size_t(-1))
        : m_capacity{ capacity_ == 0 ? 1 : capacity_ }
    {}

    /////////////////////////////////////////////////////////////////////////
    /// Blocking push: waits while the queue is at capacity.
    void push(value_type val)
    {
        // NB: the outer per-direction guard prevents thread starvation
        // in MPSC and SPMC scenarios when used with atomic_lock.
        guard_t serialize_pushers{ m_push_mutex };
        lock_t lk{ m_mutex };
        m_can_push.wait(lk, [this]
        {
            return m_queue.size() < m_capacity;
        });
        m_queue.push(std::move(val));
        // release before notifying, so the woken popper can
        // acquire the mutex immediately
        lk.unlock();
        m_can_pop.notify_one();
    }

    /////////////////////////////////////////////////////////////////////////
    /// Blocking pop: waits until an element is available.
    value_type pop()
    {
        guard_t serialize_poppers{ m_pop_mutex };
        lock_t lk{ m_mutex };
        m_can_pop.wait(lk, [this]
        {
            return !m_queue.empty();
        });
        value_type result = std::move(m_queue.front());
        m_queue.pop();
        lk.unlock();
        m_can_push.notify_one();
        return result;
    }

    /////////////////////////////////////////////////////////////////////////
private:
    using lock_t  = std::unique_lock<BasicLockable>;
    using guard_t = std::lock_guard<BasicLockable>;
    using queue_t = std::queue<value_type>;

    // std::condition_variable works only with std::mutex;
    // other lockables need condition_variable_any.
    using condvar_t = typename std::conditional<
        std::is_same<BasicLockable, std::mutex>::value,
        std::condition_variable,
        std::condition_variable_any >::type;

    size_t        m_capacity;
    queue_t       m_queue;
    BasicLockable m_mutex;
    BasicLockable m_push_mutex;
    BasicLockable m_pop_mutex;
    condvar_t     m_can_push;
    condvar_t     m_can_pop;
}; // naive_synchronized_queue
// Sleep the smallest representable amount, to yield the CPU while
// busy-waiting. (The actual delay is far greater than 1ns and depends
// on the OS scheduler.)
static void min_sleep()
{
    const std::chrono::nanoseconds tiny{ 1 };
    std::this_thread::sleep_for(tiny);
}
} // namespace mt | |
///////////////////////////////////////////////////////////////////////////// | |
///////////////////////////////////////////////////////////////////////////// | |
// Normalizing interface to the various synchronized-queue implementations: | |
// | |
// | |
// template<typename T, typename Queue> | |
// void push(T value); // blocking | |
// | |
// template<typename T, typename Queue> | |
// T pop(); // blocking | |
// | |
// Non-blocking calls are wrapped into a busy-wait loops | |
// (e.g. boost::lockfree below) | |
// Generic case: forwards to the queue's own push(), which is expected to
// block while the queue is at capacity. The decltype return constrains
// this overload (via SFINAE) to queue types that provide push().
template<typename T, typename Queue>
auto push(Queue& q, T value) -> decltype((void)q.push(T{}))
{
    q.push(std::move(value));
}
template<typename T> | |
void push(boost::lockfree::queue<T>& q, T value) | |
{ | |
// bounded_push blocks if the queue is at capacity | |
while(!q.bounded_push(std::move(value))) { mt::min_sleep(); } | |
} | |
template<typename T> | |
void push(moodycamel::BlockingConcurrentQueue<T>& q, T value) | |
{ | |
#if 1 | |
while(q.size_approx() > g_capacity) { | |
mt::min_sleep(); | |
} | |
// NB: insertion may violate the capacity bound, | |
// but there's no implicit bounding mechanism | |
// in BlockingConcurrentQueue as far as I can tell. | |
// We're keeping it "manually" approximately within capacity. | |
#else | |
// Even if we don't cap the capacity, this does not | |
// affect the results of the benchmarks. | |
#endif | |
q.enqueue(std::move(value)); | |
} | |
template<typename T> | |
void push(moodycamel::ConcurrentQueue<T>& q, T value) | |
{ | |
while(q.size_approx() > g_capacity) { | |
mt::min_sleep(); // see comments in the overload above | |
} | |
q.enqueue(std::move(value)); | |
} | |
template<typename T> | |
void push(tbb::concurrent_queue<T>& q, T value) | |
{ | |
while(q.unsafe_size() > g_capacity) { | |
mt::min_sleep(); // see comments in the overload above | |
} | |
q.push(std::move(value)); | |
} | |
// mstump's mpmc_bounded_queue: enqueue() is non-blocking and returns false
// when full; emulate a blocking push with a sleepy retry-loop.
// NB: `value` is not moved -- it must remain valid across retries.
template<typename T>
void push(mpmc_bounded_queue_t<T>& q, T value)
{
    while(!q.enqueue(value)) {
        mt::min_sleep();
    }
}
///////////////////////////////////////////////////////////////////////////// | |
// Generic case: the queue's own pop() blocks and returns by value.
// SFINAE-constrained to queue types whose pop() result brace-initializes T.
template<typename T, typename Queue>
auto pop(Queue& q) -> decltype(T{ q.pop() })
{
    return T{ q.pop() };
}
// Generic case: the queue exposes pop(T&). Value-initialize a slot,
// have the queue fill it, and return it.
template<typename T, typename Queue>
auto pop(Queue& q) -> decltype((void)q.pop(std::declval<T&>()), T{})
{
    T result{};
    q.pop(result);
    return result;
}
// boost::sync_bounded_queue: pull_front() already blocks until
// an element is available.
template<typename T>
T pop(boost::sync_bounded_queue<T>& q)
{
    return q.pull_front();
}
#ifdef WITH_BOOST_FIBER | |
template<typename T> | |
T pop(boost::fibers::bounded_channel<T>& q) | |
{ | |
return q.value_pop(); | |
} | |
#endif | |
template<typename T> | |
T pop(boost::lockfree::queue<T>& q) | |
{ | |
T elem{}; | |
while(!q.pop(elem)) { | |
mt::min_sleep(); | |
} | |
return elem; | |
} | |
template<typename T> | |
T pop(moodycamel::ConcurrentQueue<T>& q) | |
{ | |
T item{}; | |
while(!q.try_dequeue(item)) { | |
mt::min_sleep(); | |
} | |
return item; | |
} | |
// moodycamel's blocking queue: wait_dequeue() blocks until an item arrives.
template<typename T>
T pop(moodycamel::BlockingConcurrentQueue<T>& q)
{
    T item{};
    q.wait_dequeue(item);
    return item;
}
template<typename T> | |
T pop(tbb::concurrent_queue<T>& q) | |
{ | |
T item{}; | |
while(!q.try_pop(item)) { | |
mt::min_sleep(); | |
} | |
return item; | |
} | |
template<typename T> | |
T pop(mpmc_bounded_queue_t<T>& q) | |
{ | |
T item{}; | |
while(!q.dequeue(item)) { | |
mt::min_sleep(); | |
} | |
return item; | |
} | |
///////////////////////////////////////////////////////////////////////////// | |
///////////////////////////////////////////////////////////////////////////// | |
/// Pump `num_elems` through the queue using specified number of pushing and popping threads | |
/// @return throughput (items pumped through the queue per second) | |
template<typename T, class Queue> | |
double get_throughput( | |
Queue& queue, | |
size_t num_pushers, | |
size_t num_poppers, | |
size_t num_elems) | |
{ | |
// verify that num_pushers and num_poppers both divide num_elems | |
assert(num_elems/num_pushers*num_pushers == num_elems); | |
assert(num_elems/num_poppers*num_poppers == num_elems); | |
std::vector<std::future<long>> pushers{ num_pushers }; | |
std::vector<std::future<long>> poppers{ num_poppers }; | |
mt::timer timer; | |
// launch pushing tasks | |
for(auto& p : pushers) { | |
p = std::async(std::launch::async, [&] | |
{ | |
long total = 0; | |
for(size_t j = 0; j < num_elems/num_pushers; j++) { | |
push<T>(queue, 1); | |
total++; | |
} | |
return total; | |
}); | |
} | |
// launch popping tasks | |
for(auto& p : poppers) { | |
p = std::async(std::launch::async, [&] | |
{ | |
long total = 0; | |
for(size_t j = 0; j < num_elems/num_poppers; j++) { | |
total += pop<T>(queue); | |
} | |
return total; | |
}); | |
} | |
// NB: provided that the time complete the tasks is much greater | |
// than the time to spawn the tasks, the bootstrap time can be ignored. | |
// wait for the workers thread to finish and aggregate the subtotals | |
long total_pushed = 0; | |
for(auto& fut : pushers) { | |
total_pushed += fut.get(); | |
} | |
long total_popped = 0; | |
for(auto& fut : poppers) { | |
total_popped += fut.get(); | |
} | |
if( total_pushed != total_popped | |
|| total_pushed != (long)num_elems) | |
{ | |
std::cerr << "Problem with queue: pushed: " | |
<< total_pushed | |
<< "; popped:" | |
<< total_popped << "\n"; | |
assert(false); | |
} | |
return double(num_elems)/timer; | |
} | |
// Invoke `callable` the given number of times and return the harmonic
// mean of its results (the appropriate mean for throughput figures).
template<typename F>
double get_harmonic_mean(F&& callable, size_t times)
{
    double sum_of_inverses = 0;
    for(size_t i = 0; i < times; ++i) {
        sum_of_inverses += 1.0 / callable();
    }
    return double(times) / sum_of_inverses;
}
// Do multiple runs of a single scenario; compute harmonic mean | |
// of the throughputs and report to cerr. | |
template<typename T, class Queue> | |
void test_scenario( | |
Queue& queue, | |
size_t num_pushers, | |
size_t num_poppers, | |
size_t num_elems) | |
{ | |
auto get_throughput_once = [&queue, num_pushers, num_poppers, num_elems] { | |
return get_throughput<T>(queue, num_pushers, num_poppers, num_elems); | |
}; | |
// aggregate over a few runs | |
const double throughput = get_harmonic_mean(get_throughput_once, 10); | |
const auto s = std::to_string(num_pushers) | |
+ "/" + std::to_string(num_poppers); | |
std::cerr << std::setw(8) << std::left << s | |
<< std::setw(8) << std::left << std::setprecision(4) << throughput/1e6 | |
<< std::setw(0) ; | |
size_t bar_size = size_t(throughput/1e5)+1; | |
size_t bar_capacity = 100; | |
for(size_t i = 0; i < std::min(bar_capacity, bar_size); i++) { | |
std::cerr << "*"; | |
} | |
std::cerr << (bar_size > bar_capacity ? "..." : "") << std::endl; | |
} | |
///////////////////////////////////////////////////////////////////////////// | |
template<typename T, class Queue> | |
void test_scenarios(Queue& queue) | |
{ | |
size_t num_cores = std::thread::hardware_concurrency(); | |
size_t half_cores = num_cores == 1 ? 1 : num_cores/2; | |
size_t num_elems = g_num_elements; | |
test_scenario<T>(queue, 1UL, 1UL, num_elems/4); // SPSC | |
test_scenario<T>(queue, 1UL, num_cores, num_elems/4); // SPMC | |
test_scenario<T>(queue, num_cores, 1UL, num_elems/4); // MPSC | |
test_scenario<T>(queue, half_cores, half_cores, num_elems); // MPMC | |
test_scenario<T>(queue, num_cores*4, num_cores*4, num_elems); // MPSC(congested) | |
} | |
///////////////////////////////////////////////////////////////////////////// | |
// Generic case: Queue has a constructor accepting capacity.
// NB: the (unused, always-null) Queue* argument exists only so overload
// resolution can select the right test() for the given queue type.
template<typename T, typename Queue>
void test(const std::string& label, Queue*)
{
    std::cerr << "\n" << label << "\n";
    Queue queue{ g_capacity };
    test_scenarios<T>(queue);
}
// tbb::concurrent_bounded_queue needs to have set_capacity called explicitly
// (it has no capacity-accepting constructor).
template<typename T>
void test(const std::string& label, tbb::concurrent_bounded_queue<T>*)
{
    std::cerr << "\n" << label << "\n";
    tbb::concurrent_bounded_queue<T> queue{};
    queue.set_capacity(g_capacity);
    test_scenarios<T>(queue);
}
// tbb::concurrent_queue does not have a constructor accepting capacity;
// the push() wrapper enforces an approximate bound instead.
template<typename T>
void test(const std::string& label, tbb::concurrent_queue<T>*)
{
    std::cerr << "\n" << label << "\n";
    tbb::concurrent_queue<T> queue{};
    test_scenarios<T>(queue);
}
///////////////////////////////////////////////////////////////////////////// | |
int main()
{
    // report platform details so results can be compared across machines
    struct utsname uts;
    uname(&uts);
    std::cerr << "hardware concurrency: " << std::thread::hardware_concurrency()
              << "\nplatform:" << uts.sysname << " " << uts.release
              << "\nqueue capacity:" << g_capacity
              << "\nelements to pump: " << g_num_elements
              << "\ncolumns: producers/consumers | throughput(M/s) | bar"
              << std::endl;

    using T = long;

    // Stringizes the queue type for the report label, and passes a null
    // pointer of that type so overload resolution picks the right test().
#define TEST(...) test<T>(#__VA_ARGS__, reinterpret_cast<__VA_ARGS__*>(NULL));
    TEST( mt::naive_synchronized_queue<T, std::mutex> );
    TEST( mt::naive_synchronized_queue<T, mt::atomic_lock<> > );
    TEST( mpmc_bounded_queue_t<T> );
    TEST( tbb::concurrent_queue<T> );
    TEST( tbb::concurrent_bounded_queue<T> );
    TEST( moodycamel::ConcurrentQueue<T> );
    TEST( moodycamel::BlockingConcurrentQueue<T> );
    TEST( rigtorp::MPMCQueue<T> );
    TEST( boost::sync_bounded_queue<T> );
    TEST( boost::lockfree::queue<T> );
#ifdef WITH_BOOST_FIBER
    TEST( boost::fibers::bounded_channel<T> );
#endif
    return 0;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
hardware concurrency: 32 | |
platform:Linux 3.10.0-693.17.1.el7.x86_64 | |
queue capacity:16384 | |
elements to pump: 524288 | |
columns: producers/consumers | throughput(M/s) | bar | |
mt::naive_synchronized_queue<T, std::mutex> | |
1/1 2.25 *********************** | |
1/32 0.9726 ********** | |
32/1 0.8859 ********* | |
16/16 0.8288 ********* | |
128/128 0.8745 ********* | |
mt::naive_synchronized_queue<T, mt::atomic_lock<> > | |
1/1 4.629 *********************************************** | |
1/32 5.413 ******************************************************* | |
32/1 5.422 ******************************************************* | |
16/16 5.648 ********************************************************* | |
128/128 3.395 ********************************** | |
mpmc_bounded_queue_t<T> | |
1/1 63.21 ****************************************************************************************************... | |
1/32 4.498 ********************************************* | |
32/1 3.997 **************************************** | |
16/16 2.295 *********************** | |
128/128 2.693 *************************** | |
tbb::concurrent_queue<T> | |
1/1 3.617 ************************************* | |
1/32 3.382 ********************************** | |
32/1 6.444 ***************************************************************** | |
16/16 2.864 ***************************** | |
128/128 1.08 *********** | |
tbb::concurrent_bounded_queue<T> | |
1/1 2.953 ****************************** | |
1/32 0.3862 **** | |
32/1 0.4642 ***** | |
16/16 7.293 ************************************************************************* | |
128/128 0.8058 ********* | |
moodycamel::ConcurrentQueue<T> | |
1/1 6.718 ******************************************************************** | |
1/32 6.053 ************************************************************* | |
32/1 2.273 *********************** | |
16/16 5.965 ************************************************************ | |
128/128 2.404 ************************* | |
moodycamel::BlockingConcurrentQueue<T> | |
1/1 4.147 ****************************************** | |
1/32 0.4894 ***** | |
32/1 2.454 ************************* | |
16/16 4.789 ************************************************ | |
128/128 3.42 *********************************** | |
rigtorp::MPMCQueue<T> | |
1/1 27.03 ****************************************************************************************************... | |
1/32 1.985 ******************** | |
32/1 1.52 **************** | |
16/16 16.67 ****************************************************************************************************... | |
128/128 0.9435 ********** | |
boost::sync_bounded_queue<T> | |
1/1 3.826 *************************************** | |
1/32 0.1484 ** | |
32/1 0.1649 ** | |
16/16 1.293 ************* | |
128/128 1.041 *********** | |
boost::lockfree::queue<T> | |
1/1 3.896 *************************************** | |
1/32 2.079 ********************* | |
32/1 1.517 **************** | |
16/16 1.41 *************** | |
128/128 1.365 ************** |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment