In Raku documentation about the Supply method act (vs tap) https://docs.raku.org/type/Supply#method_act it is stated that:
the given code is guaranteed to be executed by only one thread at a time
My understanding is that a thread must finish with the specific code object before another thread has to run it.
If that is the case, I stumbled upon a different behavior when I tried to implement the feature. Take a look at the following code snippet, where 2 "acts" are created and run in different threads:
#!/usr/bin/env perl6
say 'Main runs in [ thread : ', +$*THREAD, ' ]';
my $b = 1;
sub actor {
print " Tap_$*tap : $^a ", now;
$*tap < 2 ??
do {
say " - Sleep 0.1";
sleep 0.1
}
!!
do {
say " - Sleep 0.2";
sleep 0.2;
}
$b++;
say " Tap_$*tap +1 to \$b $b ", now;
}
my $supply = supply {
for 1..100 -> $i {
say "For Tap_$*tap [ \$i = $i ] => About to emit : $b ", now;
emit $b;
say "For Tap_$*tap [ \$i = $i ] => Emitted : $b ", now;
done if $b > 5
}
}
start {
my $*tap = 1;
once say "Tap_$*tap runs in [ thread : {+$*THREAD} ]";
$supply.act: &actor
}
start {
my $*tap = 2;
once say "Tap_$*tap runs in [ thread : {+$*THREAD} ]";
$supply.act: &actor
}
sleep 1;
and the result is the following (with added time gaps and comments):
1 Main runs in [ thread : 1 ] - Main thread
2 Tap_1 runs in [ thread : 4 ] - Tap 1 thread
3 For Tap_1 [ $i = 1 ] => About to emit : 1 Instant:1603354571.198187 - Supply thread [for tap 1]
4 Tap_1 : 1 Instant:1603354571.203074 - Sleep 0.1 - Tap 1 thread
5 Tap_2 runs in [ thread : 6 ] - Tap 2 thread
6 For Tap_2 [ $i = 1 ] => About to emit : 1 Instant:1603354571.213826 - Supply thread [for tap 2]
7 Tap_2 : 1 Instant:1603354571.213826 - Sleep 0.2 - Tap 2 thread
8
9 -----------------------------------------------------------------------------------> Time +0.1 seconds
10
11 Tap_1 +1 to $b 2 Instant:1603354571.305723 - Tap 1 thread
12 For Tap_1 [ $i = 1 ] => Emitted : 2 Instant:1603354571.305723 - Supply thread [for tap 1]
13 For Tap_1 [ $i = 2 ] => About to emit : 2 Instant:1603354571.30768 - Supply thread [for tap 1]
14 Tap_1 : 2 Instant:1603354571.30768 - Sleep 0.1 - Tap 1 thread
15
16 -----------------------------------------------------------------------------------> Time +0.1 seconds
17
18 Tap_1 +1 to $b 3 Instant:1603354571.410354 - Tap 1 thread
19 For Tap_1 [ $i = 2 ] => Emitted : 4 Instant:1603354571.425018 - Supply thread [for tap 1]
20 Tap_2 +1 to $b 4 Instant:1603354571.425018 - Tap 2 thread
21 For Tap_1 [ $i = 3 ] => About to emit : 4 Instant:1603354571.425018 - Supply thread [for tap 1]
22 For Tap_2 [ $i = 1 ] => Emitted : 4 Instant:1603354571.425995 - Supply thread [for tap 2]
23 Tap_1 : 4 Instant:1603354571.425995 - Sleep 0.1 - Tap 1 thread
24 For Tap_2 [ $i = 2 ] => About to emit : 4 Instant:1603354571.425995 - Supply thread [for tap 2]
25 Tap_2 : 4 Instant:1603354571.426973 - Sleep 0.2 - Tap 2 thread
26
27 -----------------------------------------------------------------------------------> Time +0.1 seconds
28
29 Tap_1 +1 to $b 5 Instant:1603354571.528079 - Tap 1 thread
30 For Tap_1 [ $i = 3 ] => Emitted : 5 Instant:1603354571.52906 - Supply thread [for tap 1]
31 For Tap_1 [ $i = 4 ] => About to emit : 5 Instant:1603354571.52906 - Supply thread [for tap 1]
32 Tap_1 : 5 Instant:1603354571.53004 - Sleep 0.1 - Tap 1 thread
33
34 -----------------------------------------------------------------------------------> Time +0.1 seconds
35
36 Tap_2 +1 to $b 6 Instant:1603354571.62859 - Tap 2 thread
37 For Tap_2 [ $i = 2 ] => Emitted : 6 Instant:1603354571.62859 - Supply thread [for tap 2]
38 Tap_1 +1 to $b 7 Instant:1603354571.631512 - Tap 1 thread
39 For Tap_1 [ $i = 4 ] => Emitted : 7 Instant:1603354571.631512 - Supply thread [for tap 2]
One can easily observe that the code object (subroutine &actor) is running concurrently in 2 threads (for example see output lines 4 & 7).
Can somebody clarify my misunderstanding about the matter?
There's very rarely any difference between tap
and act
in everyday use of Raku, because almost every Supply
that you encounter is a serial supply. A serial supply is one that already enforces the protocol that a value will not be emitted until the previous one has been processed. The implementation of act
is:
method act(Supply:D: &actor, *%others) {
self.sanitize.tap(&actor, |%others)
}
Where sanitize
enforces the serial emission of values and in addition makes sure that events follow the grammar emit* [done | quit]
. Since these properties are usually highly desirable anyway, every built-in way to obtain a Supply
provides them, with the exception of being able to create a Supplier
and call unsanitized-supply
on it. (Historical note: a very early prototype did not enforce these properties so widely, creating more of a need for a method doing what act
does. While the need for it diminished as the design involved into what was eventually shipped in the first language release, it got to keep its nice short name.)
The misunderstanding arises from expecting the serialization of events to be per source, whereas in reality it is per subscription. Consider this example:
my $timer = Supply.interval(1);
$timer.tap: { say "A: {now}" };
$timer.tap: { say "B: {now}" };
sleep 5;
Which produces output like this:
A: Instant:1603364746.02766
B: Instant:1603364746.031255
A: Instant:1603364747.025255
B: Instant:1603364747.028305
A: Instant:1603364748.025584
B: Instant:1603364748.029797
A: Instant:1603364749.026596
B: Instant:1603364749.029643
A: Instant:1603364750.027881
B: Instant:1603364750.030851
A: Instant:1603364751.030137
There is one source of events, but we establish two subscriptions to it. Each subscription enforces the serial rule, so if we do this:
my $timer = Supply.interval(1);
$timer.tap: { sleep 1.5; say "A: {now}" };
$timer.tap: { sleep 1.5; say "B: {now}" };
sleep 5;
Then we observe the following output:
A: Instant:1603364909.442341
B: Instant:1603364909.481506
A: Instant:1603364910.950359
B: Instant:1603364910.982771
A: Instant:1603364912.451916
B: Instant:1603364912.485064
Showing that each subscription is getting one event at a time, but merely sharing an (on-demand) source doesn't create any shared backpressure.
Since the concurrency control is associated with the subscription, it is irrelevant if the same closure clone is passed to tap
/act
. Enforcing concurrency control across multiple subscriptions is the realm of supply
/react
/whenever
. For example this:
my $timer = Supply.interval(1);
react {
whenever $timer {
sleep 1.5;
say "A: {now}"
}
whenever $timer {
sleep 1.5;
say "B: {now}"
}
}
Gives output like this:
A: Instant:1603365363.872672
B: Instant:1603365365.379991
A: Instant:1603365366.882114
B: Instant:1603365368.383392
A: Instant:1603365369.884608
B: Instant:1603365371.386087
Where each event is 1.5s apart, because of the concurrency control implied by the react
block.