publish-subscriberakutaprakudoact

Difference between Supply method act vs tap


In Raku documentation about the Supply method act (vs tap) https://docs.raku.org/type/Supply#method_act it is stated that:

the given code is guaranteed to be executed by only one thread at a time

My understanding is that a thread must finish with the specific code object before another thread has to run it.

If that is the case, I stumbled upon a different behavior when I tried to implement the feature. Take a look at the following code snippet, where 2 "acts" are created and run in different threads:

#!/usr/bin/env perl6

say 'Main  runs in [ thread : ', +$*THREAD, ' ]';

my $b = 1;

sub actor {
    print "    Tap_$*tap                             : $^a  ", now;
    
    $*tap < 2 ??
                    do {
                        say " - Sleep 0.1";
                        sleep 0.1
                    }
              !!
                    do {
                        say " - Sleep 0.2";
                        sleep 0.2;
                    }
    
    $b++;
    say "    Tap_$*tap  +1 to \$b                     $b  ", now;
}

my $supply = supply {
    for 1..100 -> $i {
        say "For Tap_$*tap [ \$i = $i ] => About to emit : $b  ", now;
        emit $b;
        say "For Tap_$*tap [ \$i = $i ] =>       Emitted : $b  ", now;
        
        done if $b > 5
    }
}

start {
    my $*tap = 1;
    once say "Tap_$*tap runs in [ thread : {+$*THREAD} ]";
    $supply.act: &actor
}

start {
    my $*tap = 2;
    once say "Tap_$*tap runs in [ thread : {+$*THREAD} ]";
    $supply.act: &actor
}

sleep 1;

and the result is the following (with added time gaps and comments):

  1   Main  runs in [ thread : 1 ]                                                       - Main thread              
  2   Tap_1 runs in [ thread : 4 ]                                                       - Tap 1 thread             
  3   For Tap_1 [ $i = 1 ] => About to emit : 1  Instant:1603354571.198187               - Supply thread [for tap 1]
  4       Tap_1                             : 1  Instant:1603354571.203074 - Sleep 0.1   - Tap 1 thread             
  5   Tap_2 runs in [ thread : 6 ]                                                       - Tap 2 thread             
  6   For Tap_2 [ $i = 1 ] => About to emit : 1  Instant:1603354571.213826               - Supply thread [for tap 2]
  7       Tap_2                             : 1  Instant:1603354571.213826 - Sleep 0.2   - Tap 2 thread             
  8                                                                                                                 
  9   -----------------------------------------------------------------------------------> Time +0.1 seconds        
 10                                                                                                                 
 11       Tap_1  +1 to $b                     2  Instant:1603354571.305723               - Tap 1 thread             
 12   For Tap_1 [ $i = 1 ] =>       Emitted : 2  Instant:1603354571.305723               - Supply thread [for tap 1]
 13   For Tap_1 [ $i = 2 ] => About to emit : 2  Instant:1603354571.30768                - Supply thread [for tap 1]
 14       Tap_1                             : 2  Instant:1603354571.30768  - Sleep 0.1   - Tap 1 thread             
 15                                                                                                                 
 16   -----------------------------------------------------------------------------------> Time +0.1 seconds        
 17                                                                                                                 
 18       Tap_1  +1 to $b                     3  Instant:1603354571.410354               - Tap 1 thread             
 19   For Tap_1 [ $i = 2 ] =>       Emitted : 4  Instant:1603354571.425018               - Supply thread [for tap 1]
 20       Tap_2  +1 to $b                     4  Instant:1603354571.425018               - Tap 2 thread             
 21   For Tap_1 [ $i = 3 ] => About to emit : 4  Instant:1603354571.425018               - Supply thread [for tap 1]
 22   For Tap_2 [ $i = 1 ] =>       Emitted : 4  Instant:1603354571.425995               - Supply thread [for tap 2]
 23       Tap_1                             : 4  Instant:1603354571.425995 - Sleep 0.1   - Tap 1 thread             
 24   For Tap_2 [ $i = 2 ] => About to emit : 4  Instant:1603354571.425995               - Supply thread [for tap 2]
 25       Tap_2                             : 4  Instant:1603354571.426973 - Sleep 0.2   - Tap 2 thread             
 26                                                                                                                 
 27   -----------------------------------------------------------------------------------> Time +0.1 seconds        
 28                                                                                                                 
 29       Tap_1  +1 to $b                     5  Instant:1603354571.528079               - Tap 1 thread             
 30   For Tap_1 [ $i = 3 ] =>       Emitted : 5  Instant:1603354571.52906                - Supply thread [for tap 1]
 31   For Tap_1 [ $i = 4 ] => About to emit : 5  Instant:1603354571.52906                - Supply thread [for tap 1]
 32       Tap_1                             : 5  Instant:1603354571.53004  - Sleep 0.1   - Tap 1 thread             
 33                                                                                                                 
 34   -----------------------------------------------------------------------------------> Time +0.1 seconds        
 35                                                                                                                 
 36       Tap_2  +1 to $b                     6  Instant:1603354571.62859                - Tap 2 thread             
 37   For Tap_2 [ $i = 2 ] =>       Emitted : 6  Instant:1603354571.62859                - Supply thread [for tap 2]
 38       Tap_1  +1 to $b                     7  Instant:1603354571.631512               - Tap 1 thread             
 39   For Tap_1 [ $i = 4 ] =>       Emitted : 7  Instant:1603354571.631512               - Supply thread [for tap 2]

One can easily observe that the code object (subroutine &actor) is running concurrently in 2 threads (for example see output lines 4 & 7).

Can somebody clarify my misunderstanding about the matter?


Solution

  • There's very rarely any difference between tap and act in everyday use of Raku, because almost every Supply that you encounter is a serial supply. A serial supply is one that already enforces the protocol that a value will not be emitted until the previous one has been processed. The implementation of act is:

    method act(Supply:D: &actor, *%others) {
        self.sanitize.tap(&actor, |%others)
    }
    

    Where sanitize enforces the serial emission of values and in addition makes sure that events follow the grammar emit* [done | quit]. Since these properties are usually highly desirable anyway, every built-in way to obtain a Supply provides them, with the exception of being able to create a Supplier and call unsanitized-supply on it. (Historical note: a very early prototype did not enforce these properties so widely, creating more of a need for a method doing what act does. While the need for it diminished as the design involved into what was eventually shipped in the first language release, it got to keep its nice short name.)

    The misunderstanding arises from expecting the serialization of events to be per source, whereas in reality it is per subscription. Consider this example:

    my $timer = Supply.interval(1);
    $timer.tap: { say "A: {now}" };
    $timer.tap: { say "B: {now}" };
    sleep 5;
    

    Which produces output like this:

    A: Instant:1603364746.02766
    B: Instant:1603364746.031255
    A: Instant:1603364747.025255
    B: Instant:1603364747.028305
    A: Instant:1603364748.025584
    B: Instant:1603364748.029797
    A: Instant:1603364749.026596
    B: Instant:1603364749.029643
    A: Instant:1603364750.027881
    B: Instant:1603364750.030851
    A: Instant:1603364751.030137
    

    There is one source of events, but we establish two subscriptions to it. Each subscription enforces the serial rule, so if we do this:

    my $timer = Supply.interval(1);
    $timer.tap: { sleep 1.5; say "A: {now}" };
    $timer.tap: { sleep 1.5; say "B: {now}" };
    sleep 5;
    

    Then we observe the following output:

    A: Instant:1603364909.442341
    B: Instant:1603364909.481506
    A: Instant:1603364910.950359
    B: Instant:1603364910.982771
    A: Instant:1603364912.451916
    B: Instant:1603364912.485064
    

    Showing that each subscription is getting one event at a time, but merely sharing an (on-demand) source doesn't create any shared backpressure.

    Since the concurrency control is associated with the subscription, it is irrelevant if the same closure clone is passed to tap/act. Enforcing concurrency control across multiple subscriptions is the realm of supply/react/whenever. For example this:

    my $timer = Supply.interval(1);
    react {
        whenever $timer {
            sleep 1.5;
            say "A: {now}"
        }
        whenever $timer {
            sleep 1.5;
            say "B: {now}"
        }
    }
    

    Gives output like this:

    A: Instant:1603365363.872672
    B: Instant:1603365365.379991
    A: Instant:1603365366.882114
    B: Instant:1603365368.383392
    A: Instant:1603365369.884608
    B: Instant:1603365371.386087
    

    Where each event is 1.5s apart, because of the concurrency control implied by the react block.