Last active
March 22, 2017 23:16
-
-
Save lizmat/4369e28d6c8a23d54e944881c4df07af to your computer and use it in GitHub Desktop.
HLL and documented prototype of .hyper/.race support
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use MONKEY; | |
# A chain of operations on a source iterator that is executed by a number
# of workers once the result is actually iterated.  Calls to .map and
# .grep are recorded as actions instead of being executed; asking for the
# iterator (or sinking the chain) sets the workers in motion.
class ConcurrentChain does Iterable {  # perhaps this should be HyperSeq ?
    has     $!source;   # the source iterator
    has int $!batch;    # batch size
    has int $!degree;   # degree size (number of workers)
    has str $!method;   # name of method: "hyper" or "race"
    has     @!actions;  # Pairs of actions: Method => Capture

    # Return a true value if the first positional of the given Capture is
    # a Block with a LAST phaser.  Only Block provides .has-phaser, so any
    # other matcher / mapper (a WhateverCode such as "* %% 2", a type
    # object, a regex, ...) must be type-checked first rather than being
    # asked for its phasers.
    my sub has-LAST-phaser(\c) {
        my $code := c.AT-POS(0);
        nqp::istype($code,Block) && $code.has-phaser("LAST")
    }

    # Need to special case "map", to handle "for {}", which is code-genned
    # as a .map, but with a :item named parameter specified
    my $map-method := List.^find_method("map");
    method map(ConcurrentChain:D: :$item, |c) {
        if defined($item) {
            Seq.new(self.iterator).map(:$item, |c)
        }
        elsif has-LAST-phaser(c) {
            Seq.new(self.iterator).map(|c)   # must use serial version
        }
        else {                               # concurrent, save as action to do
            @!actions.push(Pair.new($map-method,c));
            self
        }
    }

    # Need to special case "grep" in case the block has a LAST phaser.
    my $grep-method := List.^find_method("grep");
    method grep(ConcurrentChain:D: |c) {
        if has-LAST-phaser(c) {
            Seq.new(self.iterator).grep(|c)  # must use serial version
        }
        else {                               # concurrent, save as action to do
            @!actions.push(Pair.new($grep-method,c));
            self
        }
    }

    # concurrent methods that should just save action for later
#    BEGIN for <grep> {
#        my $call-method  := List.^find_method($_);
#        my $chain-method := method (ConcurrentChain:D: |c) {
#            @!actions.push(Pair.new($call-method,c));
#            self
#        }
#        $chain-method.set_name($_);
#        ::?CLASS.^add_method($_,$chain-method)
#    }

    # Serial methods: these run the chain to completion into a buffer,
    # wrap that buffer in a List and then call the List implementation
    # of the method on it.  The arguments of the call are forwarded, so
    # that e.g. .unique(:as(...)) or .squish(:with(...)) behave the same
    # as they would on a List.
    BEGIN for <gist keys kv perl repeated squish Str unique values> {
        my $call-method  := List.^find_method($_);
        my $chain-method := method (ConcurrentChain:D: |c) {
            self.iterator.push-all(my $buffer := IterationBuffer.CREATE);
            $call-method(
              nqp::p6bindattrinvres(nqp::create(List),List,'$!reified',$buffer),
              |c
            )
        }
        $chain-method.set_name($_);
        ::?CLASS.^add_method($_,$chain-method)
    }

    method !SET-SELF(\source, \batch, \degree, $method) {
        $!source := source;
        $!batch   = batch;
        $!degree  = degree;
        $!method  = $method;
        self
    }

    # Create a chain for the given source iterator, batch size, number of
    # workers and method name ("hyper" or "race").  Throws
    # X::Invalid::Value if the batch size or the degree is not positive.
    method new(\source, \batch, \degree, $method) {
        batch <= 0
          ?? X::Invalid::Value.new(
               :$method, :name<batch>, :value(batch)).throw
          !! degree <= 0
            ?? X::Invalid::Value.new(
                 :$method,:name<degree>,:value(degree)).throw
            !! self.CREATE!SET-SELF(source, batch, degree, $method)
    }

    # The serial iterator producing the results
    method iterator() {
        @!actions                                 # we can run actions
          ?? $!method eq 'hyper'
            ?? Rakudo::Iterator.HyperActions(     # we want "hyper"
                 $!source,@!actions,$!batch,$!degree)
            !! Rakudo::Iterator.RaceActions(      # we want "race"
                 $!source,@!actions,$!batch,$!degree)
          !! $!source                             # sadly, nothing to do
    }

    # Make sure we will always run the iterator
    method sink() { self.iterator.sink-all }
}
# The Iterable role itself cannot be augmented, so the entry points live
# on Seq instead.  For now this means that any iterable must be turned
# into a Seq (with .Seq) before it can be hypered / raced.
augment class Seq {

    # Stand-in for "hyper" (that name cannot be used here): start a
    # ConcurrentChain over this Seq's iterator in "hyper" mode.
    method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        my $source := self.iterator;
        ConcurrentChain.new($source, $batch, $degree, "hyper")
    }

    # Stand-in for "race" (that name cannot be used here): start a
    # ConcurrentChain over this Seq's iterator in "race" mode.
    method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        my $source := self.iterator;
        ConcurrentChain.new($source, $batch, $degree, "race")
    }
}
augment class Rakudo::Iterator { | |
my $empty := IterationBuffer.CREATE; | |
# Returns a sort of hyper-iterator that can be called from different
# threads at the same time, and which produces Pairs with an ordinal
# number and an IterationBuffer filled with as many elements that could
# be fetched from the given source iterator and the given batch size.
# If the source iterator is exhausted, it will keep returning
# IterationEnd no matter how many times .pull-buffer is called.
method ConcurrentBatcher(\iterator, Int:D $batch) {
    class {
        has     $!iterator;  # the source iterator
        has     $!lock;      # making sure one worker runs this code
        has int $!ordinal;   # the key of the Pair we produce
        has int $!batch;     # 0 indicates we're exhausted

        method !SET-SELF(\iterator, \batch) {
            $!iterator := iterator;
            $!lock     := Lock.new;
            $!ordinal   = -1;
            $!batch     = batch;
            self
        }
        method new(\it,\ba) { self.CREATE!SET-SELF(it,ba) }

        # Produce the next "ordinal => IterationBuffer" Pair, or
        # IterationEnd if there is nothing left to produce.
        #
        # The whole method runs under the lock, *including* the check
        # whether we are exhausted.  $!batch is set to 0 by whichever
        # worker hits the end of the source; if it were tested before
        # taking the lock, a worker that passed the test just before
        # that happened would go on to pull from an already exhausted
        # source iterator.
        method pull-buffer() {
            $!lock.protect({
                nqp::if(
                  $!batch,
                  nqp::stmts(                # we're still in business
                    (my $buffer   := IterationBuffer.CREATE),
                    (my $iterator := $!iterator),
                    (my int $batch = $!batch),
                    (my int $found = -1),
                    nqp::until(              # fill until full or source done
                      nqp::iseq_i(
                        ($found = nqp::add_i($found,1)),
                        $batch
                      ) || nqp::eqaddr(
                        (my $pulled := $iterator.pull-one),
                        IterationEnd
                      ),
                      nqp::bindpos($buffer,$found,$pulled)
                    ),
                    nqp::if(                 # source iterator exhausted
                      nqp::islt_i($found,$batch),
                      nqp::stmts(            # only produce what we found
                        nqp::splice(
                          $buffer,
                          $empty,
                          $found,
                          nqp::sub_i($batch,$found)
                        ),
                        ($!batch = 0)        # tell the world we're done
                      )
                    ),
                    nqp::if(
                      $found,
                      Pair.new(              # produce a batch
                        ($!ordinal = nqp::add_i($!ordinal,1)),
                        $buffer
                      ),
                      IterationEnd           # we're done here
                    )
                  ),
                  IterationEnd               # we were already done
                )
            })
        }
    }.new(iterator,$batch)
}
# This role provides the basis for concurrent processing of an
# iterator.  Its .new method expects a source iterator, a list of
# actions to be performed (consisting of method => capture pairs)
# on each batch of values obtained from the source iterator, the
# maximum size of the buffer to be processed inside a single worker,
# and the degree (aka number of workers).  Classes must provide at
# least a pull-one and a !TWEAK method (which is expected to return
# self).
#
# Please note that although all sorts of parallel processing happens
# inside the classes that do this role, the classes expose themselves
# as ordinary Iterator classes to the world.
role ConcurrentActionator does Iterator {
    has     $!queue;     # where the workers deliver their results
    has     @!promises;  # one Promise per worker
    has Int $!alive;     # alas, cannot be a native int :-(

    # We use a concurrent blocking queue (the work horse of a
    # Channel, but without any frills here).  It is safe to
    # nqp::push() on it from any thread, and nqp::shift() will
    # block until a value becomes available.  Since we know the
    # number of workers, and each worker will send an IterationEnd
    # at the end, by counting the number of IterationEnd's seen,
    # we know when we should stop nqp::shift()ing from the queue.
    my class Queue is repr('ConcBlockingQueue') { }

    # To be supplied by the class doing this role
    method !TWEAK { ... }

    # Initialise the object and start the workers.
    method !SET-SELF(\source, \actions, \batch, \degree) {

        # Two parallel low-level lists, one holding the method to call
        # and one holding the capture to call it with, so that a worker
        # can find what to do for an action by index.  An action key
        # that is not a Method is looked up by name on List.
        my int $todo  = actions.elems;
        my $methods  := nqp::setelems(nqp::list,$todo);
        my $captures := nqp::setelems(nqp::list,$todo);
        for actions.kv -> $index, $pair {
            my $callee := $pair.key;
            nqp::bindpos(
              $methods,
              $index,
              nqp::istype($callee,Method)
                ?? $callee
                !! List.^find_method($callee.Str)
            );
            nqp::bindpos($captures,$index,$pair.value);
        }

        # Set up the worker count, the queue and the batcher.
        $!alive  = degree;
        $!queue := Queue.CREATE;
        my $batcher := Rakudo::Iterator.ConcurrentBatcher(source,batch);

        # Set up the promises of the workers.  Not sure we would
        # actually ever need that, but maybe we can do something
        # with reducing operations in the future, where we would
        # need the result.  Or we can have the worker just return
        # the number of elements processed, or other statistics.
        for ^degree -> $worker {
            @!promises.BIND-POS($worker,
              start {
                  CATCH { .say }  # not sure what to do about exceptions

                  # Ensure we *always* queue an IterationEnd for this
                  # worker when we're done, normally or exceptionally.
                  LEAVE nqp::push($!queue,IterationEnd);

                  # A dummy HLL list to be passed as a "self", with
                  # its $!reified changed for each call.
                  my $proxy := List.CREATE;

                  nqp::until(               # while we have batches
                    nqp::eqaddr(
                      (my $parcel := $batcher.pull-buffer),
                      IterationEnd
                    ),
                    nqp::stmts(             # for all actions to perform
                      (my int $i = -1),
                      nqp::while(
                        nqp::islt_i(($i = nqp::add_i($i,1)),$todo),
                        nqp::stmts(

                          # Transplant the current values of the batch
                          # into the dummy List
                          nqp::bindattr($proxy,List,'$!reified',
                            nqp::getattr($parcel,Pair,'$!value')
                          ),

                          # Call the action on the dummy List and push
                          # everything it produces into a fresh buffer
                          # that becomes the new value of the batch
                          nqp::atpos($methods,$i)(
                            $proxy,|nqp::atpos($captures,$i)
                          ).iterator.push-all(
                            nqp::bindattr($parcel,Pair,'$!value',
                              IterationBuffer.CREATE
                            )
                          )
                        )
                      )
                    ),
                    nqp::push($!queue,$parcel)  # queue result of all actions
                  )
              }
            )
        }

        # The instantiated object
        self
    }

    method new(\source, \actions, \batch, \degree) {
        self.CREATE!SET-SELF(source,actions,batch,degree)!TWEAK
    }

    # Just eat the queue, we don't care about order in any way
    method sink-all(--> IterationEnd) is raw {
        nqp::while(
          $!alive,
          nqp::stmts(
            nqp::until(                     # discard until a worker is done
              nqp::eqaddr(nqp::shift($!queue),IterationEnd),
              nqp::null
            ),
            nqp::unless(                    # clean up after the last worker
              --$!alive,
              self!cleanup
            )
          )
        )
    }

    # Handle the promises, indicate we're done
    method !cleanup(--> IterationEnd) {
        .result for @!promises
    }
}
# The "hyper" case of the ConcurrentActionator role: results are
# produced in the order of the ordinal numbers of the chunks, whatever
# the order in which the workers deliver them.
method HyperActions(\source,\actions,\batch,\degree) {
    class :: does ConcurrentActionator {
        has     $!slipped;    # current list of values to produce
        has     $!processed;  # list of processed chunks
        has int $!offset;     # ordinal number of chunk at index 0

        method !TWEAK() {
            $!slipped   := $empty;
            $!processed := nqp::list;
            self
        }

        # Produce the next value.  Every branch that did not produce a
        # value only updates state; the single "rinse and repeat" call
        # at the end then retries.  In particular, the branch handling
        # an expired worker must NOT call pull-one itself: its result
        # would be thrown away and the retry would skip a value.
        method pull-one() is raw {
            nqp::if(
              nqp::elems($!slipped),
              nqp::shift($!slipped),         # produce from the chunk
              nqp::if(
                $!alive,
                nqp::stmts(
                  nqp::if(                   # no chunk to produce from
                    nqp::existspos($!processed,0),
                    nqp::stmts(              # next chunk is available
                      ($!offset = nqp::add_i($!offset,1)),
                      ($!slipped := nqp::shift($!processed))
                    ),
                    nqp::if(                 # next chunk not there
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::unless(           # a worker has expired
                        --$!alive,
                        self!cleanup         # mohican time, bye bye
                      ),
                      nqp::if(               # a fresh chunk
                        nqp::iseq_i(
                          $!offset,
                          (my int $ordinal = $chunk.key)
                        ),
                        nqp::stmts(          # in sequence chunk
                          ($!offset = nqp::add_i($!offset,1)),
                          nqp::if(           # lose placeholder if any
                            nqp::elems($!processed),
                            nqp::shift($!processed)
                          ),
                          ($!slipped := $chunk.value)
                        ),
                        nqp::bindpos(        # out of sequence, store for later
                          $!processed,
                          nqp::sub_i($ordinal,$!offset),
                          $chunk.value
                        )
                      )
                    )
                  ),
                  self.pull-one              # rinse and repeat
                ),
                IterationEnd                 # all workers are done
              )
            )
        }

        # Push all remaining values onto the target, in chunk order.
        method push-all($target --> IterationEnd) {
            nqp::stmts(
              nqp::while(                    # produce from available
                nqp::elems($!slipped),
                $target.push(nqp::shift($!slipped))
              ),
              nqp::while(                    # do the other chunks
                $!alive,
                nqp::if(
                  nqp::existspos($!processed,0),
                  nqp::stmts(                # next chunk is available
                    ($!offset = nqp::add_i($!offset,1)),
                    (my $slipped := nqp::shift($!processed)),
                    nqp::while(
                      nqp::elems($slipped),
                      $target.push(nqp::shift($slipped))
                    )
                  ),
                  nqp::if(                   # next chunk not there
                    nqp::eqaddr(
                      (my $chunk := nqp::shift($!queue)),
                      IterationEnd
                    ),
                    nqp::unless(             # worker expired
                      --$!alive,
                      self!cleanup           # mohican time, bye bye
                    ),
                    nqp::bindpos(            # store chunk until it is its turn
                      $!processed,
                      nqp::sub_i($chunk.key,$!offset),
                      $chunk.value
                    )
                  )
                )
              )
            )
        }
    }.new(source, actions, batch, degree)
}
# The "race" case of the ConcurrentActionator role: results are
# produced in the order in which the workers deliver their chunks.
method RaceActions(\source,\actions,\batch,\degree) {
    class :: does ConcurrentActionator {
        has $!slipped;    # current list of values to produce

        method !TWEAK() {
            $!slipped := $empty;
            self
        }

        # Produce the next value.  The queue is only consulted while
        # there still are workers that have not delivered their final
        # IterationEnd: once they all have, nothing will ever be pushed
        # on the queue again and shifting from it would block forever.
        method pull-one() is raw {
            nqp::if(
              nqp::elems($!slipped),
              nqp::shift($!slipped),         # produce from available
              nqp::if(                       # no values available
                $!alive,
                nqp::if(
                  nqp::eqaddr(
                    (my $chunk := nqp::shift($!queue)),
                    IterationEnd
                  ),
                  nqp::if(                   # worker exhausted
                    --$!alive,
                    self.pull-one,           # but still others
                    self!cleanup             # mohican time, bye bye
                  ),
                  nqp::stmts(
                    ($!slipped := $chunk.value),  # could be empty
                    self.pull-one                 # so try again
                  )
                ),
                IterationEnd                 # all workers are done
              )
            )
        }

        # Push all remaining values onto the target, in order of arrival.
        method push-all($target --> IterationEnd) {
            nqp::stmts(
              nqp::while(                    # produce from available
                nqp::elems($!slipped),
                $target.push(nqp::shift($!slipped))
              ),
              nqp::while(                    # do the other chunks
                $!alive,
                nqp::until(                  # still in business
                  nqp::eqaddr(
                    (my $chunk := nqp::shift($!queue)),
                    IterationEnd
                  ),
                  nqp::stmts(                # we have a chunk
                    (my $slipped := $chunk.value),
                    nqp::while(              # push the chunk
                      nqp::elems($slipped),
                      $target.push(nqp::shift($slipped))
                    )
                  )
                ),
                nqp::unless(                 # a worker delivered its last
                  --$!alive,
                  self!cleanup               # mohican time, bye bye
                )
              )
            )
        }
    }.new(source, actions, batch, degree)
}
} | |
# Small benchmark: run the same map / grep pipeline concurrently and
# serially on the same input and show the result and wallclock of each.
my @a = ^106;

# The mapper must not modify its topic: the values it receives are the
# elements of @a themselves, so an in-place increment in the first run
# would change the input of the second run and make the two dumps
# incomparable.
my $now = now;
dd @a.Seq.hijper(:10batch).map({ sleep rand / 1000; $_ + 1 }).grep({ $_ %% 2 });
say "parallel processed in {now - $now}";

$now = now;
dd @a.map( { sleep rand / 1000; $_ + 1 } ).grep: { $_ %% 2 };
say "serial processed in {now - $now}";
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment