perlasync-awaitpromisemojolicious

Perl Mojolicious: limit concurrency with all_settled


In Mojolicious for Perl, is there a way to limit concurrency when using Promise->all_settled?

In the example below, I would like to limit concurrency=>10. I'm using sleep to simulate a blocking operation:

use Mojolicious::Lite -signatures, -async_await;

helper isOdd_p => sub($self, $number)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                sleep 1;
                $number % 2;
            },
            sub ($subprocess, $err, @res ) {
                $reject->( $err ) if $err;
                $reject->( @res ) if @res && $res[0]==0; # reject Even
                $resolve->( @res );
            })
            
        });
};

any '/' => async sub ($c) {
    $c->render_later();
    my @promises = map { $c->isOdd_p($_) } (0..50);
    my @results = eval { await Mojo::Promise->all_settled(@promises) };
    #my @results = eval { await Mojo::Promise->map({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) ) };
    if (my $err = $@) {
        $c->render(json => $@); # will this line be ever reached with all_settled??
    } else {
        $c->render(json => [ @results ] );
    }
};

app->start;

Solution

  • Currently, you start all 51 tasks, then wait for all 51 tasks to finish.

    Limiting concurrency to 10 would mean starting 10 tasks, waiting for one to finish, starting one, waiting for one to finish, etc.

    Three tiny changes to map suffices. (Changed two mentions of all to all_settled, and changed the rejection handler to keep going. Also changed it from a M::P method to a sub.)

    sub map_all_settled {
      my ($class, $options, $cb, @items) = ('Mojo::Promise', ref $_[0] eq 'HASH' ? shift : {}, @_);
     
      return $class->all_settled(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency};
     
      my @start = map { $_->$cb } splice @items, 0, $options->{concurrency};
      my @wait  = map { $start[0]->clone } 0 .. $#items;
     
      my $start_next = sub {
        return () unless my $item = shift @items;
        my ($start_next, $chain) = (__SUB__, shift @wait);
        $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); $start_next->() }) for $item;
        return ();
      };
     
      $_->then($start_next, sub { }) for @start;
     
      return $class->all_settled(@start, @wait);
    }
    
    my @results = await map_all_settled({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) );