This document describes the eventual intended semantics of await, lays out
some syntactic relief for working with supplies, considers the status of
channels, and proposes the end of the syntax formerly known as earliest,
winner, etc. - which I've never really liked. One further area to be
covered in a similar document to this is cancellation.
Feedback welcome! -- jnthn
The await function is used to efficiently wait for one or more asynchronous
operations to have completed. A common use is with a Promise:
my $p1 = start sub-long-computation();
my $p2 = start another-long-computation();
await $p1, $p2;
say "P1: $p1.result()";
say "P2: $p2.result()";
Since await also unpacks results, you can also write:
my $p1 = start sub-long-computation();
my $p2 = start another-long-computation();
my ($r1, $r2) = await $p1, $p2;
say "P1: $r1";
say "P2: $r2";
If any Promise is broken, then the exception it was broken with is thrown
by await. It's also possible to use await with a Supply. In this csae,
await results in the final value emitted by a Supply, Nil if it was
done without a value, or throws any exception that it quit with. Note
that await will tap the Supply, and so trigger execution if it's an
on-demand supply.
The await function takes one or more objects that do the Awaitable role.
This is done by both Promise and Supply. It then looks up the current
awaiter in dynamic scope, through the $*AWAITER dynamic variable, and
passes the list of Awaitables to its await method. Put another way, the
await function is something like:
sub await(*@awaitables) {
for @awaitables.grep(* !~~ Awaitable) {
die "Cannot await a $_.^name()";
}
$*AWAITER.await(@awaitables);
}
The Awaitable role requires provision of a subscribe-awaiter method,
which takes two closures: one invoked with a result if the awaited operation
is successful, and another invoked with an Exception object if the awaited
operation failed.
The default awaiter, installed in PROCESS, blocks the current thread until
the awaited operation has completed. However, thread pool threads, managed by
ThreadPoolScheduler, have a rather different awaiter. Theirs will take a
continuation rooted at the scheduler's work loop, and pass closures to the
subscribe-awaiter method to schedule the resumption of that closure on the
thread pool when the operation completes. This means that while writing sleep
sort this way will end up with a huge number of kernel threads, and possibly
exhaust the thread pool if you sort enough values:
await do for @values -> $value {
start {
sleep $value;
say $value;
}
}
You can do it potentially with only a single thread pool thread being needed by writing it as:
await do for @values -> $value {
start {
await Promise.in($value);
say $value;
}
}
Note that here the outermost await, presuming it's in the main program body
and so hits the default awaiter, will actually block the main program thread.
But the await inside of the start is being run on a thread pool thread,
and so will quickly return control to the thread pool until the amount of
time has elapsed, and then be resumed.
A serial supply promises you that it will never do concurrent emit, done,
or quit calls. Every supply knows whether it can promise this, and if it
promises it then things downstream can depend on it. That is to say, on a
serial supply, .tap is equivalent to .act. Whether a supply is serial
or not can be introspected with the serial method.
Serial supplies are strongly preferred, since those tapping the supply can
be confident they will not encounter concurrency issues in their projection
functions. All of the built-in Supply factory methods will ensure that any
supplies they produce are serial. This includes supply combinators such as
merge and zip, which tap multiple supplies. The serialize method takes
a non-serial supply and produces a serial one.
XXX method forms of hyper/race TBD (see further down for how sugar looks, though)
A supply block is a convenient way to create an on-demand supply. When the
supply is tapped, the supply block is run. Within it, emit can be used to
emit values to the tapper, and done can be used to convey that there will be
no more values.
my $s = supply {
emit 'a value!';
emit 'another!';
done;
}
The emit and done can be done in nested scopes, and follow the same rules
as gather and take. An unhandled exception thrown inside of a supply block
will be passed onwards using quit.
For consuming supplies, there is an asynchronous looping construct, known as
whenever. A whenever placed lexically inside of a supply (it need not
be directly) will be associated with that Supply. Therefore, a close of the
supply will also close a tap opened by a whenever. As a simple example,
consider writing something to filter negative values out (granted, you could
have done this with Supply.grep):
sub remove-negatives($value-supply) {
return supply {
whenever $value-supply -> $value {
emit $value if $value >= 0;
}
}
}
Since whenever promises you will only ever be inside of its loop body in
one thread at a time, you can also do stateful things:
sub add-sequence-number($supply-in) {
return supply {
whenever $supply-in -> $value {
state $seq-number = 1;
emit [$seq-number++, $value];
}
}
}
Since whenever is asynchronous, and falling out of a supply block does
not close the supply (an explicit done is needed for that), you can set up
many whenevers in a supply block. The "in one loop body at a time" rule
is extended to cover all the whenever loop bodies, so you can safely keep
state visible between them. Thus you can do things like:
sub rate-limit($supply-in, $max-per-second) {
return supply {
my $in-last-second = 0;
# Emit up to the limit values per second.
whenever $supply-in -> $value {
if $in-last-second++ < $max-per-second {
emit $value;
}
}
# Clear the limit once per second.
whenever Supply.interval(1) {
$in-last-second = 0;
}
}
}
Just be sure to keep the state inside of the supply block itself, so you
have it per time the supply is tapped.
Since the whenever does not need to be directly inside of a supply block,
it's also possible to set up combinators that will tap any number of supplies.
sub merge(*@supplies) {
return supply {
for @supplies -> $mergee {
whenever $mergee -> $value {
emit $value;
}
}
}
}
The usual set of control exceptions can be used inside of the construct. That
is redo just re-runs the block with the current value, next skips the rest
of the code in the loop, and last un-taps the supply we're reacting to (this
is the in-loop way to close the tap). So we can write something that starts to
emit values when one matching a certain condition comes its way, and then shut
down the supply when a value matching the off condition comes its way:
sub on-off($supply-in, &on, &off) {
return supply {
whenever $supply-in -> $value {
state $on = False;
unless $on {
next unless on($value);
$on = True;
}
last if off($value);
emit $value;
}
}
}
If all whenever blocks in a supply are done, then the supply itself
will also signal done. Therefore, our merge routine above handles finite
supplies correctly "automatically". By contrast, our rate limiting example is
problematic as the per-second interval we're using is eternal. Thankfully, we
can use loop phasers to spot the last event in the supply we're filtering, and
mark the surrounding supply as done:
sub rate-limit($supply-in, $max-per-second) {
return supply {
my $in-last-second = 0;
# Emit up to the limit values per second.
whenever $supply-in -> $value {
if $in-last-second++ < $max-per-second {
emit $value;
LAST done;
}
}
# Clear the limit once per second.
whenever Supply.interal(1) {
$in-last-second = 0;
}
}
}
If any of the whenevers in a supply block should quit, or if the code in
a whenever block throws an exception, then this will be sent onwards and all
taps on other whenever blocks will be closed. More control over errors can
be obtained using the QUIT phaser. A QUIT phaser applies to all whenever
blocks in the exact scope that it exists in. Like CATCH, it expects you to
smartmatch on the exception, and failure to match it will cause the quit to
carry onwards.
We could write an infinite retry as:
sub retry-forever($supply-in) {
return supply {
sub try-it() {
whenever $supply-in -> $value {
emit $value;
}
QUIT {
default {
try-it();
}
}
}
try-it();
}
}
Though that particular example should already be available as a .retry(*)
method on any Supply. Also note that, due to the asynchronous nature of
whenever and QUIT, that is not actually an infinite recursion!
A slightly more intresting example would be implementing a fallback:
sub fallback($supply-a, $supply-b) {
return supply {
whenever $supply-a -> $value {
emit $value;
}
QUIT {
default {
whenever $supply-b -> $value {
emit $value;
}
}
}
}
}
Here, the QUIT only applies to the first whenever, and the second is not
protected. This one should also be available simply as a .catch(...) method
on a Supply, though. You'll most likely use QUIT in other more complex
supplies.
For a supply block with many whenevers, one QUIT can be used to deal
with any of them quitting. To get finer control (if you wanted to do different
things for different whenevers), putting one or more whenevers in a bare
block along with a QUIT will suffice.
If you nest one whenever inside of another, then it will be associated with
any enclosing supply. This can be useful for writing things over supplies of
supplies. For example, we may have a supply of news topics. Whenever we start
to be interested in a new topic, it's emitted, and the object it sends has a
supply of posts. We want to then also tap those posts, and emit strings that
have the topic name along with the post title. We can write this as:
class Topic {
has Str $.name;
has Supply $.posts;
...
}
class Post {
has Str $.title;
...
}
sub active-topic-titles($topic-supply) {
return supply {
whenever $topic-supply -> $topic {
whenever $topic.posts -> $post {
emit "[$topic.name()]: $post.title()";
}
}
}
}
Alternatively, we might only we interested in one topic at a time, and want to
close the tap on the last topic. Since a whenever evaluates to a Tap, we
can just write:
sub latest-topic-titles($topic-supply) {
return supply {
my $cur-posts-tap;
whenever $topic-supply -> $topic {
$cur-posts-tap.?close();
$cur-posts-tap = whenever $topic.posts -> $post {
emit "[$topic.name()]: $post.title()";
}
}
}
}
XXX further semantics of nesting?
It's also possible to use whenever on a Promise. This will actually coerce
it into a Supply first.
sub timeout-until-first-value($supply-in, $timeout) {
return supply {
my $saw-value = False;
whenever $supply-in -> $value {
emit $value;
$saw-value = True;
}
whenever Promise.in($timeout) {
die "Timed out" unless $saw-value;
}
}
}
Note that the die here causes a quit on the supply overall, and that in
turn closes the tap of $supply-in, which may invoke some cancellation
logic (for example, cancelling an in-flight web request).
One may wonder what to do at the top level of an application, where it's
mostly just interesting to "sink" other supplies and do side-effects rather
than emit values. Even if values won't be emitted, it still makes sense to
nest this in a supply block for the sake of error handling. Here's a
simple echo server, to demonstrate:
await supply {
whenever IO::Socket::Async.listen('localhost', 3333) -> $conn {
say "Incoming connection";
whenever $conn.bytes_supply -> $buf {
try say $buf.decode;
await $conn.write($buf);
LAST $conn.close;
}
}
}
The use of await inside a whenever, as in this example, will prevent that
whenever loop from seeing any more values until after the await has been
completed. It has no effect on any outer whenever blocks, however. This
means we will not echo output back out-of-order, but equally we will not
block processing of other incoming connections.
The "one thread at a time" semantics are good for safety, but sometimes you
are dealing with a supply where you'd rather process the incoming events over
a number of threads. For example, there's no reason we can't process incoming
requests to our echo server over multiple threads. We do this using race:
await supply {
race whenever IO::Socket::Async.listen('localhost', 3333) -> $conn {
say "Incoming connection";
whenever $conn.bytes_supply -> $buf {
try say $buf.decode;
await $conn.write($buf);
LAST $conn.close;
}
}
}
Note that we do not apply race to the inner whenever, since we do care
about ordering there. The race prefix will cause the whenever loop body
to be scheduled on the thread pool, meaning that even if the supply we are
tapping is serial, we will behave as if it were not.
All supplies produced by supply are serial, even in the presence of race.
That is to say:
my $results = supply {
race whenever $things-to-compute -> $value {
emit do-long-computation($value);
}
}
my $total = 0;
$results.tap: {
$total += $_;
say "$_ ($total)";
}
Will never be a data race on $total. However, with race the order of the
emitted results may not match the input order. To get that, use hyper:
my $results = supply {
hyper whenever $things-to-compute -> $value {
emit do-long-computation($value);
}
}
Our current channels are really just a blocking concurrent queue. This can be
useful, but since it implies blocked threads waiting for values then it's not
the best of solutions for most user-space problems. Better is to use await
with a Promise, or whenever over a Supply. Also, the name channel is
evocative of the concept in Go, but our channels aren't like that. They don't
block senders until the receiver is ready, for example. While a blocking queue
is a very useful thing to have in a language, and we should keep it, we may
want to stop calling it Channel, to avoid confusion (and keep the name free
for something that has the Go-ish semantics). Finally, the error propagation
could now go away (completion propagation, on the other hand, can still be
very useful). In general, this type becomes a little more "low level", since
the good uses of it are usually in implementing things like thread pools -
that is, infrastructure.
One of the reasons we've had trouble with this construct is that it tried to
support both promises and channels, but the former are non-blocking and the
latter are blocking. Its origins date back to before supplies. Now we have
supplies, and with a proposal for some nice syntactic relief for them, which
also handles promises nicely, it's probably time to say goodbye to earliest
and friends.
And what of consuming a blocking queue (currently Channel)? There's still
the .list coercer that produces a lazy list of things coming out of it, and
we can (keep/add) a coercer to Supply. In fact, that would even mean that we
get whenever support for channels "for free" and people can simply move to
the new syntax.
It would also seem reasonable for 'await' to return any thrown-then-resumed Exceptions in the corresponding slot in the unpacked array.
The QUIT example claiming it is "not an infinite recursion" strikes me as confusing. The QUIT block appears to be inside the dynamic scope of the sub. It does not matter that the sub returns, immediately it is still in that scope (persisted as a continuation). Would sub a { my $_a = 1; whenever ...; QUIT {$_a.say }} find the $ *a?
At first it looks like the parent and child nested whenevers still are in the same "only in one loop body at a time rule" and that is why the one-topic-at-a-time example doesn't just prematurely close topics before viewing all the posts in the topic. Then the echo server example states that awaiting inside the child whenever does not block processing new connections. This is causing me some cognitive dissonance.