
How can I use IO::Async with an array as input?


I have this loop:

foreach my $element ( @array ) {
     my $result = doSomething($element);
}

Since the array doesn't need to be processed in order, and the script runs for a long time, I'd like to run doSomething() asynchronously.

I am looking at IO::Async for this, but I can't seem to find an example where the input to the loop is a simple array as above. The examples seem to focus on open sockets, STDIN, etc.

Here is the example given, showing feeding data to the loop via STDIN:

$loop->add( IO::Async::Stream->new_for_stdin(
   on_read => sub {
      my ( $self, $buffref, $eof ) = @_;

      while( $$buffref =~ s/^(.*)\n// ) {
          print "You typed a line $1\n";
      }

      return 0;
   },
) );

How can I feed it the array elements instead?


Solution

  • As commented by @StefanBecker, the simplest way to handle this with IO::Async is to use an IO::Async::Function.

    From the docs:

    This subclass of IO::Async::Notifier wraps a function body in a collection of worker processes, to allow it to execute independently of the main process.

    In the IO::Async framework, the typical use case for IO::Async::Function is when a blocking process needs to be executed asynchronously.

    Disclaimer: as @zdim also commented, IO::Async might not be the best fit for your use case. A pure process parallelizer such as Parallel::ForkManager would probably be your best option here, as it implements essentially the same functionality (forking and executing in parallel) in a much more straightforward fashion. One of the main differentiating features of IO::Async is its I/O multiplexing capability, which you do not seem to be using here.
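
    For comparison, the Parallel::ForkManager route could look roughly like the following. This is a minimal sketch, not taken from the original comments: the doSomething body, the limit of 4 children, and the %result_for hash are placeholders chosen for illustration.

    ```perl
    use strict;
    use warnings;

    use Parallel::ForkManager;

    # stand-in for the real work (assumption: one element in, one value out)
    sub doSomething {
        my ( $element ) = @_;
        return $element * 2;
    }

    my @array = ( 5, 2, 4, 0 );
    my %result_for;

    # run at most 4 children at a time (arbitrary limit for this sketch)
    my $pm = Parallel::ForkManager->new(4);

    # called in the parent each time a child exits; the last argument is
    # the reference that the child passed to finish()
    $pm->run_on_finish( sub {
        my ( $pid, $exit_code, $ident, $signal, $core_dump, $data ) = @_;
        $result_for{$ident} = $$data if defined $data;
    } );

    foreach my $element ( @array ) {
        # start() forks: true (the child's pid) in the parent, 0 in the child
        $pm->start($element) and next;

        my $result = doSomething($element);
        $pm->finish( 0, \$result );    # exit the child and send $result back
    }

    $pm->wait_all_children;

    print "$_ => $result_for{$_}\n" for sort { $a <=> $b } keys %result_for;
    ```

    Each child is a separate process, so a return value has to be sent back explicitly through finish(); assigning to a variable inside the loop body would only change the child's copy.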

    But since you specifically asked for IO::Async, here is an example of such an implementation: I turned doSomething into a dummy function that just sleeps for the number of seconds given as its argument. This lets you observe the effect of asynchronous execution.

    use strict;
    use warnings;
    
    use IO::Async::Function;
    use IO::Async::Loop;
    use Future;
    
    # dummy sub
    sub doSomething { 
        my ( $delay ) = @_;
        print "start waiting $delay second(s)\n";
        sleep $delay;
        print "done sleeping $delay second(s)\n"; 
        return $delay;
    }
    
    # prepare the function for execution
    my $loop = IO::Async::Loop->new;
    my $function = IO::Async::Function->new( code => sub { return doSomething($_[0]) } );
    $loop->add($function);
    
    # trigger asynchronous processing
    my @array = qw/5 2 4 0/;
    my @futures = map { $function->call( args => [ $_ ] ) } @array;
    
    # safely wait for all ops to complete
    Future->wait_all(@futures)->await;
    print "all done !\n";
    

    This yields:

    start waiting 5 second(s)
    start waiting 2 second(s)
    start waiting 4 second(s)
    start waiting 0 second(s)
    done sleeping 0 second(s)
    done sleeping 2 second(s)
    done sleeping 4 second(s)
    done sleeping 5 second(s)
    all done !
    

    NB1: Future->wait_all(@futures)->await could also be written $_->get for @futures. However, the first expression, which uses a convergent Future, has the advantage that it never fails, even if an underlying call dies, whereas get rethrows the failure of the first call that died.
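
    To illustrate the difference described in NB1, here is a small sketch in which one call dies on purpose. The worker body and the @inputs values are made up for the example; the point is only that wait_all lets every call finish, after which each Future can be inspected individually.

    ```perl
    use strict;
    use warnings;

    use IO::Async::Function;
    use IO::Async::Loop;
    use Future;

    my $loop = IO::Async::Loop->new;

    # hypothetical worker: dies on negative input, otherwise doubles it
    my $function = IO::Async::Function->new(
        code => sub {
            my ( $n ) = @_;
            die "negative input: $n\n" if $n < 0;
            return $n * 2;
        },
    );
    $loop->add($function);

    my @inputs  = ( 1, -1, 3 );
    my @futures = map { $function->call( args => [ $_ ] ) } @inputs;

    # wait_all completes once every component is either done or failed;
    # it does not itself fail when one of them does
    Future->wait_all(@futures)->await;

    my ( @ok, @failed );
    foreach my $i ( 0 .. $#inputs ) {
        my $f = $futures[$i];
        if ( $f->is_failed ) {
            my $message = $f->failure;    # scalar context: message only
            chomp $message;
            push @failed, $inputs[$i];
            print "input $inputs[$i] failed: $message\n";
        }
        else {
            push @ok, $f->get;            # safe: this future is done
        }
    }

    print "results: @ok\n";
    ```

    With $_->get for @futures instead, the exception from the second call would propagate and the third result would never be collected unless the loop were wrapped in an eval.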

    NB2: many options are available in IO::Async::Function and Future to handle errors, manage the number of workers and their behavior, and so on. Check the docs for more details.
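
    As one illustration of those options, the sketch below caps the worker pool with max_workers and collects return values with Future->needs_all, which hands results back in the order of the input futures rather than in completion order. The squaring function and the limit of 2 workers are arbitrary assumptions; unlike wait_all, needs_all fails as soon as one call fails.

    ```perl
    use strict;
    use warnings;

    use IO::Async::Function;
    use IO::Async::Loop;
    use Future;

    my $loop = IO::Async::Loop->new;

    my $function = IO::Async::Function->new(
        code        => sub { my ( $n ) = @_; return $n * $n },
        min_workers => 1,    # keep at least one worker process around
        max_workers => 2,    # never run more than two calls at once
    );
    $loop->add($function);

    my @array   = ( 1 .. 6 );
    my @futures = map { $function->call( args => [ $_ ] ) } @array;

    # needs_all concatenates the component results in the order of @futures;
    # get() waits for completion and throws if any call failed
    my @squares = Future->needs_all(@futures)->get;

    print "$array[$_] squared is $squares[$_]\n" for 0 .. $#array;
    ```

    Calls beyond the max_workers limit are queued by the Function and dispatched as workers become free, so the whole array can still be submitted in one go.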