Parallel partition O(n)
// PARALLEL FUNCTION TEMPLATE partition
template<class _FwdIt,
    class _Pr>
pair<_FwdIt, _Iter_diff_t<_FwdIt>>
    _Partition_swap_backward(_FwdIt _First, const _FwdIt _Last, _FwdIt _Beginning_of_falses, _Pr _Pred)
{ // swap elements in [_First, _Last) satisfying _Pred with elements from _Beginning_of_falses
    // pre: _Beginning_of_falses < _First
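    // for example: with _Pred = "is even", a merged-falses region {1, 3} starting at
    // _Beginning_of_falses, and the chunk {2, 5, 4} in [_First, _Last), the loop swaps
    // 2 <-> 1 and then 4 <-> 3, leaving {2, 4, 1, 5, 3}; the trues now extend the front,
    // the falses follow (possibly reordered), and the result is {iterator to the 1, 2}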
    _Iter_diff_t<_FwdIt> _Trues{};
    for (; _First != _Last; ++_First)
    {
        if (_Pred(*_First))
        {
            _STD iter_swap(_First, _Beginning_of_falses);
            ++_Trues;
            ++_Beginning_of_falses;
        }
    }
    return {_Beginning_of_falses, _Trues};
}
template<class _FwdIt,
    class _Pr>
struct _Static_partitioned_partition
{
    using _Diff = _Iter_diff_t<_FwdIt>;
    enum class _Chunk_state : unsigned char
    {
        _Serial,   // while a chunk is in the serial state, it is touched only by its owning thread
        _Merging,  // while a chunk is in the merging state, threads all try to CAS the chunk _Merging -> _Swapping;
                   // the thread that succeeds takes responsibility for swapping the trues from that chunk to the
                   // results
        _Swapping, // while a chunk is in the swapping state, the trues are being merged with _Results;
                   // only one chunk at a time is ever _Swapping; this also serves to synchronize access to
                   // _Results and _Results_falses
        _Done      // when a chunk becomes _Done, it is complete and will never need to touch _Results again
    };
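    // note: a chunk with an unmerged predecessor moves _Serial -> _Merging -> _Swapping -> _Done;
    // chunk 0, and any chunk that observes its predecessor already _Done before its serial phase,
    // is merged by its owner directly and goes straight from _Serial to _Done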
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
    struct alignas(hardware_destructive_interference_size) alignas(_FwdIt) _Chunk_local_data
    {
        atomic<_Chunk_state> _State;
        _FwdIt _Beginning_of_falses;
        _Diff _Chunk_trues;
    };
#pragma warning(pop)
    _Static_partition_team<_Diff> _Team;
    _Static_partition_range<_FwdIt> _Basis;
    _Pr _Pred;
    _Parallel_vector<_Chunk_local_data> _Chunk_locals;
    _FwdIt _Results; // one past the last merged true; [_First, _Results) holds the trues merged so far
    _Diff _Results_falses; // number of merged falses, which occupy the range starting at _Results
    _Static_partitioned_partition(const size_t _Hw_threads, const _Diff _Count,
        const _FwdIt _First, const _Pr _Pred_)
        : _Team{_Count, _Get_chunked_work_chunk_count(_Hw_threads, _Count)},
        _Basis{},
        _Pred{_Pred_},
        _Chunk_locals(_Team._Chunks),
        _Results{_First},
        _Results_falses{}
    {
        _Basis._Populate(_Team, _First);
    }
    _Cancellation_status _Process_chunk()
    {
        const auto _Key = _Team._Get_next_key();
        if (!_Key)
        {
            return (_Cancellation_status::_Canceled);
        }
        // serial-on-each-chunk phase:
        auto _Merge_index = _Key._Chunk_number; // merge step will start from this index
        {
            auto& _This_chunk_data = _Chunk_locals[_Merge_index];
            const auto _Range = _Basis._Get_chunk(_Key);
            if (_Merge_index == 0 || _Chunk_locals[_Merge_index - 1]._State.load() == _Chunk_state::_Done)
            { // no unmerged predecessor, so run the serial algorithm directly into the results
                if (_Merge_index == 0 || _Results == _Range._First)
                {
                    _Results = _Partition_unchecked(_Range._First, _Range._Last, _Pred, _Iter_cat_t<_FwdIt>());
                    _This_chunk_data._Chunk_trues = _STD distance(_Range._First, _Results); // FIXME
                }
                else
                {
                    auto _This_chunk_results = _Partition_swap_backward(_Range._First, _Range._Last, _Results, _Pred);
                    _Results = _This_chunk_results.first;
                    _This_chunk_data._Chunk_trues = _This_chunk_results.second;
                }
                _Results_falses += _Key._Size - _This_chunk_data._Chunk_trues;
                _This_chunk_data._State.store(_Chunk_state::_Done);
                ++_Merge_index; // this chunk is already merged
            }
            else
            { // unmerged predecessor; run the serial algorithm in place and attempt to merge later
                _This_chunk_data._Beginning_of_falses =
                    _Partition_unchecked(_Range._First, _Range._Last, _Pred, _Iter_cat_t<_FwdIt>());
                _This_chunk_data._Chunk_trues = _STD distance(_Range._First, _This_chunk_data._Beginning_of_falses); // FIXME
                _This_chunk_data._State.store(_Chunk_state::_Merging);
                if (_Chunk_locals[_Merge_index - 1]._State.load() != _Chunk_state::_Done)
                { // if the predecessor isn't done, whichever thread merges our predecessor will merge us too
                    return (_Cancellation_status::_Running);
                }
            }
        }
        // merge phase: at this point, we have observed that our predecessor chunk has been merged to the output,
        // so attempt to become the new merging thread if the previous merger gave up
        // note: it is an invariant when we get here that _Chunk_locals[_Merge_index - 1]._State == _Chunk_state::_Done
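        // the loop below also adopts successors: a chunk that reached _Merging while we were swapping
        // (its owner returned _Running above) is claimed here via the CAS, so every chunk whose
        // predecessor is _Done is merged by exactly one thread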
        for (; _Merge_index != _Team._Chunks; ++_Merge_index)
        {
            auto& _Merge_chunk_data = _Chunk_locals[_Merge_index];
            auto _Expected = _Chunk_state::_Merging;
            if (!_Merge_chunk_data._State.compare_exchange_strong(_Expected, _Chunk_state::_Swapping))
            { // either the _Merge_index chunk isn't ready to merge yet, or another thread will do it
                return (_Cancellation_status::_Running);
            }
            const auto _Merge_key = _Team._Get_chunk_key(_Merge_index);
            const auto _Merge_first = _Basis._Get_first(_Merge_index, _Merge_key._Start_at);
            const auto _Merge_beginning_of_falses = _STD exchange(_Merge_chunk_data._Beginning_of_falses, {});
            // at this point:
            // [_Results, _Merge_first) is the region of falses that are already merged
            // [_Merge_first, _Merge_beginning_of_falses) is the region of trues in the chunk to merge
            // [_Merge_beginning_of_falses, merging chunk's _Last) is the region of falses in the chunk to merge
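            // pictorially (F = already-merged falses, T = this chunk's trues, f = this chunk's falses):
            //   _Results            _Merge_first        _Merge_beginning_of_falses
            //   v                   v                   v
            //   F F ... F           T T ... T           f f ... f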
            const auto _Chunk_trues = _Merge_chunk_data._Chunk_trues;
            if (_Results_falses < _Chunk_trues)
            { // exchange entire merged-falses range with the end of the chunk's true range
                const auto _Merged_falses_first = _Results;
                if constexpr (_Is_bidit_v<_FwdIt>)
                {
                    _Results = _STD prev(_Merge_beginning_of_falses, _Results_falses);
                }
                else
                {
                    _Results = _STD next(_Merge_first, _Chunk_trues - _Results_falses);
                }
                _Swap_ranges_unchecked(_Merged_falses_first, _Merge_first, _Results);
            }
            else
            { // exchange entire chunk-trues range with beginning of merged falses range
                _Results = _Swap_ranges_unchecked(_Merge_first, _Merge_beginning_of_falses, _Results);
            }
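            // either branch swaps min(_Results_falses, _Chunk_trues) elements, so the swapping done
            // by all merge steps combined is bounded by the total number of trues, keeping the whole
            // algorithm O(n)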
            _Results_falses += _Merge_key._Size - _Chunk_trues;
            _Merge_chunk_data._State.store(_Chunk_state::_Done);
        }
        return (_Cancellation_status::_Canceled);
    }
    static void __stdcall _Threadpool_callback(__std_PTP_CALLBACK_INSTANCE, void * const _Context,
        __std_PTP_WORK) _NOEXCEPT // enforces termination
    {
        (void)static_cast<_Static_partitioned_partition *>(_Context)->_Process_chunk();
    }
};
template<class _FwdIt,
    class _Pr> inline
_FwdIt _Partition_unchecked(_Sequenced_policy_tag, _FwdIt _First, _FwdIt _Last, _Pr _Pred)
{ // move elements satisfying _Pred to beginning of sequence, serially
    return (_Partition_unchecked(_First, _Last, _Pred, _Iter_cat_t<_FwdIt>()));
}
template<class _FwdIt,
    class _Pr> inline
_FwdIt _Partition_unchecked(_Parallel_policy_tag, _FwdIt _First, _FwdIt _Last, _Pr _Pred)
{ // move elements satisfying _Pred to beginning of sequence, in parallel
    const size_t _Hw_threads = __std_parallel_algorithms_hw_threads();
    if (_Hw_threads > 1)
    {
        const auto _Count = _STD distance(_First, _Last);
        if (_Count >= 2)
        {
            _TRY_BEGIN
            _Static_partitioned_partition<_FwdIt, _Pr> _Operation{_Hw_threads, _Count, _First, _Pred};
            _Run_chunked_parallel_work(_Operation);
            return (_Operation._Results);
            _CATCH(const _Parallelism_resources_exhausted&)
            // fall through to serial case below
            _CATCH_END
        }
    }
    return (_Partition_unchecked(_First, _Last, _Pred, _Iter_cat_t<_FwdIt>()));
}
template<class _ExPo,
    class _FwdIt,
    class _Pr,
    _Enable_if_execution_policy_t<_ExPo>/* = 0 */> inline
_FwdIt partition(_ExPo&& _Exec, _FwdIt _First, const _FwdIt _Last, _Pr _Pred) _NOEXCEPT
{ // move elements satisfying _Pred to beginning of sequence
    _DEBUG_RANGE(_First, _Last);
    return (_Rechecked(_First,
        _Partition_unchecked(_STD forward<_ExPo>(_Exec), _Unchecked(_First), _Unchecked(_Last), _Pass_fn(_Pred))));
}
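
For reference, a minimal usage sketch (assuming the overload above is reachable as std::partition through the standard <algorithm> and <execution> headers, which is where this implementation would normally live):

#include <algorithm>
#include <execution>
#include <vector>

int main()
{
    std::vector<int> v{3, 1, 4, 1, 5, 9, 2, 6};
    // move the even values to the front in parallel; the returned iterator is the
    // first element of the "false" (odd) partition
    const auto mid = std::partition(std::execution::par, v.begin(), v.end(),
        [](const int x) { return x % 2 == 0; });
    // [v.begin(), mid) now holds the even values, [mid, v.end()) the odd ones
}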