Skip to content

Instantly share code, notes, and snippets.

@simmplecoder
Created July 21, 2017 11:49
Show Gist options
  • Save simmplecoder/8c93858bdc12d45666a2eebc1ca01a79 to your computer and use it in GitHub Desktop.
Save simmplecoder/8c93858bdc12d45666a2eebc1ca01a79 to your computer and use it in GitHub Desktop.
async accumulate code
#include <algorithm>
#include <chrono>
#include <ctime>
#include <future>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <numeric>
#include <random>
#include <sstream>
#include <stdexcept>
#include <thread>
#include <vector>
/// Returns the number of concurrent threads reported by
/// std::thread::hardware_concurrency(), or 2 when that value is not
/// computable (the function returns 0 in that case).
unsigned int get_core_count()
{
    const unsigned int reported = std::thread::hardware_concurrency();
    // 0 means "unknown"; fall back to a conservative 2 so callers still get some parallelism.
    return reported != 0 ? reported : 2;
}
/// Sums [first, last) onto `initvalue`, splitting the work across hardware threads.
///
/// The range is cut once into at most get_core_count() contiguous chunks, each at
/// least `bucket_size` elements long. Every chunk except the last is summed by its
/// own std::async task, while the calling thread sums the last chunk (which also
/// absorbs any remainder). Ranges shorter than two buckets, or an empty range, are
/// handled on the calling thread only.
///
/// @param first, last  random-access range to sum; an empty or reversed range
///                     (first >= last) leaves `initvalue` unchanged
/// @param initvalue    starting value; chunk sums are added to it with +=, in range order
/// @return initvalue plus the sum of all elements
/// @note Each chunk is summed from T{}; for non-associative T (e.g. floating point)
///       the result may differ slightly from a single std::accumulate.
/// @throws std::system_error if std::async cannot start a thread, plus anything the
///         element additions throw (rethrown from future::get()).
template <typename RandomAcessIt, typename T>
T accumulate(RandomAcessIt first, RandomAcessIt last, T initvalue)
{
    using diff_t = typename std::iterator_traits<RandomAcessIt>::difference_type;

    // Minimum elements per chunk: below this, starting a thread costs more than it saves.
    constexpr diff_t bucket_size = 10'000;
    static const unsigned int core_count = get_core_count();

    const diff_t length = std::distance(first, last);
    if (length <= 0)
    {
        return initvalue;
    }

    // One chunk per core, but never so many that a chunk drops below bucket_size.
    const diff_t chunk_count = std::max<diff_t>(
        1, std::min<diff_t>(static_cast<diff_t>(core_count), length / bucket_size));
    const diff_t chunk_size = length / chunk_count;

    std::vector<std::future<T>> futures;
    futures.reserve(static_cast<std::size_t>(chunk_count - 1));

    RandomAcessIt chunk_first = first;
    for (diff_t i = 0; i + 1 < chunk_count; ++i)
    {
        const RandomAcessIt chunk_last = chunk_first + chunk_size;
        // Iterators are captured by copy: the task outlives this loop iteration.
        futures.push_back(std::async(std::launch::async, [chunk_first, chunk_last]()
        {
            return std::accumulate(chunk_first, chunk_last, T{});
        }));
        chunk_first = chunk_last;
    }

    // The calling thread works on the final chunk instead of idling in get().
    const T tail = std::accumulate(chunk_first, last, T{});
    for (auto& part : futures)
    {
        initvalue += part.get();
    }
    initvalue += tail;
    return initvalue;
}
/// Writes `timep` to `os` as local time using the format "%Y %b %d %H:%M:%S"
/// (the month abbreviation follows the stream's locale).
///
/// Only clocks that provide a static `to_time_t` (e.g. std::chrono::system_clock)
/// can be used; instantiating this with other clocks fails to compile.
/// If the time cannot be converted to local time, nothing is written and
/// failbit is set on `os`.
/// @note std::localtime may use a shared static buffer and is not guaranteed thread-safe.
template <typename Clock, typename Duration>
std::ostream& operator<<(std::ostream& os, const std::chrono::time_point<Clock, Duration>& timep)
{
    const std::time_t converted_timep = Clock::to_time_t(timep);
    const std::tm* local = std::localtime(&converted_timep);
    if (local == nullptr)
    {
        // std::put_time requires a valid std::tm; report the failure through the stream.
        os.setstate(std::ios_base::failbit);
        return os;
    }
    os << std::put_time(local, "%Y %b %d %H:%M:%S");
    return os;
}
/// Benchmarks std::accumulate against this file's ::accumulate on 100 million
/// pseudo-random values in [0, 255], printing wall-clock start/finish stamps and
/// the elapsed time of each run.
/// @return 0 if both sums agree; otherwise prints both results to std::cerr and returns 1.
int main()
{
    // default_random_engine is default-constructed (default seed), so the data is
    // the same on every run.
    std::vector<unsigned long long> v(100'000'000);
    std::default_random_engine engine;
    std::uniform_int_distribution<unsigned int> distr(0, 255);
    for (auto& num : v)
    {
        num = distr(engine);
    }

    // Runs `job` and reports its duration. The "started at" line is printed and
    // flushed BEFORE the steady_clock start is taken, so console I/O is not counted.
    auto timed = [](const char* label, auto job)
    {
        std::cout << label << " accumulate started at: "
                  << std::chrono::system_clock::now() << '\n' << std::flush;
        const auto start = std::chrono::steady_clock::now();
        const auto result = job();
        const auto end = std::chrono::steady_clock::now();
        std::cout << label << " accumulate finished at: "
                  << std::chrono::system_clock::now() << '\n' << std::flush;
        std::cout << "It took "
                  << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
                  << " microseconds\n" << std::flush;
        return result;
    };

    const auto st_result = timed("single threaded", [&v]
    {
        return std::accumulate(v.begin(), v.end(), 0ull);
    });
    // Qualified with :: to select this file's parallel version; unqualified, ADL on the
    // std::vector iterators could also find std::accumulate and make the call ambiguous.
    const auto mt_result = timed("multi threaded", [&v]
    {
        return ::accumulate(v.begin(), v.end(), 0ull);
    });

    if (st_result != mt_result)
    {
        std::cerr << "single and multi threaded accumulate does not agree\n"
                  << "single thread result is " << st_result << '\n'
                  << "while multi thread result is " << mt_result << '\n';
        return 1;
    }
    return 0;
}
cmake_minimum_required(VERSION 3.8)
project(untitled LANGUAGES CXX)

# main.cpp uses C++14 features (digit separators); fail at configure time
# instead of silently compiling with an older standard.
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(SOURCE_FILES main.cpp)
add_executable(untitled ${SOURCE_FILES})

# std::async/std::thread need the platform thread library. FindThreads picks the
# correct flag or library for the toolchain instead of hard-coding "pthread".
find_package(Threads REQUIRED)
target_link_libraries(untitled Threads::Threads)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment