Skip to content

Instantly share code, notes, and snippets.

@BillyONeal
Last active November 17, 2017 10:54
Show Gist options
  • Save BillyONeal/c54acb13a5ec22dd977ac6fa8a159ce7 to your computer and use it in GitHub Desktop.
Parallel remove_if
// PARALLEL FUNCTION TEMPLATES remove AND remove_if
template<class _InIt,
    class _OutIt,
    class _Pr>
    _OutIt _Remove_move_if_unchecked(_InIt _First, const _InIt _Last, _OutIt _Dest, _Pr _Pred)
    {   // move every element of [_First, _Last) NOT satisfying _Pred to _Dest,
        // compacting over any gap between the two ranges; returns the new end of
        // the kept elements. Used by the parallel merge phase when earlier chunks
        // already removed elements, so keepers cannot stay in place.
    while (_First != _Last)
        {
        if (!_Pred(*_First))
            {   // keeper: shift it down into the output position
            *_Dest = _STD move(*_First);
            ++_Dest;
            }

        ++_First;
        }

    return (_Dest);
    }
// Shared state for one parallel remove/remove_if operation: each worker runs the serial
// algorithm on its own chunk, then completed chunks are merged left-to-right into _Results
// under the _Chunk_state protocol below.
template<class _FwdIt,
class _Pr>
struct _Static_partitioned_remove_if
{
enum class _Chunk_state : unsigned char
{
_Serial, // while a chunk is in the serial state, it is touched only by an owner thread
_Merging, // while a chunk is in the merging state, threads all try to CAS the chunk _Merging -> _Moving
// the thread that succeeds takes responsibility of moving the keepers from that chunk to the
// results
_Moving, // while a chunk is in the moving state, the keepers are being moved to _Results
// only one chunk at a time is ever _Moving; this also serves to synchronize access to _Results
_Done // when a chunk becomes _Done, it is complete / will never need to touch _Results again
// as an optimization, if a thread sees that it has no predecessor (or its predecessor is _Done), it
// may transition from _Serial directly to _Done, doing the moving step implicitly.
};
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
// per-chunk bookkeeping; cache-line alignment keeps adjacent chunks' atomics from
// sharing a line (false sharing) while workers poll their neighbors' _State
struct alignas(hardware_destructive_interference_size) alignas(_FwdIt) _Chunk_local_data
{
atomic<_Chunk_state> _State; // this chunk's position in the state machine above
_FwdIt _New_end; // end of the keepers after the in-place serial pass (valid in _Merging)
};
#pragma warning(pop)
_Static_partition_team<_Iter_diff_t<_FwdIt>> _Team; // hands out chunk keys/offsets to workers
_Static_partition_range<_FwdIt> _Basis; // maps chunk numbers to subranges of the input
_Pr _Pred; // elements satisfying _Pred are removed
_Parallel_vector<_Chunk_local_data> _Chunk_locals; // one entry per chunk in _Team
_FwdIt _Results; // end of merged output; written only while holding the single _Moving slot
_Static_partitioned_remove_if(const size_t _Hw_threads, const _Iter_diff_t<_FwdIt> _Count,
const _FwdIt _First, const _Pr _Pred_)
: _Team{_Count, _Get_chunked_work_chunk_count(_Hw_threads, _Count)},
_Basis{},
_Pred{_Pred_},
_Chunk_locals(_Team._Chunks),
_Results{_First}
{
_Basis._Populate(_Team, _First);
}
// Claims the next chunk, runs the serial remove pass on it, then merges as many ready
// successor chunks as possible into _Results; returns _Canceled when no chunks remain.
_Cancellation_status _Process_chunk()
{
const auto _Key = _Team._Get_next_key();
if (!_Key)
{ // every chunk has been claimed; nothing left for this thread
return (_Cancellation_status::_Canceled);
}
// remove phase:
auto _Merge_index = _Key._Chunk_number; // merge step will start from this index
{
auto& _This_chunk_data = _Chunk_locals[_Merge_index];
const auto _Range = _Basis._Get_chunk(_Key);
if (_Merge_index == 0 || _Chunk_locals[_Merge_index - 1]._State.load() == _Chunk_state::_Done)
{ // no predecessor, so run serial algorithm directly into results
if (_Merge_index == 0 || _Results == _Range._First)
{ // no gap before this chunk: keepers can stay in place
_Results = _STD remove_if(_Range._First, _Range._Last, _Pred);
}
else
{ // earlier removals left a gap; move keepers down to _Results while filtering
_Results = _Remove_move_if_unchecked(_Range._First, _Range._Last, _Results, _Pred);
}
_This_chunk_data._State.store(_Chunk_state::_Done);
++_Merge_index; // this chunk is already merged
}
else
{ // predecessor, run serial algorithm in place and attempt to merge later
_This_chunk_data._New_end = _STD remove_if(_Range._First, _Range._Last, _Pred);
_This_chunk_data._State.store(_Chunk_state::_Merging);
if (_Chunk_locals[_Merge_index - 1]._State.load() != _Chunk_state::_Done)
{ // if the predecessor isn't done, whichever thread merges our predecessor will merge us too
return (_Cancellation_status::_Running);
}
}
}
// merge phase: at this point, we have observed that our predecessor chunk has been merged to the output,
// attempt to become the new merging thread if the previous merger gave up
// note: it is an invariant when we get here that _Chunk_locals[_Merge_index - 1]._State == _Chunk_state::_Done
for (; _Merge_index != _Team._Chunks; ++_Merge_index)
{
auto& _Merge_chunk_data = _Chunk_locals[_Merge_index];
auto _Expected = _Chunk_state::_Merging;
if (!_Merge_chunk_data._State.compare_exchange_strong(_Expected, _Chunk_state::_Moving))
{ // either the _Merge_index chunk isn't ready to merge yet, or another thread will do it
return (_Cancellation_status::_Running);
}
const auto _Merge_first = _Basis._Get_first(_Merge_index, _Team._Get_chunk_offset(_Merge_index));
const auto _Merge_new_end = _STD exchange(_Merge_chunk_data._New_end, {});
if (_Results == _Merge_first)
{ // entire range up to now had no removals, don't bother moving
_Results = _Merge_new_end;
}
else
{
_Results = _Move_unchecked(_Merge_first, _Merge_new_end, _Results);
}
_Merge_chunk_data._State.store(_Chunk_state::_Done);
}
return (_Cancellation_status::_Canceled);
}
// Thread pool entry point; _Context is the _Static_partitioned_remove_if instance.
// _NOEXCEPT enforces termination: an escaping exception terminates rather than unwinding.
static void __stdcall _Threadpool_callback(__std_PTP_CALLBACK_INSTANCE, void * const _Context,
__std_PTP_WORK) _NOEXCEPT // enforces termination
{
(void)static_cast<_Static_partitioned_remove_if *>(_Context)->_Process_chunk();
}
};
template<class _FwdIt,
    class _Pr> inline
    _FwdIt _Remove_if_unchecked(_Sequenced_policy_tag, const _FwdIt _First, const _FwdIt _Last, _Pr _Pred)
    {   // remove each element satisfying _Pred, serially; the sequenced policy
        // simply dispatches to the classic serial algorithm
    const _FwdIt _New_end = _STD remove_if(_First, _Last, _Pred);
    return (_New_end);
    }
template<class _FwdIt,
    class _Pr> inline
    _FwdIt _Remove_if_unchecked(_Parallel_policy_tag, const _FwdIt _First, const _FwdIt _Last, _Pr _Pred)
    {   // remove each element satisfying _Pred, in parallel when profitable;
        // falls back to the serial algorithm when only one thread is available,
        // the range is too small, or parallel resources are exhausted
    const size_t _Hw_threads = __std_parallel_algorithms_hw_threads();
    if (_Hw_threads > 1)
        {   // distance is only computed when parallelism is actually available
        if (const auto _Count = _STD distance(_First, _Last); _Count >= 2)
            {   // ranges of 0 or 1 elements have nothing worth partitioning
            _TRY_BEGIN
            _Static_partitioned_remove_if<_FwdIt, _Pr> _Operation{_Hw_threads, _Count, _First,_Pred};
            _Run_chunked_parallel_work(_Operation);
            return (_Operation._Results);
            _CATCH(const _Parallelism_resources_exhausted&)
            // fall through to serial case below
            _CATCH_END
            }
        }

    return (_STD remove_if(_First, _Last, _Pred));
    }
template<class _ExPo,
    class _FwdIt,
    class _Pr,
    _Enable_if_execution_policy_t<_ExPo>/* = 0 */>
    _NODISCARD inline _FwdIt remove_if(_ExPo&& _Exec, _FwdIt _First, const _FwdIt _Last, _Pr _Pred) _NOEXCEPT
    {   // remove each element satisfying _Pred, dispatching on the execution policy;
        // unwraps checked iterators before dispatch and rewraps the result
    _DEBUG_RANGE(_First, _Last);
    const auto _UFirst = _Unchecked(_First);
    const auto _ULast = _Unchecked(_Last);
    const auto _UResult = _Remove_if_unchecked(_STD forward<_ExPo>(_Exec), _UFirst, _ULast, _Pass_fn(_Pred));
    return (_Rechecked(_First, _UResult));
    }
template<class _ExPo,
    class _FwdIt,
    class _Ty,
    _Enable_if_execution_policy_t<_ExPo>/* = 0 */>
    _NODISCARD inline _FwdIt remove(_ExPo&& _Exec, const _FwdIt _First, const _FwdIt _Last, const _Ty& _Val) _NOEXCEPT
    {   // remove each element equal to _Val by delegating to remove_if with an
        // equality predicate that compares against _Val
    auto _Equals_val = [&_Val](auto&& _Elem) {
        return (_STD forward<decltype(_Elem)>(_Elem) == _Val);
        };

    return (_STD remove_if(_STD forward<_ExPo>(_Exec), _First, _Last, _Equals_val));
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment