@BillyONeal
Created November 18, 2017 02:46
Parallel partition O(n)
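
This gist appears to be an O(n) parallel implementation of partition for MSVC's STL (C++17 parallel algorithms). Each worker thread partitions its own chunk serially; the per-chunk "trues" are then merged left to right under a per-chunk atomic state machine, so _Results always points at the beginning of the merged falses. A hedged usage sketch of the public interface follows the listing.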
// PARALLEL FUNCTION TEMPLATE partition
template<class _FwdIt,
    class _Pr>
pair<_FwdIt, _Iter_diff_t<_FwdIt>>
    _Partition_swap_backward(_FwdIt _First, const _FwdIt _Last, _FwdIt _Beginning_of_falses, _Pr _Pred)
{ // swap elements in [_First, _Last) satisfying _Pred with elements from _Beginning_of_falses
    // pre: _Beginning_of_falses precedes _First, and [_Beginning_of_falses, _First) contains only falses
    _Iter_diff_t<_FwdIt> _Trues{};
    for (; _First != _Last; ++_First)
    {
        if (_Pred(*_First))
        {
            _STD iter_swap(_First, _Beginning_of_falses);
            ++_Trues;
            ++_Beginning_of_falses;
        }
    }

    return {_Beginning_of_falses, _Trues};
}
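
// Illustrative trace of _Partition_swap_backward (example values, not part of the
// original): with predicate "is even", a = {2, 4, 1, 3, 6, 5, 8},
// _Beginning_of_falses = &a[2], _First = &a[4], _Last = &a[7]:
//   6 is true  -> swap a[4] and a[2]: {2, 4, 6, 3, 1, 5, 8}
//   5 is false -> no swap
//   8 is true  -> swap a[6] and a[3]: {2, 4, 6, 8, 1, 5, 3}
// returns {&a[4], 2}: the falses now begin at a[4], and 2 trues were moved.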
template<class _FwdIt,
    class _Pr>
struct _Static_partitioned_partition
{
    using _Diff = _Iter_diff_t<_FwdIt>;

    enum class _Chunk_state : unsigned char
    {
        _Serial, // while a chunk is in the serial state, it is touched only by an owner thread
        _Merging, // while a chunk is in the merging state, threads all try to CAS the chunk _Merging -> _Swapping;
                  // the thread that succeeds takes responsibility for swapping the trues from that chunk to the
                  // results
        _Swapping, // while a chunk is in the swapping state, the trues are being merged with _Results;
                   // only one chunk at a time is ever _Swapping, which also serves to synchronize access to
                   // _Results and _Results_falses
        _Done // when a chunk becomes _Done, it is complete and will never need to touch _Results again
    };
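
    // State lifecycle (summary added for clarity): the owning thread moves a chunk from
    // _Serial to either _Done (when it could merge immediately) or _Merging (partitioned
    // in place); some thread then wins the _Merging -> _Swapping CAS, performs the swaps
    // against _Results, and finally stores _Done.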
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
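    // Each chunk's bookkeeping is padded to its own cache line so that worker threads
    // touching adjacent chunks' state do not false-share.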
    struct alignas(hardware_destructive_interference_size) alignas(_FwdIt) _Chunk_local_data
    {
        atomic<_Chunk_state> _State;
        _FwdIt _Beginning_of_falses;
        _Diff _Chunk_trues;
    };
#pragma warning(pop)
    _Static_partition_team<_Diff> _Team;
    _Static_partition_range<_FwdIt> _Basis;
    _Pr _Pred;
    _Parallel_vector<_Chunk_local_data> _Chunk_locals;
    _FwdIt _Results;
    _Diff _Results_falses;

    _Static_partitioned_partition(const size_t _Hw_threads, const _Diff _Count,
            const _FwdIt _First, const _Pr _Pred_)
        : _Team{_Count, _Get_chunked_work_chunk_count(_Hw_threads, _Count)},
        _Basis{},
        _Pred{_Pred_},
        _Chunk_locals(_Team._Chunks),
        _Results{_First},
        _Results_falses{}
    {
        _Basis._Populate(_Team, _First);
    }
    _Cancellation_status _Process_chunk()
    {
        const auto _Key = _Team._Get_next_key();
        if (!_Key)
        {
            return (_Cancellation_status::_Canceled);
        }

        // serial-on-each-chunk phase:
        auto _Merge_index = _Key._Chunk_number; // merge step will start from this index
        {
            auto& _This_chunk_data = _Chunk_locals[_Merge_index];
            const auto _Range = _Basis._Get_chunk(_Key);
            if (_Merge_index == 0 || _Chunk_locals[_Merge_index - 1]._State.load() == _Chunk_state::_Done)
            { // no predecessor, so run the serial algorithm directly into the results
                if (_Merge_index == 0 || _Results == _Range._First)
                {
                    _Results = _Partition_unchecked(_Range._First, _Range._Last, _Pred, _Iter_cat_t<_FwdIt>());
                    _This_chunk_data._Chunk_trues = _STD distance(_Range._First, _Results); // FIXME
                }
                else
                {
                    auto _This_chunk_results =
                        _Partition_swap_backward(_Range._First, _Range._Last, _Results, _Pred);
                    _Results = _This_chunk_results.first;
                    _This_chunk_data._Chunk_trues = _This_chunk_results.second;
                }

                _Results_falses += _Key._Size - _This_chunk_data._Chunk_trues;
                _This_chunk_data._State.store(_Chunk_state::_Done);
                ++_Merge_index; // this chunk is already merged
            }
            else
            { // predecessor exists; run the serial algorithm in place and attempt to merge later
                _This_chunk_data._Beginning_of_falses =
                    _Partition_unchecked(_Range._First, _Range._Last, _Pred, _Iter_cat_t<_FwdIt>());
                _This_chunk_data._Chunk_trues =
                    _STD distance(_Range._First, _This_chunk_data._Beginning_of_falses); // FIXME
                _This_chunk_data._State.store(_Chunk_state::_Merging);
                if (_Chunk_locals[_Merge_index - 1]._State.load() != _Chunk_state::_Done)
                { // if the predecessor isn't done, whichever thread merges our predecessor will merge us too
                    return (_Cancellation_status::_Running);
                }
            }
        }
        // merge phase: at this point, we have observed that our predecessor chunk has been merged into the
        // output; attempt to become the new merging thread if the previous merger gave up
        // note: it is an invariant when we get here that _Chunk_locals[_Merge_index - 1]._State == _Chunk_state::_Done
        for (; _Merge_index != _Team._Chunks; ++_Merge_index)
        {
            auto& _Merge_chunk_data = _Chunk_locals[_Merge_index];
            auto _Expected = _Chunk_state::_Merging;
            if (!_Merge_chunk_data._State.compare_exchange_strong(_Expected, _Chunk_state::_Swapping))
            { // either the _Merge_index chunk isn't ready to merge yet, or another thread will do it
                return (_Cancellation_status::_Running);
            }

            const auto _Merge_key = _Team._Get_chunk_key(_Merge_index);
            const auto _Merge_first = _Basis._Get_first(_Merge_index, _Merge_key._Start_at);
            const auto _Merge_beginning_of_falses = _STD exchange(_Merge_chunk_data._Beginning_of_falses, {});
            // at this point:
            // [_Results, _Merge_first) is the region of falses that are already merged
            // [_Merge_first, _Merge_beginning_of_falses) is the region of trues in the chunk to merge
            // [_Merge_beginning_of_falses, merging chunk's _Last) is the region of falses in the chunk to merge
            const auto _Chunk_trues = _Merge_chunk_data._Chunk_trues;
            if (_Results_falses < _Chunk_trues)
            { // exchange the entire merged-falses range with the end of the chunk's trues range
                const auto _Merged_falses_first = _Results;
                if constexpr (_Is_bidit_v<_FwdIt>)
                {
                    _Results = _STD prev(_Merge_beginning_of_falses, _Results_falses);
                }
                else
                {
                    _Results = _STD next(_Merge_first, _Chunk_trues - _Results_falses);
                }

                _Swap_ranges_unchecked(_Merged_falses_first, _Merge_first, _Results);
            }
            else
            { // exchange the entire chunk-trues range with the beginning of the merged-falses range
                _Results = _Swap_ranges_unchecked(_Merge_first, _Merge_beginning_of_falses, _Results);
            }
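
            // Worked illustration of the first branch (example values, not from the
            // original): with layout {T T F F t t t f}, _Results = &a[2],
            // _Merge_first = &a[4], _Merge_beginning_of_falses = &a[7],
            // _Results_falses = 2, _Chunk_trues = 3, the swap exchanges [&a[2], &a[4])
            // with [&a[5], &a[7]), producing {T T t t t F F f} with _Results = &a[5];
            // the update below then makes _Results_falses == 3.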
            _Results_falses += _Merge_key._Size - _Chunk_trues;
            _Merge_chunk_data._State.store(_Chunk_state::_Done);
        }

        return (_Cancellation_status::_Canceled);
    }
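
    // Why this is O(n) overall (reasoning added for clarity): each element is examined
    // once by the serial partition of its own chunk, and each merge step swaps at most
    // min(_Results_falses, _Chunk_trues) <= chunk size elements, so the total merge work
    // is also linear in the element count.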
    static void __stdcall _Threadpool_callback(__std_PTP_CALLBACK_INSTANCE, void * const _Context,
        __std_PTP_WORK) _NOEXCEPT // enforces termination
    {
        (void)static_cast<_Static_partitioned_partition *>(_Context)->_Process_chunk();
    }
};
template<class _FwdIt,
    class _Pr> inline
_FwdIt _Partition_unchecked(_Sequenced_policy_tag, _FwdIt _First, _FwdIt _Last, _Pr _Pred)
{ // move elements satisfying _Pred to beginning of sequence, serially
    return (_Partition_unchecked(_First, _Last, _Pred, _Iter_cat_t<_FwdIt>()));
}
template<class _FwdIt,
    class _Pr> inline
_FwdIt _Partition_unchecked(_Parallel_policy_tag, _FwdIt _First, _FwdIt _Last, _Pr _Pred)
{ // move elements satisfying _Pred to beginning of sequence, in parallel
    const size_t _Hw_threads = __std_parallel_algorithms_hw_threads();
    if (_Hw_threads > 1)
    {
        const auto _Count = _STD distance(_First, _Last);
        if (_Count >= 2)
        {
            _TRY_BEGIN
            _Static_partitioned_partition<_FwdIt, _Pr> _Operation{_Hw_threads, _Count, _First, _Pred};
            _Run_chunked_parallel_work(_Operation);
            return (_Operation._Results);
            _CATCH(const _Parallelism_resources_exhausted&)
            // fall through to serial case below
            _CATCH_END
        }
    }

    return (_Partition_unchecked(_First, _Last, _Pred, _Iter_cat_t<_FwdIt>()));
}
template<class _ExPo,
    class _FwdIt,
    class _Pr,
    _Enable_if_execution_policy_t<_ExPo>/* = 0 */> inline
_FwdIt partition(_ExPo&& _Exec, _FwdIt _First, const _FwdIt _Last, _Pr _Pred) _NOEXCEPT
{ // move elements satisfying _Pred to beginning of sequence
    _DEBUG_RANGE(_First, _Last);
    return (_Rechecked(_First,
        _Partition_unchecked(_STD forward<_ExPo>(_Exec), _Unchecked(_First), _Unchecked(_Last), _Pass_fn(_Pred))));
}
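
For reference, a minimal caller of the interface implemented above (a sketch, assuming a C++17 toolchain with the parallel algorithms; the vector contents and the predicate are example values only):

#include <algorithm>
#include <execution>
#include <vector>

int main()
{
    std::vector<int> v{3, 1, 4, 1, 5, 9, 2, 6};
    // Move the even elements to the front; the returned iterator is the
    // first element of the "false" (odd) region.
    const auto mid = std::partition(std::execution::par, v.begin(), v.end(),
        [](const int x) { return x % 2 == 0; });
    return static_cast<int>(mid - v.begin()); // 3 evens: 4, 2, 6
}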