I have this loop:
foreach my $element ( @array ) {
    my $result = doSomething($element);
}
Since the array does not need to be processed in order, and the script runs for a long time, I'd like to run doSomething() asynchronously.
I am looking at IO::Async for this, but I can't seem to find an example where the input to the loop is a simple array as above. The examples seem to focus on open sockets, STDIN, etc.
Here is the example given, showing feeding data to the loop via STDIN:
$loop->add( IO::Async::Stream->new_for_stdin(
    on_read => sub {
        my ( $self, $buffref, $eof ) = @_;
        while( $$buffref =~ s/^(.*)\n// ) {
            print "You typed a line $1\n";
        }
        return 0;
    },
) );
How can I feed it the array elements instead?
As commented by @StefanBecker, the simplest way to handle this with IO::Async is by using an IO::Async::Function.
From the docs:
This subclass of IO::Async::Notifier wraps a function body in a collection of worker processes, to allow it to execute independently of the main process.
In the IO::Async framework, the typical use case for IO::Async::Function is when a blocking operation needs to be executed asynchronously.
Disclaimer: please note that, as @zdim also commented, IO::Async might not be the best fit for your use case. A pure process parallelizer such as Parallel::ForkManager would probably be your best option here, as it implements essentially the same functionality (forking and executing in parallel) in a much more straightforward fashion. One of the main differentiating factors of IO::Async is its I/O multiplexing capability, which you do not seem to be using here.
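For comparison, here is a minimal sketch of how the same loop might look with Parallel::ForkManager. The doubling body of doSomething, the limit of 4 children, and the %result_for hash are placeholders chosen for illustration, not part of the original question.

```perl
use strict;
use warnings;
use Parallel::ForkManager;

# Placeholder for the real work (assumption: it returns a plain scalar).
sub doSomething {
    my ($element) = @_;
    return $element * 2;
}

my @array = ( 5, 2, 4, 0 );
my %result_for;

# At most 4 child processes at a time (arbitrary choice).
my $pm = Parallel::ForkManager->new(4);

# Runs in the parent each time a child exits and collects its result.
$pm->run_on_finish( sub {
    my ( $pid, $exit_code, $ident, $exit_signal, $core_dump, $data ) = @_;
    $result_for{$ident} = $$data if defined $data;
} );

for my $element (@array) {
    $pm->start($element) and next;         # parent: go on to the next element
    my $result = doSomething($element);    # child: do the work
    $pm->finish( 0, \$result );            # child: exit, sending the result back
}
$pm->wait_all_children;

print "$_ => $result_for{$_}\n" for sort { $a <=> $b } keys %result_for;
```

The result has to be passed to finish as a reference, because it is serialized in the child and read back by the parent.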
But since you specifically asked for IO::Async, here is an example of such an implementation: I turned doSomething into a dummy function that just waits for the number of seconds given as its argument. This lets you observe the effect of asynchronous execution.
use strict;
use warnings;
use IO::Async::Function;
use IO::Async::Loop;
use Future;
# dummy sub
sub doSomething {
    my ( $delay ) = @_;
    print "start waiting $delay second(s)\n";
    sleep $delay;
    print "done sleeping $delay second(s)\n";
    return $delay;
}
# prepare the function for execution
my $loop = IO::Async::Loop->new;
my $function = IO::Async::Function->new( code => sub { return doSomething($_[0]) } );
$loop->add($function);
# trigger asynchronous processing
my @array = qw/5 2 4 0/;
my @futures = map { $function->call( args => [ $_ ] ) } @array;
# safely wait for all ops to complete
Future->wait_all(@futures)->await;
print "all done !\n";
This yields:
start waiting 5 second(s)
start waiting 2 second(s)
start waiting 4 second(s)
start waiting 0 second(s)
done sleeping 0 second(s)
done sleeping 2 second(s)
done sleeping 4 second(s)
done sleeping 5 second(s)
all done !
NB1: Future->wait_all(@futures)->await could also be written $_->get for @futures; however, the first expression, which uses convergent Futures, has the advantage that it will never fail, even if an underlying call dies.
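To illustrate the difference, here is a small sketch in which one call deliberately dies. The worker body and the @results / @errors arrays are made up for this example. After wait_all completes, each future can be inspected individually instead of letting get throw.

```perl
use strict;
use warnings;
use IO::Async::Function;
use IO::Async::Loop;
use Future;

my $loop = IO::Async::Loop->new;

# Hypothetical worker: fails on purpose for the input 2.
my $function = IO::Async::Function->new(
    code => sub {
        my ($n) = @_;
        die "cannot process $n\n" if $n == 2;
        return $n * 10;
    },
);
$loop->add($function);

my @futures = map { $function->call( args => [ $_ ] ) } 1 .. 3;

# Completes once every future is done or failed; does not throw.
Future->wait_all(@futures)->await;

my ( @results, @errors );
for my $f (@futures) {
    if ( $f->is_failed ) {
        push @errors, scalar $f->failure;    # the failure message
    }
    else {
        push @results, $f->get;              # safe: this future succeeded
    }
}

print "results: @results\n";
print "failed calls: ", scalar(@errors), "\n";
```

Calling $_->get for @futures on the same input would instead throw as soon as it reached the failed future.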
NB2: many options are available in IO::Async::Function and Future to handle errors, manage the number of workers and their behavior, and so on. Check the docs for more details.
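As one example of those options, the sketch below caps the pool with max_workers and collects the return values, which the original loop stored in $result. The squaring body and the cap of 2 are arbitrary choices for illustration. Unlike wait_all, needs_all fails if any call fails, so this assumes the function does not die.

```perl
use strict;
use warnings;
use IO::Async::Function;
use IO::Async::Loop;
use Future;

my $loop = IO::Async::Loop->new;

my $function = IO::Async::Function->new(
    max_workers => 2,    # arbitrary cap on concurrent worker processes
    code        => sub { my ($n) = @_; return $n * $n },
);
$loop->add($function);

my @array   = ( 5, 2, 4, 0 );
my @futures = map { $function->call( args => [ $_ ] ) } @array;

# needs_all yields the results in the same order as @futures,
# regardless of which worker finished first.
my @results = Future->needs_all(@futures)->get;

print "$array[$_] => $results[$_]\n" for 0 .. $#array;
```

Calls beyond the worker limit are queued by the function object and dispatched as workers become free.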