raku

Use gather/take with race


I can parallelize execution using race and it works:

$ raku -e 'race for (^8).race(batch => 1, degree => 4) {sleep rand; .say}'
0
3
5
2
1
7
4
6

But how can I add gather/take behavior to such a loop?

In this approach, take won't detect that it is wrapped in a gather context:

$ raku -e '
    my @x = gather race for (^8).race(batch => 1, degree => 4) {
        sleep rand;
        take $_;
    };
    @x.say
'

Died at:
    take without gather

In this approach gather will consume HyperSeq without actually "hypering" over it:

$ raku -e '
    my @x = race gather for (^8).race(batch => 1, degree => 4) {
        sleep rand;
        take $_;
    };
    @x.say
'

[0 1 2 3 4 5 6 7]

Solution

  • Depending on the exact behaviour you want, you may already be very close. But first, let's pin down what that behaviour actually is. A short summary of the eagerness/laziness levels up front should help:

    Eagerness, Laziness, and Hyper / Race

    Hyper and Race basically live on the same spectrum as Eager and Lazy.

    What gather/take allows you to create easily is a Lazy sequence. It will not generate values - and ideally not do any computation - until some consumer asks for more values. At the position of a take, execution of your code can be paused. This is what "coroutines", or "cooperative parallelism" are about (this is a bit of a simplification, but correct enough).

    Eager is one step away from Lazy. Evaluating a lazy sequence eagerly will ask for all values right away, until the sequence is exhausted.
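
    As a minimal sketch of that difference (the @log array and the variable names are only illustrative, not part of your code), the following records when the loop body actually runs. Nothing is computed until a value is requested, and requesting one value runs the body only up to the first take:

    ```raku
    my @log;

    # Building the sequence runs none of the loop body yet.
    my $lazy = gather for 1..3 {
        @log.push("computing $_");
        take $_ * 10;
    }

    my $before = @log.elems;   # no value has been requested so far
    my $first  = $lazy.head;   # request exactly one value
    my $after  = @log.elems;   # the body ran only up to the first take

    say "before: $before, first: $first, after: $after";
    # before: 0, first: 10, after: 1
    ```

    Assigning the same gather to an array instead (my @all = gather ...) would evaluate it eagerly and run the body for all three values right away.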

    Even further away is Hyper. Hypering a part of (or the entirety of) your pipeline distributes work across multiple threads. Values are calculated "as fast as possible". You do not need to ask for more values in order for more computation to happen.

    You are already using the degree and batch parameters to ensure that your very small input sequence of just 8 values (^8) even launches parallel workers in the first place.

    Race is one more, albeit small, step further away from Lazy. You want your values not just to be calculated fast; you also declare that you don't mind if the results come out in a different order, as long as each value becomes available as soon as it has been calculated. Hyper, on the other hand, gives everything coming into the pipeline a number, and the corresponding results coming out are ordered by this number.
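
    A small sketch of the ordering difference, reusing the batch and degree values from your example (the doubling and the shortened sleep are just stand-ins for real work):

    ```raku
    # hyper: work is spread over threads, results keep the input order
    my @hypered = (^8).hyper(batch => 1, degree => 4)
        .map({ sleep rand / 10; $_ * 2 });

    # race: same work, but results arrive in whatever order they finish
    my @raced = (^8).race(batch => 1, degree => 4)
        .map({ sleep rand / 10; $_ * 2 });

    say @hypered;      # always in input order
    say @raced;        # same values, order may differ from run to run
    say @raced.sort;   # sorting recovers the same list as @hypered
    ```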

    hyper for and race for

    With these versions of the for loop syntax, you "opt in" to having the loop body run - potentially - on multiple threads in parallel.

    With a plain for, the assumption is that you don't want that to happen. Especially in cases where your for loop sits innocently inside a sub or method, and someone passes in a HyperSeq to be iterated over, the code inside the for loop was very likely not written with thread safety in mind.

    Using hyper for or race for on its own does not guarantee that anything actually gets hypered, but calling .hyper or .race on the thing you iterate over does.

    Storing the result of a for loop

    You can turn a for loop (or a loop loop, or a while loop, or things that aren't loops, like an if or given statement) from a statement into an expression, syntactically speaking, by putting a do in front of it or by enclosing it in parentheses.

    Another thing that is an expression rather than a statement is gather, which might be the original reason why you were asking for gather/take in the first place.
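
    To illustrate the statement-versus-expression point with something smaller than your example, here is a sketch (all variable names are made up for the illustration):

    ```raku
    # "do" turns the for statement into an expression whose value can be stored
    my @squares = do for 1..4 { $_ ** 2 };

    # enclosing the loop in parentheses has the same effect
    my @cubes = (for 1..4 { $_ ** 3 });

    # the same works for statements that are not loops, such as if
    my $size = do if @squares.elems > 3 { "big" } else { "small" };

    say @squares;   # [1 4 9 16]
    say @cubes;     # [1 8 27 64]
    say $size;      # big
    ```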

    Here's an example of storing the result of your first code example in a variable. I put a "before" and an "after" output in the loop body, so that the parallelization can be seen:

    my @result = do race for (^8).race(batch => 1, degree => 4) {
        "$_ ...".say;
        sleep rand;
        "... $_".say;
        $_
    }
    say @result.raku;
    
    =output
    0 ...
    1 ...
    ... 1
    2 ...
    3 ...
    4 ...
    ... 3
    5 ...
    ... 5
    6 ...
    ... 0
    7 ...
    ... 4
    ... 2
    ... 6
    ... 7
    [1, 3, 5, 0, 2, 4, 6, 7]
    

    It's probably a little bit simpler to do it without a for loop by using .map:

    my @result = (^8).race(batch => 1, degree => 4)
        .map({
            "$_ ...".say;
            sleep rand;
            "... $_".say;
            $_
        });
    say @result.raku;
    

    But that probably comes down to taste, and how the rest of the code looks.

    Returning a variable number of results for each input

    One thing that gather/take is very convenient for is when the code you run for each piece of input can give different numbers of results, and you would like the results to go into the resulting list as one sequence of values, rather than a list of lists.

    If it's fine for the individual result values from each "work batch" to "stick together" even though you're asking for Race semantics, then returning a Slip from the .map block or the for loop body gets you that:

    my @result = (^8).race(batch => 1, degree => 4)
        .map({ sleep rand; |("$_ ...", "... $_") });
    .raku.say for @result
    
    =output
    "2 ..."
    "... 2"
    "3 ..."
    "... 3"
    "1 ..."
    "... 1"
    "0 ..."
    "... 0"
    "5 ..."
    "... 5"
    "7 ..."
    "... 7"
    "4 ..."
    "... 4"
    "6 ..."
    "... 6"
    

    Here you can see what I mean by "stick together": The "before" and "after" entries for each number are never separated from one another.

    One drawback is that you either have to produce all your results at the end of the block, or "collect" them in an Array and then .Slip that Array at the end to return them.

    If you prefer to use gather/take syntax to collect the values inside the map or for code block, you can actually do this:

    .map({ slip do gather { take "$_ ..."; sleep rand; take "... $_" } })
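
    Put into a complete pipeline, that one-liner might look like the sketch below. The shortened sleep and the input size of 4 are assumptions made only to keep the example quick. Because the gather sits inside the block, each take finds its gather within the same worker, and the two values of each pair still "stick together":

    ```raku
    my @result = (^4).race(batch => 1, degree => 4).map({
        # gather/take collects the values for this one input,
        # slip flattens them into the overall result
        slip do gather {
            take "$_ ...";
            sleep rand / 10;
            take "... $_";
        }
    });
    .raku.say for @result;
    ```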

    Returning values even sooner after they are calculated

    In order to get at the individual results from inside a map block immediately after they are ready, which is something that gather/take also does with ease, you can send your individual values through a Channel like this:

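    my Channel $results .= new;
    # Keep the Promise so we can wait for the printer to drain the channel.
    my $printer = start react whenever $results { .raku.say };
    race for (^8).race(batch => 1, degree => 4) {
        $results.send("$_ ...");
        sleep rand;
        $results.send("... $_");
    }
    $results.close;
    # Without this, the program could exit before every value has been printed.
    await $printer;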
    
    =output
    "0 ..."
    "1 ..."
    "2 ..."
    "3 ..."
    "... 3"
    "4 ..."
    "... 0"
    "5 ..."
    "... 2"
    "6 ..."
    "... 1"
    "7 ..."
    "... 6"
    "... 5"
    "... 4"
    "... 7"
    

    A few important things to point out here:

    Why can't I gather outside a race and then take inside it?

    The way gather/take works is very similar to how exceptions work. The stack of callers is walked to find a frame that has a handler for the kind of exception that has been thrown.

    Something like X::Argh.new(message => "oh no") is a regular exception, but there are also "control exceptions". You can ask your raku which ones are available:

    $ raku -e 'for CX::.values {
            given .^name {
                say ($_ eq "CX::Take" ?? "My Favourite!  " !! "")
                    ~ $_
            }
        }'
    CX::Next
    CX::Proceed
    CX::Last
    My Favourite!  CX::Take
    CX::Done
    CX::Return
    CX::Redo
    CX::Emit
    CX::Warn
    CX::Succeed
    

    Here you'll see:

    A gather is - at its most basic - an exception handler that receives the value so that it can be given as the iteration result and then "freezes execution" at that spot (it "takes a continuation") so that it can properly be lazy and do nothing until another value is requested.
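
    A small single-threaded sketch of that lookup (the sub name emit-both is hypothetical): the take calls sit in a separate sub, not lexically inside the gather block, yet they still find the gather, because the gather is one of the callers on the stack.

    ```raku
    # take is not lexically inside any gather here ...
    sub emit-both($n) {
        take "$n ...";
        take "... $n";
    }

    # ... but the gather is a caller of emit-both, so walking the stack finds it
    my @x = gather { emit-both($_) for ^3 };
    say @x.raku;
    # ["0 ...", "... 0", "1 ...", "... 1", "2 ...", "... 2"]
    ```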

    But when multiple threads are involved, we don't really have a single stack of execution where going through each frame's caller starting at the take will lead us to the gather.

    Instead, we reach whatever has called our code, which in most cases is probably the work loop of a Scheduler's worker thread.

    In the most common case, a regular exception reaching this point will be recorded and used to break the Vow of a Promise. In particular, this is what you get when you use the start syntax.

    For the case of take, there is no default handler. The take control exception can't find a corresponding gather and bails out.

    There is currently no design or specification for how a CX::Take could be made to cross that gap. I could think of a few special cases where it could be made to work with something like a gather race for or so, but nothing general comes to mind just yet.