Skip to content

Instantly share code, notes, and snippets.

@BillyONeal
Created August 24, 2017 17:28
Show Gist options
  • Save BillyONeal/d9196f4c51fbe2ce37f9a9534458f40c to your computer and use it in GitHub Desktop.
// STRUCT TEMPLATE _Static_partition_set
template<class _FwdIt>
struct _Iterator_range
    { // half-open span [_First, _Last) describing one partition of work
    _FwdIt _First;
    _FwdIt _Last;

    explicit operator bool() const
        { // true iff the partition still holds unprocessed elements
        return (_First != _Last);
        }
    };
// Primary template: selects a partitioning strategy by iterator category.
// The bool parameter defaults to _Is_ranit_v<_FwdIt>: the true specialization
// below addresses chunks in O(1) via iterator arithmetic; the false
// specialization precomputes division points for plain forward iterators.
template<class _FwdIt,
bool = _Is_ranit_v<_FwdIt>>
struct _Static_partition_set;
template<class _RanIt>
struct _Static_partition_set<_RanIt, true>
    { // partition set for random-access iterators
    size_t _Max_chunk;               // total number of chunks in the set
    atomic<size_t> _Consumed_chunks; // number of chunks already claimed
    _RanIt _Start_at;                // first element of the partitioned range
    size_t _Chunk_size;              // elements in each chunk, before remainder
    size_t _Unchunked_items;         // remainder spread one-per-chunk over early chunks

    _Static_partition_set(_RanIt& _First, const size_t _Count, const size_t _Chunks)
        : _Max_chunk(_Chunks),
        _Consumed_chunks(0),
        _Start_at(_First),
        _Chunk_size(_Count / _Chunks),
        _Unchunked_items(_Count % _Chunks)
        { // statically partition a random-access iterator range
        // pre: _Count >= _Chunks
        // pre: _Chunks >= 1
        _First += _Count; // advance the caller's iterator past the partitioned region
        }

    _Iterator_range<_RanIt> _Try_get_chunk()
        { // claim the next static partition to process; an empty
        // iterator range signals that all chunks are consumed
        const size_t _Chunk_number = _Consumed_chunks++;
        if (_Max_chunk <= _Chunk_number)
            {
            return {};
            }

        const auto _Where = _Static_partition_offset<_Iter_diff_t<_RanIt>>(
            _Chunk_number, _Chunk_size, _Unchunked_items);
        const _RanIt _Chunk_first = _Start_at + _Where.first;
        return {_Chunk_first, _Chunk_first + _Where.second};
        }
    };
template<class _FwdIt>
struct _Static_partition_set<_FwdIt, false>
    { // partition set for forward iterators
    _Parallel_vector<_Unchecked_t<_FwdIt>> _Division_points; // _Chunks + 1 boundaries
    atomic<size_t> _Consumed_chunks; // number of chunks already claimed

    _Static_partition_set(_FwdIt& _First, const size_t _Count, const size_t _Chunks)
        : _Division_points(_Do_partition(_First, _Count, _Chunks)),
        _Consumed_chunks(0)
        { // statically partition a forward iterator range
        // pre: _Count >= _Chunks
        // pre: _Chunks >= 1
        }

    _Iterator_range<_Unchecked_t<_FwdIt>> _Try_get_chunk()
        { // claim the next static partition to process; an empty
        // iterator range signals that all chunks are consumed
        const size_t _Chunk_number = _Consumed_chunks++;
        const size_t _Chunk_count = _Division_points.size() - 1;
        if (_Chunk_number < _Chunk_count)
            {
            return {_Division_points[_Chunk_number], _Division_points[_Chunk_number + 1]};
            }

        return {};
        }

private:
    static _Parallel_vector<_Unchecked_t<_FwdIt>> _Do_partition(
        _FwdIt& _First, const size_t _Count, const size_t _Chunks)
        { // walk [_First, advance(_First, _Count)) once, recording _Chunks + 1 boundaries;
        // advances the caller's iterator past the partitioned region as a side effect
        // pre: _Count >= _Chunks
        // pre: _Chunks >= 1
        const size_t _Chunk_size = _Count / _Chunks;
        const size_t _Unchunked_items = _Count % _Chunks;
        _Parallel_vector<_Unchecked_t<_FwdIt>> _Points(_Chunks + 1);
        auto _Out = _Points.begin();
        *_Out = _Unchecked(_First);
        for (size_t _Idx = 0; _Idx < _Chunks; ++_Idx)
            { // the first _Count % _Chunks chunks each carry one extra item
            const size_t _This_chunk_size = _Idx < _Unchunked_items ? _Chunk_size + 1 : _Chunk_size;
            _STD advance(_First, _This_chunk_size);
            *++_Out = _Unchecked(_First);
            }

        return _Points;
        }
    };
// STRUCT TEMPLATE _Generalized_sum_drop
// Lock-free drop-off point: worker threads claim a slot by bumping the
// atomic _Frontier counter and construct their partial result in place.
template<class _Ty>
struct _Generalized_sum_drop
{ // drop off point for GENERALIZED_SUM intermediate results
// raw storage for _Slots results; only [0, _Frontier) are constructed
_Ty * _Data;
// index of the next unclaimed slot == count of constructed results
_STD atomic<size_t> _Frontier;
explicit _Generalized_sum_drop(const size_t _Slots)
: _Data(static_cast<_Ty *>(_Allocate<_Parallelism_allocate_traits>(_Slots, sizeof(_Ty)))),
_Frontier(0)
{
}
~_Generalized_sum_drop() _NOEXCEPT
{
// destroys only the constructed prefix
// NOTE(review): no matching deallocation of _Data is visible in this chunk;
// unless _Parallelism_allocate_traits storage is reclaimed elsewhere, this
// allocation leaks -- confirm against the allocator's contract
_Destroy_range(_Data, _Data + _Frontier.load(memory_order_relaxed));
}
template<class... _Args>
void _Add_result(_Args&&... _Vals) _NOEXCEPT // enforces termination
{ // constructs a _Ty in place with _Vals parameters perfectly forwarded
// pre: the number of results added is less than the size the drop was constructed with
const size_t _Target = _Frontier++;
_Construct_in_place(_Data[_Target], _STD forward<_Args>(_Vals)...);
}
_Ty * begin()
{ // first constructed result (range-for support)
return (_Data);
}
_Ty * end()
{ // one past the last constructed result; racy only if workers are still adding
return (_Data + _Frontier.load(memory_order_relaxed));
}
};
// PARALLEL FUNCTION TEMPLATE reduce
template<class _FwdIt,
class _Ty,
class _Fn>
struct _Static_partitioned_reduce
{ // reduction task scheduled on the system thread pool
_Static_partition_set<_FwdIt> _Partitions; // background chunks workers claim from
_Fn _Reduction;
_Generalized_sum_drop<_Ty> _Results; // one partial-result slot per background chunk
_Static_partitioned_reduce(_FwdIt& _First_par, const size_t _Background_work,
const size_t _Background_chunks, _Fn _Reduction)
: _Partitions{_First_par, _Background_work, _Background_chunks},
_Reduction(_Reduction),
_Results{_Background_chunks}
{ // note: the partition set's constructor advances _First_par past the
// background region, leaving it at the start of the foreground work
}
static void __stdcall _Threadpool_callback(__std_PTP_CALLBACK_INSTANCE, void * _Context,
__std_PTP_WORK) _NOEXCEPT // enforces termination
{ // callback from the system thread pool to do work; processes at most one chunk
const auto _This = static_cast<_Static_partitioned_reduce *>(_Context);
auto _This_chunk = _This->_Partitions._Try_get_chunk();
if (_This_chunk)
{
auto& _First = _This_chunk._First;
auto& _Last = _This_chunk._Last;
// copy _First before incrementing it, so the two dereferences in the call
// below are well-defined whatever the argument evaluation order
const _FwdIt _Val{_First};
// seed the chunk's accumulator from its first two elements
// (every background chunk holds at least 2 elements -- see _Max_background_chunks)
_Ty _Acc{_This->_Reduction(*_Val, *++_First)};
_This->_Results._Add_result(_Reduce_unchecked(_First, _Last, _STD move(_Acc), _This->_Reduction));
}
}
};
template<class _FwdIt,
    class _Ty,
    class _Fn>
_Ty _Reduce_unchecked(_Sequenced_policy_tag, _FwdIt _First, _FwdIt _Last, _Ty _Val, _Fn _Reduction)
    { // the sequenced policy simply forwards to the serial implementation
    return _Reduce_unchecked(_First, _Last, _STD move(_Val), _Reduction);
    }
#pragma float_control(precise, off, push)
template<class _FwdIt,
class _Ty,
class _Fn>
_Ty _Reduce_unchecked(_Parallel_policy_tag, _FwdIt _First, _FwdIt _Last, _Ty _Val, _Fn _Reduction)
{ // return reduction, parallelized
if (_First == _Last)
{
return (_Val);
}
const size_t _Hw_threads = __std_parallel_algorithms_hw_threads();
if (_Hw_threads > 1)
{ // parallelize on multiprocessor machines...
const size_t _Count = _STD distance(_First, _Last);
// parallel reduction chunks require at least 2 elements to form the first accumulator for each chunk,
// except for the foreground chunk which only requires 1 element (and _Val)
const size_t _Max_background_chunks = (_Count - 1) >> 1; // TRANSITION, VSO#433486
const size_t _Bg_threads = _Min_value(_Hw_threads - 1, _Max_background_chunks);
if (_Bg_threads != 0)
{
// split the range: this thread keeps _Foreground_work elements at the end,
// the background threads share the leading _Background_work elements
const size_t _Foreground_work = _Count / _Hw_threads;
const size_t _Background_work = _Count - _Foreground_work;
_FwdIt _First_par = _First;
_TRY_BEGIN
// constructing _Operation partitions the background region and
// advances _First_par to the start of the foreground region
_Static_partitioned_reduce<_FwdIt, _Ty, _Fn> _Operation{
_First_par, _Background_work, _Bg_threads, _Reduction };
const _Work_ptr _Work{_Operation};
_Work._Submit(_Bg_threads);
// reduce the foreground region on this thread while workers run
_Val = _Reduce_unchecked(_First_par, _Last, _STD move(_Val), _Reduction);
while (auto _Stolen_chunk = _Operation._Partitions._Try_get_chunk())
{ // keep processing remaining chunks to comply with N4687 [intro.progress]/14
_Val = _Reduce_unchecked(_Stolen_chunk._First, _Stolen_chunk._Last, _STD move(_Val), _Reduction);
}
_Work._Cancel();
// combine the workers' partial results with the foreground accumulator
auto& _Results = _Operation._Results;
return (_Reduce_unchecked(_Results.begin(), _Results.end(), _STD move(_Val), _Reduction));
_CATCH(const _Parallelism_resources_exhausted&)
// fall through to serial case below
_CATCH_END
}
}
return (_Reduce_unchecked(_First, _Last, _STD move(_Val), _Reduction));
}
N is 100000000
Testing accumulate
sum is is 214748846230582752.000000
std::accumulate took 82ms
Testing reduce
sum is is 214748846230426464.000000
std::reduce took 32ms
Testing reduce_par
sum is is 214748913510961984.000000
std::reduce(par took 22ms
Speedup acc -> reduce: 2.552153x
Speedup acc -> reduce(par: 3.667796x
Speedup reduce -> reduce(par: 1.437138x
#include <math.h>
#include <stdio.h>
#include <chrono>
#include <execution>
#include <iterator>
#include <random>
#include "stopwatch.hpp"
using namespace std;
using namespace std::chrono;
using namespace std::execution;
// benchmark input: 100M doubles (~800 MB, static storage), filled with random values in main
double a[100'000'000];
#pragma optimize("", off)
int main() {
    // Benchmark plan: fill a[] with pseudorandom doubles (fixed seed for
    // reproducibility), then time the same summation three ways --
    // std::accumulate, serial std::reduce, and std::reduce(par) -- and
    // report the relative speedups.
    mt19937 gen(1729);
    generate(begin(a), end(a), [&] { return static_cast<double>(gen()); });
    stopwatch acc;
    stopwatch red;
    stopwatch red_par;
    printf("N is %zu\n", static_cast<size_t>(end(a) - begin(a)));
    double result;
    // reduction operation; swap in the commented lambda for a compute-heavy alternative
    auto pred = plus<>{}; // [] (double x, double y) { return sin(x + y) + cos(x - y); };
    puts("Testing accumulate");
    acc.start();
    result = accumulate(begin(a), end(a), 0.0, pred);
    acc.stop();
    printf("sum is %f\n", result); // fixed duplicated "is is" typo
    acc.print("std::accumulate");
    puts("Testing reduce");
    red.start();
    result = reduce(begin(a), end(a), 0.0, pred);
    red.stop();
    printf("sum is %f\n", result);
    red.print("std::reduce");
    puts("Testing reduce_par");
    red_par.start();
    result = reduce(par, begin(a), end(a), 0.0, pred);
    red_par.stop();
    printf("sum is %f\n", result);
    red_par.print("std::reduce(par");
    printf("Speedup acc -> reduce: %fx\n", red.compare(acc));
    printf("Speedup acc -> reduce(par: %fx\n", red_par.compare(acc));
    printf("Speedup reduce -> reduce(par: %fx\n", red_par.compare(red));
}
#pragma optimize("", on)
#pragma once
#include <stdio.h>
#include <chrono>
// Minimal wall-clock stopwatch for benchmarking. Non-copyable; call
// start() then stop(), then query time_taken()/compare()/print().
class stopwatch {
    std::chrono::high_resolution_clock::time_point startTime;
    std::chrono::high_resolution_clock::time_point stopTime;

public:
    stopwatch()
        : startTime()
        , stopTime()
    { }
    stopwatch(const stopwatch&) = delete;
    stopwatch& operator=(const stopwatch&) = delete;

    // Records the starting instant.
    void start() {
        startTime = std::chrono::high_resolution_clock::now();
    }

    // Records the stopping instant.
    void stop() {
        stopTime = std::chrono::high_resolution_clock::now();
    }

    // Elapsed duration between the last start()/stop() pair.
    std::chrono::high_resolution_clock::duration time_taken() const {
        return stopTime - startTime;
    }

    // Ratio of s's elapsed time to this stopwatch's elapsed time
    // (> 1.0 means this stopwatch measured the faster run).
    // Note: yields inf/NaN when this stopwatch measured zero ticks.
    double compare(const stopwatch& s) const { // const added: reads state only
        return static_cast<double>(s.time_taken().count()) / time_taken().count();
    }

    // Prints "<action> took <N>ms" to stdout.
    void print(const char * const action) const {
        printf("%s took %lldms\n", action,
            static_cast<long long>(std::chrono::duration_cast<std::chrono::milliseconds>(time_taken()).count()));
    }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment