I can parallelize execution using Hyper and it works:
$ raku -e 'race for (^8).race(batch => 1, degree => 4) {sleep rand; .say}'
0
3
5
2
1
7
4
6
But how can I add gather/take behavior to such a loop?
In this approach take won't detect that it is wrapped in gather context:
$ raku -e '
my @x = gather race for (^8).race(batch => 1, degree => 4) {
    sleep rand;
    take $_;
};
@x.say
'
Died at:
take without gather
In this approach, gather consumes the HyperSeq without actually "hypering" over it:
$ raku -e '
my @x = race gather for (^8).race(batch => 1, degree => 4) {
    sleep rand;
    take $_;
};
@x.say
'
[0 1 2 3 4 5 6 7]
Depending on the exact behaviour you want, you may already be very close to it. But first, let's see what that behaviour actually is. A short summary of the levels of eagerness and laziness up front should be helpful:
Hyper and Race basically live on the same spectrum as Eager and Lazy.
What gather/take lets you create easily is a Lazy sequence. It will not generate values (and ideally not do any computation) until some consumer asks for more values. At the position of a take, execution of your code can be paused. This is what "coroutines", or "cooperative multitasking", are about (this is a bit of a simplification, but correct enough).
Eager is one step away from Lazy. Evaluating a lazy sequence eagerly will ask for all values right away, until the sequence is exhausted.
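A small sketch of the difference between Lazy and Eager, using only core Raku. The @log and @eager-log arrays are illustrative names that just record which loop iterations have actually run:

```raku
# A gather/take Seq: the loop body only runs when a consumer asks for values.
my @log;
my $seq = gather for 1..5 {
    @log.push($_);    # record that this iteration ran
    take $_ * 10;     # hand one value to the consumer, then pause here
};
say @log.elems;       # 0, nothing has been requested yet

# Lazy: pull exactly two values through the Seq's iterator.
my $it = $seq.iterator;
say $it.pull-one;     # 10
say $it.pull-one;     # 20
say @log;             # [1 2], the body ran exactly twice

# Eager: ask for every value right away.
my @eager-log;
my @all = eager gather for 1..5 { @eager-log.push($_); take $_ * 10 };
say @eager-log;       # [1 2 3 4 5]
say @all;             # [10 20 30 40 50]
```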
Even further away is Hyper. Hypering a part of (or the entirety of) your pipeline distributes work across multiple threads. Values are calculated "as fast as possible". You do not need to ask for more values in order for more computation to happen.
You are already using the degree and batch parameters to ensure that your very small input sequence of just 8 values (^8) launches parallel workers in the first place.
Race is one more, albeit small, step further away from Lazy. Not only do you want your values to be calculated fast, you also specify that you wouldn't even mind if the results come out in a different order, as long as each calculated value becomes available quickly. Hyper, on the other hand, gives everything coming into the pipeline a number, and the corresponding results coming out are ordered by this number.
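A sketch contrasting the two, reusing the batch and degree settings from the question; the short sleep only exists to shuffle completion times. The order of @race will vary from run to run, so only its sorted contents are predictable:

```raku
# Same work and batching; only the ordering guarantee differs.
my @hyper = (^8).hyper(batch => 1, degree => 4).map({ sleep rand / 10; $_ * 2 });
my @race  = (^8).race(batch => 1, degree => 4).map({ sleep rand / 10; $_ * 2 });

say @hyper;       # [0 2 4 6 8 10 12 14], always in input order
say @race;        # the same values, in whatever order they finished
say @race.sort;   # (0 2 4 6 8 10 12 14)
```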
hyper for and race for
With these versions of the for loop syntax, you "opt in" to having the loop body run, potentially, on multiple threads in parallel.
With a plain for, the assumption is that you don't want that to happen. This matters especially when your for loop sits innocently inside a sub or method and someone passes in a HyperSeq to be iterated over: the code inside the for loop is very likely not written to be thread-safe.
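When a race for body does touch shared state, it has to do so in a thread-safe way. A minimal sketch, assuming a counter and a results array shared between workers (the names are illustrative): an atomicint handles the counter, and a Lock serializes the pushes onto the array:

```raku
my atomicint $count = 0;   # atomic counter, safe to bump from many threads
my @seen;                  # plain Array: NOT safe to push to concurrently
my $lock = Lock.new;       # so every push goes through this lock

race for (^100).race(batch => 5, degree => 4) {
    atomic-inc-fetch($count);
    $lock.protect({ @seen.push($_) });
}

say $count;          # 100
say @seen.elems;     # 100
```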
A hyper for or race for on its own does not guarantee that anything actually gets hypered, but a .hyper or .race call on the thing being iterated will.
for loop
You can turn a for loop (or a loop loop, a while loop, or even things that aren't loops, like an if or given statement) from a statement into an expression, syntactically speaking, by putting a do in front of it or by enclosing it in parentheses.
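A few small examples of that, each turning a statement into a value; the variable names are arbitrary:

```raku
my @squares = do for ^5 { $_ * $_ };   # do in front of a for loop
my @doubled = (for ^3 { $_ * 2 });     # or parentheses around it
my $size = do given 42 {               # given works the same way
    when * < 10 { "small" }
    default     { "large" }
};
my $sign = do if -3 < 0 { "negative" } else { "non-negative" };

say @squares;   # [0 1 4 9 16]
say @doubled;   # [0 2 4]
say $size;      # large
say $sign;      # negative
```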
Another thing that is an expression rather than a statement is gather, which might be the original reason why you were asking about gather/take in the first place.
Here's an example of storing the result of your first code example in a variable and printing it. I put a "before" and "after" output in the loop body so that the parallelization can be seen:
my @result = do race for (^8).race(batch => 1, degree => 4) {
    "$_ ...".say;
    sleep rand;
    "... $_".say;
    $_
};
say @result.raku;
=output
0 ...
1 ...
... 1
2 ...
3 ...
4 ...
... 3
5 ...
... 5
6 ...
... 0
7 ...
... 4
... 2
... 6
... 7
[1, 3, 5, 0, 2, 4, 6, 7]
It's probably a little bit simpler to do it without a for loop, by using .map:
my @result = (^8).race(batch => 1, degree => 4)
    .map({
        "$_ ...".say;
        sleep rand;
        "... $_".say;
        $_
    });
say @result.raku;
But that probably comes down to taste, and how the rest of the code looks.
One thing that gather/take is very convenient for is when the code you run for each piece of input can produce a different number of results, and you would like all the results to go into the resulting list as one flat sequence of values, rather than a list of lists.
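A minimal illustration of that difference, without any parallelism involved; here input n produces n values:

```raku
# gather/take: each iteration may take zero, one or many values,
# and they all end up in one flat result.
my @flat = gather for ^4 -> $n {
    take $n for ^$n;     # take $n, $n times
};
say @flat;               # [1 2 2 3 3 3]

# .map returning a list per input gives a list of lists instead.
my @nested = (^4).map({ ($_ xx $_).List });
say @nested.elems;       # 4
say @nested[3];          # (3 3 3)
```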
If it's fine for the individual result values from each "work batch" to "stick together" even though you're asking for Race semantics, then returning a Slip from the .map block (or the for loop body) gets you that:
my @result = (^8).race(batch => 1, degree => 4)
    .map({ sleep rand; |("$_ ...", "... $_") });
.raku.say for @result
=output
"2 ..."
"... 2"
"3 ..."
"... 3"
"1 ..."
"... 1"
"0 ..."
"... 0"
"5 ..."
"... 5"
"7 ..."
"... 7"
"4 ..."
"... 4"
"6 ..."
"... 6"
Here you can see what I mean by "stick together": The "before" and "after" entries for each number are never separated from one another.
One of the drawbacks is that you either have to produce all your results at the very end of the block, or "collect them" in an Array and then .Slip that Array at the end to return it.
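A sketch of the "collect in an Array" variant; @parts is just an illustrative name for the per-item buffer. Each call of the map block gets its own @parts, so nothing is shared between threads:

```raku
my @result = (^4).race(batch => 1, degree => 4).map({
    my @parts;                 # fresh buffer for this one input value
    @parts.push("$_ ...");
    sleep rand / 10;
    @parts.push("... $_");
    @parts.Slip                # flatten this item's values into the output
});
.raku.say for @result;
```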
If you prefer to use gather/take syntax to collect the values inside the map or for code block, you can actually do this:
my @result = (^8).race(batch => 1, degree => 4)
    .map({ slip do gather { take "$_ ..."; sleep rand; take "... $_" } });
To get at the individual results from inside a map block as soon as they are ready, which is something that gather/take also does with ease, you can send your individual values through a Channel like this:
my Channel $results .= new;
my $printer = start react whenever $results { .raku.say };
race for (^8).race(batch => 1, degree => 4) {
    $results.send("$_ ...");
    sleep rand;
    $results.send("... $_");
}
$results.close;
await $printer;  # let the printer finish before the program exits
=output
"0 ..."
"1 ..."
"2 ..."
"3 ..."
"... 3"
"4 ..."
"... 0"
"5 ..."
"... 2"
"6 ..."
"... 1"
"7 ..."
"... 6"
"... 5"
"... 4"
"... 7"
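As a variation on the same example, the consumer can iterate $results.list inside a start block and hand everything it saw back through await. A sketch; @collected and @received are illustrative names, and the sleep is shortened:

```raku
my Channel $results .= new;

# Consumer on another thread: $results.list blocks while waiting for values
# and ends once the Channel has been closed.
my $consumer = start {
    my @collected;
    @collected.push($_) for $results.list;
    @collected
};

race for (^8).race(batch => 1, degree => 4) {
    $results.send("$_ ...");
    sleep rand / 10;
    $results.send("... $_");
}
$results.close;                    # ends $results.list, so the consumer can finish

my @received = await $consumer;    # wait for the consumer's collected values
say @received.elems;               # 16
```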
A few important things to point out about the Channel example:
- We need to start a "worker" process if we want to actually see values immediately as they arrive.
- I use race for again here, as I want the code after my race for to run only after the whole race is over.
- I use start react whenever because I like react/whenever, but there are other spellings for this that are also fine:
  - $results.Supply.tap({ .raku.say }); where the Supply coercer of the Channel lets us tap the messages on the Channel. This work is scheduled on our $*SCHEDULER.
  - start .say for $results.list; where the list contextualizer of Channel lets us iterate over the messages that come through the Channel as if it were a lazy list.
  - .say for $results.list; after the $results.close; will show all results at once when everything is done, instead of as they come. This does not change how the values are generated, however.
- It's important to .close the Channel after the race is done (and before .say for $results.list; if you prefer that) so that anything listening to the Channel gets the message and stops blocking.
Why can't I gather outside a race and then take inside it?
The way gather/take works is very similar to how exceptions work: the stack of callers is walked to find a frame that has a handler for the kind of exception that has been thrown.
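Because the lookup walks the callers rather than the lexical structure of the code, a take does not have to appear inside the gather block itself; it only has to run while that gather is somewhere up the call chain on the same thread. A small sketch with a hypothetical helper sub, emit-pair:

```raku
# Hypothetical helper: there is no gather anywhere in its body.
sub emit-pair($x) {
    take $x;
    take $x * 10;
}

my @values = gather {
    emit-pair(1);   # these takes find the gather by walking up the callers
    emit-pair(2);
};
say @values;        # [1 10 2 20]
```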
Something like X::Argh.new(message => "oh no") is a regular exception, but there are also "control exceptions". You can ask your Raku what's available:
$ raku -e 'for CX::.values {
    given .^name {
        say ($_ eq "CX::Take" ?? "My Favourite! " !! "")
            ~ $_
    }
}'
CX::Next
CX::Proceed
CX::Last
My Favourite! CX::Take
CX::Done
CX::Return
CX::Redo
CX::Emit
CX::Warn
CX::Succeed
Here you'll see:
- CX::Warn, which has a default handler that simply outputs to standard error and resumes.
- The control exceptions for next, last, succeed and proceed (the last two commonly used implicitly through when and default blocks), and so on.
- CX::Return, which is for the return statement and also its method form.
- CX::Emit, which comes from react and supply.
- CX::Take, which is what take uses.
A gather is, at its most basic, an exception handler that receives the value so that it can be given as the iteration result, and then "freezes execution" at that spot (it "takes a continuation") so that it can properly be lazy and do nothing until another value is requested.
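Ordinary code can handle a control exception itself with a CONTROL phaser, much like gather does for CX::Take. A small sketch, where @events is an illustrative array recording what happened: the handler intercepts the CX::Warn before the default handler sees it, and .resume continues right after the warn:

```raku
my @events;
{
    warn "careful";                  # throws a CX::Warn control exception
    @events.push("after warn");      # reached because the handler resumes
    CONTROL {
        when CX::Warn {
            @events.push("intercepted: " ~ .message);
            .resume;
        }
    }
}
say @events;
```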
But when multiple threads are involved, we don't really have a single stack of execution where going through each frame's caller, starting at the take, will lead us to the gather.
Instead, we reach whatever has called our code, which in most cases is probably the work loop of a Scheduler's worker thread.
In the most common case, a regular exception reaching this point will be recorded and used to break the Vow of a Promise. In particular, this is what you get when you use the start syntax.
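A minimal sketch of that default path for a regular exception, using only core Promise methods:

```raku
my $p = start { die "oh no" };   # the exception breaks the Promise's vow
try await $p;                     # await rethrows it; try swallows it here
say $p.status;                    # Broken
say $p.cause.message;             # oh no
```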
For take, however, there is no default handler. The CX::Take control exception can't find a corresponding gather and bails out, which is the "take without gather" error from the question.
There is currently no design or specification for how a CX::Take could be made to cross that gap. I can think of a few special cases where it could be made to work, with something like a gather race for, but nothing general comes to mind just yet.