In Mojolicious for Perl, is there a way to limit concurrency
when using Promise->all_settled
?
In the example below, I would like to limit concurrency=>10
. I'm using sleep
to simulate a blocking operation:
use Mojolicious::Lite -signatures, -async_await;
helper isOdd_p => sub($self, $number)
{
return Mojo::Promise->new(sub($resolve, $reject ) {
Mojo::IOLoop->subprocess(
sub {
sleep 1;
$number % 2;
},
sub ($subprocess, $err, @res ) {
$reject->( $err ) if $err;
$reject->( @res ) if @res && $res[0]==0; # reject Even
$resolve->( @res );
})
});
};
any '/' => async sub ($c) {
$c->render_later();
my @promises = map { $c->isOdd_p($_) } (0..50);
my @results = eval { await Mojo::Promise->all_settled(@promises) };
#my @results = eval { await Mojo::Promise->map({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) ) };
if (my $err = $@) {
$c->render(json => $@); # will this line be ever reached with all_settled??
} else {
$c->render(json => [ @results ] );
}
};
app->start;
Currently, you start all 51 tasks, then wait for all 51 tasks to finish.
Limiting concurrency to 10 would mean starting 10 tasks, waiting for one to finish, starting one, waiting for one to finish, etc.
Three tiny changes to map
suffices. (Changed two mentions of all
to all_settled
, and changed the rejection handler to keep going. Also changed it from a M::P method to a sub.)
sub map_all_settled {
my ($class, $options, $cb, @items) = ('Mojo::Promise', ref $_[0] eq 'HASH' ? shift : {}, @_);
return $class->all_settled(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency};
my @start = map { $_->$cb } splice @items, 0, $options->{concurrency};
my @wait = map { $start[0]->clone } 0 .. $#items;
my $start_next = sub {
return () unless my $item = shift @items;
my ($start_next, $chain) = (__SUB__, shift @wait);
$_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); $start_next->() }) for $item;
return ();
};
$_->then($start_next, sub { }) for @start;
return $class->all_settled(@start, @wait);
}
my @results = await map_all_settled({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) );