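// Benchmark comparing ways to coordinate shared state across boost::asio
// io_service handlers: a lock-free atomic counter, strand::post,
// strand::wrap, and a plain std::mutex.
//
// A sketch of a build command (an assumption: the source file name is
// hypothetical and library names depend on your Boost installation):
//   g++ -std=c++11 -O2 io_service_bench.cpp -lboost_program_options \
//       -lboost_system -lboost_thread -lboost_timer -lboost_chrono -lpthread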
#include <cstddef>
#include <algorithm>
#include <atomic>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <vector>
#include <boost/asio/io_service.hpp>
#include <boost/asio/strand.hpp>
#include <boost/program_options.hpp>
#include <boost/range/irange.hpp>
#include <boost/range/algorithm/for_each.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/timer/timer.hpp>

namespace asio = boost::asio;
std::size_t nwork{};     // stop after this many handler invocations
std::size_t npost{};     // number of new handlers each handler re-posts
std::size_t nloop_max{}; // upper bound of the busy loop per handler
std::atomic<std::size_t> atomic_counter{0};
std::size_t counter{};   // plain counter for the strand/mutex variants
thread_local auto thread_index = std::size_t{0};
std::vector<std::size_t> num_invoked{}; // handler invocations per thread
thread_local std::mt19937 rnd{}; // note: every thread uses the default seed
// Baseline: handlers are posted straight to the io_service and termination
// is detected with a lock-free atomic counter.
struct io_service_post_work
{
    void operator()()
    {
        ++num_invoked[thread_index];
        auto const prev_count
            = atomic_counter.fetch_add(1, std::memory_order_relaxed);
        if (prev_count + 1 > nwork) {
            io_service.stop();
            return;
        }
        auto const volatile busy_loop_count
            = std::uniform_int_distribution<std::size_t>(
                    static_cast<std::size_t>(nloop_max * 0.9), nloop_max)(rnd);
        for (auto i = std::size_t{}; i < busy_loop_count; ++i) {}
        boost::for_each(
              boost::irange(std::size_t{0}, npost)
            , [&](std::size_t){ io_service.post(*this); });
    }

    asio::io_service& io_service;
};
// Adapts a strand-based work type so that the initial post made by run()
// enters the strand via dispatch.
template <class StrandWork>
struct strand_wrapper
{
    void operator()()
    {
        strand.dispatch(StrandWork{strand});
    }

    asio::io_service::strand strand;
};
// Handlers re-post themselves wrapped by strand::wrap; handlers on the same
// strand are serialized, so no lock is taken around the counter.
struct strand_wrap_work
{
    void operator()()
    {
        ++num_invoked[thread_index];
        if (++counter > nwork) {
            strand.get_io_service().stop();
            return;
        }
        auto const volatile busy_loop_count
            = std::uniform_int_distribution<std::size_t>(
                    static_cast<std::size_t>(nloop_max * 0.9), nloop_max)(rnd);
        for (auto i = std::size_t{}; i < busy_loop_count; ++i) {}
        boost::for_each(
              boost::irange(std::size_t{0}, npost)
            , [&](std::size_t){
                  strand.get_io_service().post(strand.wrap(*this));
              });
    }

    asio::io_service::strand strand;
};
// Handlers re-post themselves through strand::post, which both queues the
// handler and serializes it with the others on the same strand (note that
// the shared counter is still racy when several strands run concurrently).
struct strand_post_work
{
    void operator()()
    {
        ++num_invoked[thread_index];
        if (++counter > nwork) {
            strand.get_io_service().stop();
            return;
        }
        auto const volatile busy_loop_count
            = std::uniform_int_distribution<std::size_t>(
                    static_cast<std::size_t>(nloop_max * 0.9), nloop_max)(rnd);
        for (auto i = std::size_t{}; i < busy_loop_count; ++i) {}
        boost::for_each(
              boost::irange(std::size_t{0}, npost)
            , [&](std::size_t){ strand.post(*this); });
    }

    asio::io_service::strand strand;
};
// Handlers are posted straight to the io_service; the shared counter is
// protected by a mutex instead of being atomic.
struct mutex_io_service_post_work
{
    void operator()()
    {
        ++num_invoked[thread_index];
        {
            std::lock_guard<std::mutex> lock{mutex};
            if (++counter > nwork) {
                io_service.stop();
                return;
            }
        }
        auto const volatile busy_loop_count
            = std::uniform_int_distribution<std::size_t>(
                    static_cast<std::size_t>(nloop_max * 0.9), nloop_max)(rnd);
        for (auto i = std::size_t{}; i < busy_loop_count; ++i) {}
        boost::for_each(
              boost::irange(std::size_t{0}, npost)
            , [&](std::size_t){ io_service.post(*this); });
    }

    asio::io_service& io_service;
    static std::mutex mutex;
};

std::mutex mutex_io_service_post_work::mutex{};
template <class Work, class Executor, class Counter>
void run(std::vector<asio::io_service>& io_service
        , std::vector<Executor>& executor
        , std::size_t const nthread, Counter& counter)
{
    auto threads = std::vector<std::thread>();
    threads.reserve(nthread);
    counter = 0;
    ::num_invoked.resize(nthread);
    std::cout << "setup " << nthread << " thread(s)..." << std::endl;
    boost::barrier barrier(nthread + 1);
    for (auto i = std::size_t{0}; i < nthread; ++i) {
        threads.emplace_back([&, i] {
            ::thread_index = i;
            ::num_invoked[::thread_index] = 0;
            // distribute threads round-robin over io_services and executors
            auto& ios = io_service[i % io_service.size()];
            ios.post(Work{executor[i % executor.size()]});
            barrier.wait(); // synchronize the start of the measurement
            ios.run();
            barrier.wait(); // synchronize the end of the measurement
        });
    }
    {
        std::cout << "running..." << std::endl;
        barrier.wait();
        // The timer lives between the two barriers, so its destructor prints
        // only the time spent inside io_service::run().
        boost::timer::auto_cpu_timer timer{};
        barrier.wait();
    }
    for (auto&& t : threads) {
        t.join();
    }
    std::cout << "the number of invoked handlers: " << counter << std::endl;
    for (auto i = std::size_t{0}; i < nthread; ++i) {
        std::cout << "thread " << i << ": " << ::num_invoked[i] << std::endl;
    }
}
int main(int argc, char* argv[])
{
    try {
        namespace popts = boost::program_options;
        popts::options_description desc{"io_service performance test"};
        desc.add_options()
            ("help,h", "display help message")
            ("nio_service,i", popts::value<std::size_t>()->default_value(1), "the number of io_services")
            ("nthread,t", popts::value<std::size_t>()->default_value(1), "the number of threads")
            ("nwork,w", popts::value<std::size_t>(&nwork)->default_value(10000), "the number of work items that producers post to io_services")
            ("npost,p", popts::value<std::size_t>(&npost)->default_value(1), "the number of posts per work item")
            ("nloop,l", popts::value<std::size_t>(&nloop_max)->default_value(1000), "max loop count per work item")
            ("strand,s", "use strand as executor")
            ("mutex,m", "use mutex instead of atomic counter")
            ("wrap,r", "use strand::wrap instead of strand::post")
            ;
        auto vm = popts::variables_map{};
        popts::store(popts::parse_command_line(argc, argv, desc), vm);
        popts::notify(vm);
        if (vm.count("help")) {
            std::cout << desc << std::endl;
            return 0;
        }
        auto const nio_service = vm["nio_service"].as<std::size_t>();
        auto const nthread
            = std::max(vm["nthread"].as<std::size_t>(), nio_service);
        if (!vm.count("strand") && !vm.count("mutex") && !vm.count("wrap")) {
            std::cout << "use " << nio_service << " io_service(s) with atomic counter" << std::endl;
            auto io_service = std::vector<asio::io_service>(nio_service);
            run<io_service_post_work>(io_service, io_service, nthread, atomic_counter);
        }
        if (vm.count("strand")) {
            std::cout << "use " << nio_service << " strand(s) with strand::post" << std::endl;
            auto io_service = std::vector<asio::io_service>(1);
            auto strand = std::vector<asio::io_service::strand>{};
            strand.reserve(nio_service);
            for (auto i = std::size_t{}; i < nio_service; ++i) {
                strand.emplace_back(io_service[0]);
            }
            run<strand_wrapper<strand_post_work>>(io_service, strand, nthread, counter);
        }
        if (vm.count("mutex")) {
            std::cout << "use " << nio_service << " io_service(s) with mutex" << std::endl;
            auto io_service = std::vector<asio::io_service>(nio_service);
            run<mutex_io_service_post_work>(io_service, io_service, nthread, counter);
        }
        if (vm.count("wrap")) {
            std::cout << "use " << nio_service << " strand(s) with strand::wrap" << std::endl;
            auto io_service = std::vector<asio::io_service>(1);
            auto strand = std::vector<asio::io_service::strand>{};
            strand.reserve(nio_service);
            for (auto i = std::size_t{}; i < nio_service; ++i) {
                strand.emplace_back(io_service[0]);
            }
            run<strand_wrapper<strand_wrap_work>>(io_service, strand, nthread, counter);
        }
    }
    catch (std::exception const& e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
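
// Example invocations, using the options defined above (a sketch; the
// binary name is hypothetical):
//   ./io_service_bench -t 4 -i 4     # 4 threads, 4 io_services, atomic counter
//   ./io_service_bench -t 4 -s -i 4  # one io_service, 4 strands, strand::post
//   ./io_service_bench -t 4 -r -i 4  # one io_service, 4 strands, strand::wrap
//   ./io_service_bench -t 4 -m -i 4  # 4 io_services, mutex-protected counter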