I have this loop:

foreach my $element ( @array ) {
     my $result = doSomething($element);
}

Since the order in which the array is processed doesn't matter, and the script takes a long time to run, I'd like to run doSomething() asynchronously.

I am looking at IO::Async for this, but I can't seem to find an example where the input to the loop is a simple array as above. The examples seem to focus on open sockets, STDIN, etc.

Here is the example given, showing feeding data to the loop via STDIN:

$loop->add( IO::Async::Stream->new_for_stdin(
   on_read => sub {
      my ( $self, $buffref, $eof ) = @_;

      while( $$buffref =~ s/^(.*)\n// ) {
          print "You typed a line $1\n";
      }

      return 0;
   },
) );

How can I feed it the array elements instead?

GMB
Marcus
  • With the amount of information you've given, it's hard to give you a good answer. Please [edit] your post and add some relevant example code to flesh out `doSomething`. Most likely you will want `doSomething` to start an asynchronous request and have it return a handle. You then collect those handles and wait until they indicate that they are finished. – Corion Dec 18 '18 at 16:01
  • I *do* want doSomething() to start an asynchronous request, that's correct. I can't figure out the syntax. – Marcus Dec 18 '18 at 16:06
  • Sounds like you should look into e.g. `IO::Async::Function` with your `doSomething` as code block. Then the loop can use `$function->call()` to pass in parameters and store the result of that call in an array. Then you need a second loop that would harvest the async function calls, i.e. that is the code that will "block" waiting for the results. – Stefan Becker Dec 18 '18 at 17:28
  • `IO::Async::Routine` might be applicable too. The example from the man page looks like it can be directly adapted to solve your problem. You only have to define the stop condition for the loop, maybe a counter that reaches 0 when all functions have returned? – Stefan Becker Dec 18 '18 at 17:36
  • The examples you see in docs don't show you what you want because this is an _event_ handling system, first and foremost. Yes, you can use it for any asynchronous processing, but that's not the main purpose of the framework. Why not `Parallel::ForkManager`, what seems to suit precisely what you want? It'll be a few lines of code and conceptually _much_ simpler. – zdim Dec 18 '18 at 17:37
  • For the `IO::Async::Function` solution: `foreach (...) { push(@futures, $function->call($element)); } Future->wait_all(@futures);`. – Stefan Becker Dec 18 '18 at 17:53

1 Answer

As commented by @StefanBecker, the simplest way to handle this with IO::Async is by using an IO::Async::Function.

From the docs:

This subclass of IO::Async::Notifier wraps a function body in a collection of worker processes, to allow it to execute independently of the main process.

In the IO::Async framework, the typical use case for IO::Async::Function is when a blocking process needs to be executed asynchronously.

Disclaimer: please note that, as @zdim also commented, IO::Async might not be the best fit for your use case. A pure process parallelizer such as Parallel::ForkManager would probably be your best option here, as it basically implements the same functionality (forking and executing in parallel), but in a much more straightforward fashion. One of the main differentiating factors of IO::Async is its I/O multiplexing capabilities, which you are seemingly not using here.
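For comparison, here is a minimal sketch of what the Parallel::ForkManager approach could look like. The body of doSomething, the limit of 4 processes, and the %result_for hash are all assumptions made for illustration; passing a result back through finish() relies on the module's data-structure retrieval feature, so only data that can be serialized will come back.

```perl
use strict;
use warnings;

use Parallel::ForkManager;

# Hypothetical stand-in for the real doSomething(): sleeps, then returns a value.
sub doSomething {
    my ($delay) = @_;
    sleep $delay;
    return $delay * 2;
}

my @array = qw/2 1 0/;
my %result_for;

# Run at most 4 child processes at a time (an arbitrary choice).
my $pm = Parallel::ForkManager->new(4);

# Runs in the parent each time a child exits; $data_ref is the reference
# the child passed as the second argument to finish().
$pm->run_on_finish( sub {
    my ( $pid, $exit_code, $ident, $exit_signal, $core_dump, $data_ref ) = @_;
    $result_for{$ident} = $$data_ref if defined $data_ref;
} );

for my $element (@array) {
    # start() forks: it returns the child's PID in the parent (so the parent
    # moves on to the next element) and 0 in the child (which does the work).
    $pm->start($element) and next;

    my $result = doSomething($element);
    $pm->finish( 0, \$result );    # exit the child and send the result back
}

# Block until every child has exited and its result has been collected.
$pm->wait_all_children;

print "$_ => $result_for{$_}\n" for sort { $a <=> $b } keys %result_for;
```

Each element is used as the child's identifier, so this sketch assumes the array elements are unique.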

But since you specifically asked for IO::Async, here is an example of such an implementation: I turned doSomething into a dummy sub that just sleeps for the number of seconds given as its argument. This lets you observe the effect of asynchronous execution.

use strict;
use warnings;

use IO::Async::Function;
use IO::Async::Loop;
use Future;

# dummy sub
sub doSomething { 
    my ( $delay ) = @_;
    print "start waiting $delay second(s)\n";
    sleep $delay;
    print "done sleeping $delay second(s)\n"; 
    return $delay;
}

# prepare the function for execution
my $loop = IO::Async::Loop->new;
my $function = IO::Async::Function->new( code => sub { return doSomething($_[0]) } );
$loop->add($function);

# trigger asynchronous processing
my @array = qw/5 2 4 0/;
my @futures = map { $function->call( args => [ $_ ] ) } @array;

# safely wait for all ops to complete
Future->wait_all(@futures)->await;
print "all done !\n";

This yields:

start waiting 5 second(s)
start waiting 2 second(s)
start waiting 4 second(s)
start waiting 0 second(s)
done sleeping 0 second(s)
done sleeping 2 second(s)
done sleeping 4 second(s)
done sleeping 5 second(s)
all done !

NB1: Future->wait_all(@futures)->await could also be written as $_->get for @futures. However, the first form, which uses a convergent Future, has the advantage that it never fails, even if an underlying call dies; with $_->get, the first failed call would throw an exception.

NB2 : many options are available in IO::Async::Function and Future to handle errors, manage the number of workers and their behavior, and so on. Check out the docs for more details...
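As a rough illustration of both notes, the sketch below caps the number of workers with max_workers and, once the convergent Future is ready, inspects each Future individually to separate results from failures. The doubling function, the negative-input failure, and the @report array are assumptions made up for this example, not part of the original code.

```perl
use strict;
use warnings;

use IO::Async::Function;
use IO::Async::Loop;
use Future;

my $loop = IO::Async::Loop->new;

my $function = IO::Async::Function->new(
    max_workers => 2,    # at most two worker processes at a time
    code        => sub {
        my ($n) = @_;
        die "cannot process $n\n" if $n < 0;    # hypothetical failure case
        return $n * 2;
    },
);
$loop->add($function);

my @array   = ( 1, -1, 3 );
my @futures = map { $function->call( args => [$_] ) } @array;

# wait_all itself never fails; it becomes ready once every call is ready.
Future->wait_all(@futures)->await;

# Each Future is now either done or failed, so get() can be called safely
# after checking is_failed.
my @report;
for my $i ( 0 .. $#array ) {
    my $f = $futures[$i];
    if ( $f->is_failed ) {
        my ($error) = $f->failure;
        chomp $error;
        push @report, "$array[$i]: failed ($error)";
    }
    else {
        my ($result) = $f->get;
        push @report, "$array[$i]: $result";
    }
}
print "$_\n" for @report;
```

Because the Futures are kept in the same order as @array, results can be matched back to their inputs even though the calls may finish in any order.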

Glorfindel
GMB
  • Instead of `$_->get for @futures` you can use [Future](https://metacpan.org/pod/Future) composition to resolve them as they complete: `Future->wait_all(@futures)->await;` (`await` is just `get` that doesn't return the success or throw an exception on failure, and `wait_all` can't fail.) There are of course lots of other options if you want to have Future handle failures (in this case, exceptions thrown in the function) gracefully, etc. – Grinnz Dec 18 '18 at 20:19
  • Personally I prefer IO::Async::Function as a solution for this because of the flexibility that the event loop and Futures provide for integration with the rest of your program. – Grinnz Dec 18 '18 at 20:20
  • @Grinnz : thanks for the comment, updated the answer accordingly. And yes, I enjoy IO::Async and Future a lot too, I find it brilliant – GMB Dec 18 '18 at 20:24
  • I agree that using futures is better (but then perhaps a bit of a comment would help the uninitiated?). And why not [IO::Async::Future](https://metacpan.org/pod/IO::Async::Future) then? – zdim Dec 18 '18 at 20:30
  • @zdim : sure, thanks... Updated my answer (see *NB1*) – GMB Dec 18 '18 at 21:04
  • @zdim IO::Async::Future is already the Future subclass that IO::Async::Function will end up returning, so Future->wait_all will create a Future of the same class (since that's what gets passed to it). It's also the subclass returned by $loop->new_future where $loop is an IO::Async::Loop. You always need a subclass specific to the event loop you're using in order for `get` and `await` to work. – Grinnz Dec 19 '18 at 17:58
  • @Grinnz I don't understand -- are you saying that `use IO::Async::Future` instead of `use Future`, with suitable other (small) code changes, won't work here? All I meant was to suggest to make that explicit. – zdim Dec 20 '18 at 02:03
  • @zdim The Future class you use for wait_all won't matter, as out of necessity it will use the subclass of the Futures you pass to it, which are created by IO::Async::Function. So it's normal to just use Future for that. You would use a specific subclass when you need to initially create a future yourself, but to create an IO::Async::Future you almost always want to just use $loop->new_future so there's never a reason to use it directly. – Grinnz Dec 20 '18 at 06:42
  • @Grinnz OK. So is there a problem with doing precisely _that_ (futures' loop), which is all we need here from the framework, to merely run in parallel? – zdim Dec 20 '18 at 18:23
  • @zdim I don't understand the question. But see the documentation for IO::Async::Future, which suggests to only instantiate such objects via the IO::Async loop or related classes. – Grinnz Dec 20 '18 at 18:42
  • @Grinnz But that is exactly what I'm saying, to use `$loop` (needed anyway) directly to build futures, using the subclass. Can't run any of this now, will try later; perhaps I'm missing something here. – zdim Dec 20 '18 at 18:55
  • @zdim That's what IO::Async::Function does internally. In this example there are no Futures created by the user except for the convergent future, which is created based on the class of the Futures passed in. – Grinnz Dec 20 '18 at 21:25
  • @Grinnz Ah, thank you. I should look into all that code at some point, and play with this way more. – zdim Dec 20 '18 at 21:31