Parallel remove_if
// PARALLEL FUNCTION TEMPLATES remove AND remove_if
template<class _InIt,
    class _OutIt,
    class _Pr>
_OutIt _Remove_move_if_unchecked(_InIt _First, const _InIt _Last, _OutIt _Dest, _Pr _Pred)
{   // move omitting each element satisfying _Pred
    for (; _First != _Last; ++_First)
    {
        if (!_Pred(*_First))
        {
            *_Dest = _STD move(*_First);
            ++_Dest;
        }
    }

    return (_Dest);
}

template<class _FwdIt,
    class _Pr>
struct _Static_partitioned_remove_if
{
    enum class _Chunk_state : unsigned char
    {
        _Serial,  // while a chunk is in the serial state, it is touched only by an owner thread
        _Merging, // while a chunk is in the merging state, threads all try to CAS the chunk _Merging -> _Moving;
                  // the thread that succeeds takes responsibility for moving the keepers from that chunk to the
                  // results
        _Moving,  // while a chunk is in the moving state, the keepers are being moved to _Results;
                  // only one chunk at a time is ever _Moving; this also serves to synchronize access to _Results
        _Done     // when a chunk becomes _Done, it is complete / will never need to touch _Results again;
                  // as an optimization, if a thread sees that it has no predecessor (or its predecessor is _Done),
                  // it may transition from _Serial directly to _Done, doing the moving step implicitly
    };

#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
    struct alignas(hardware_destructive_interference_size) alignas(_FwdIt) _Chunk_local_data
    {
        atomic<_Chunk_state> _State;
        _FwdIt _New_end;
    };
#pragma warning(pop)

    _Static_partition_team<_Iter_diff_t<_FwdIt>> _Team;
    _Static_partition_range<_FwdIt> _Basis;
    _Pr _Pred;
    _Parallel_vector<_Chunk_local_data> _Chunk_locals;
    _FwdIt _Results;

    _Static_partitioned_remove_if(const size_t _Hw_threads, const _Iter_diff_t<_FwdIt> _Count,
        const _FwdIt _First, const _Pr _Pred_)
        : _Team{_Count, _Get_chunked_work_chunk_count(_Hw_threads, _Count)},
        _Basis{},
        _Pred{_Pred_},
        _Chunk_locals(_Team._Chunks),
        _Results{_First}
    {
        _Basis._Populate(_Team, _First);
    }

    _Cancellation_status _Process_chunk()
    {
        const auto _Key = _Team._Get_next_key();
        if (!_Key)
        {
            return (_Cancellation_status::_Canceled);
        }

        // remove phase:
        auto _Merge_index = _Key._Chunk_number; // merge step will start from this index
        {
            auto& _This_chunk_data = _Chunk_locals[_Merge_index];
            const auto _Range = _Basis._Get_chunk(_Key);
            if (_Merge_index == 0 || _Chunk_locals[_Merge_index - 1]._State.load() == _Chunk_state::_Done)
            { // no predecessor, so run the serial algorithm directly into the results
                if (_Merge_index == 0 || _Results == _Range._First)
                {
                    _Results = _STD remove_if(_Range._First, _Range._Last, _Pred);
                }
                else
                {
                    _Results = _Remove_move_if_unchecked(_Range._First, _Range._Last, _Results, _Pred);
                }

                _This_chunk_data._State.store(_Chunk_state::_Done);
                ++_Merge_index; // this chunk is already merged
            }
            else
            { // predecessor exists, so run the serial algorithm in place and attempt to merge later
                _This_chunk_data._New_end = _STD remove_if(_Range._First, _Range._Last, _Pred);
                _This_chunk_data._State.store(_Chunk_state::_Merging);
                if (_Chunk_locals[_Merge_index - 1]._State.load() != _Chunk_state::_Done)
                { // if the predecessor isn't done, whichever thread merges our predecessor will merge us too
                    return (_Cancellation_status::_Running);
                }
            }
        }

        // merge phase: at this point, we have observed that our predecessor chunk has been merged to the output;
        // attempt to become the new merging thread if the previous merger gave up
        // note: it is an invariant when we get here that _Chunk_locals[_Merge_index - 1]._State == _Chunk_state::_Done
        for (; _Merge_index != _Team._Chunks; ++_Merge_index)
        {
            auto& _Merge_chunk_data = _Chunk_locals[_Merge_index];
            auto _Expected = _Chunk_state::_Merging;
            if (!_Merge_chunk_data._State.compare_exchange_strong(_Expected, _Chunk_state::_Moving))
            { // either the _Merge_index chunk isn't ready to merge yet, or another thread will do it
                return (_Cancellation_status::_Running);
            }

            const auto _Merge_first = _Basis._Get_first(_Merge_index, _Team._Get_chunk_offset(_Merge_index));
            const auto _Merge_new_end = _STD exchange(_Merge_chunk_data._New_end, {});
            if (_Results == _Merge_first)
            { // entire range up to now had no removals, don't bother moving
                _Results = _Merge_new_end;
            }
            else
            {
                _Results = _Move_unchecked(_Merge_first, _Merge_new_end, _Results);
            }

            _Merge_chunk_data._State.store(_Chunk_state::_Done);
        }

        return (_Cancellation_status::_Canceled);
    }

    static void __stdcall _Threadpool_callback(__std_PTP_CALLBACK_INSTANCE, void * const _Context,
        __std_PTP_WORK) _NOEXCEPT // enforces termination
    {
        (void)static_cast<_Static_partitioned_remove_if *>(_Context)->_Process_chunk();
    }
};

template<class _FwdIt,
    class _Pr> inline
_FwdIt _Remove_if_unchecked(_Sequenced_policy_tag, const _FwdIt _First, const _FwdIt _Last, _Pr _Pred)
{   // remove each satisfying _Pred, serially
    return (_STD remove_if(_First, _Last, _Pred));
}

template<class _FwdIt,
    class _Pr> inline
_FwdIt _Remove_if_unchecked(_Parallel_policy_tag, const _FwdIt _First, const _FwdIt _Last, _Pr _Pred)
{   // remove each satisfying _Pred, in parallel
    const size_t _Hw_threads = __std_parallel_algorithms_hw_threads();
    if (_Hw_threads > 1)
    {
        const auto _Count = _STD distance(_First, _Last);
        if (_Count >= 2)
        {
            _TRY_BEGIN
            _Static_partitioned_remove_if<_FwdIt, _Pr> _Operation{_Hw_threads, _Count, _First, _Pred};
            _Run_chunked_parallel_work(_Operation);
            return (_Operation._Results);
            _CATCH(const _Parallelism_resources_exhausted&)
            // fall through to serial case below
            _CATCH_END
        }
    }

    return (_STD remove_if(_First, _Last, _Pred));
}

template<class _ExPo,
    class _FwdIt,
    class _Pr,
    _Enable_if_execution_policy_t<_ExPo>/* = 0 */>
_NODISCARD inline _FwdIt remove_if(_ExPo&& _Exec, _FwdIt _First, const _FwdIt _Last, _Pr _Pred) _NOEXCEPT
{   // remove each satisfying _Pred
    _DEBUG_RANGE(_First, _Last);
    return (_Rechecked(_First,
        _Remove_if_unchecked(_STD forward<_ExPo>(_Exec), _Unchecked(_First), _Unchecked(_Last), _Pass_fn(_Pred))));
}

template<class _ExPo,
    class _FwdIt,
    class _Ty,
    _Enable_if_execution_policy_t<_ExPo>/* = 0 */>
_NODISCARD inline _FwdIt remove(_ExPo&& _Exec, const _FwdIt _First, const _FwdIt _Last, const _Ty& _Val) _NOEXCEPT
{   // remove each matching _Val
    return (_STD remove_if(_STD forward<_ExPo>(_Exec), _First, _Last, [&_Val](auto&& _Lhs) {
        return (_STD forward<decltype(_Lhs)>(_Lhs) == _Val);
    }));
}