8

I'm calling an async client method by streaming over a list of objects. The method returns Future.

What's the best way to iterate over the list of Futures returned after the call (so as to process those Future which comes first)?

Note: The async client only returns Future not CompletableFuture.

Following is the code:

List<Future<Object>> listOfFuture = objectsToProcess.parallelStream()
    .map((object) -> {
        /* calling an async client returning a Future<Object> */ })
    .collect(Collectors.toList());
Harmlezz
  • 7,972
  • 27
  • 35
syfro
  • 121
  • 1
  • 5
  • Did you look into using a CompletionService rather than mapping the results to an array? http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletionService.html – Andrei Epure Apr 05 '17 at 09:13
  • Perhaps try [CompletionService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletionService.html)? Here is an [explanation](http://stackoverflow.com/a/19348417/6785649). – Samuel Kok Apr 05 '17 at 09:14
  • 4
    The `CompletionService` is quite useless if you don’t have control over the code that produces the `Future`/submits the job to an `Executor`. – Holger Apr 05 '17 at 09:19

2 Answers2

7

Having this list of List<Future<Object>>, I would submit it to a custom pool, instead of using the default stream parallel processing.

That is because the stream api uses a common pool for parallel processing and you will call get on those Futures(if it takes significant time for processing) - you will block all other stream operations that use parallel operations within your application until this one is done.

This would a bit like this:

forJoinPool.submit( () -> list.stream().parallel().map(future -> future.get()).collect(Collectors.toList())).get();

I would go with a custom pool like shown here

Community
  • 1
  • 1
Eugene
  • 117,005
  • 15
  • 201
  • 306
0

I found the stream().parallel() tricky to control, depending on the data structure of origin it may not always run in parallel. Here's a way to fork join a stream of parallel tasks using CompletableFuture.join.

    var pool = new ForkJoinPool(4);
    IntFunction<CompletableFuture<IntStream>> makePair = i -> CompletableFuture.supplyAsync(() -> IntStream.of(i, i), pool);
    var res = IntStream.of(1, 2, 3, 4, 5).mapToObj(makePair).flatMapToInt(CompletableFuture::join).sum();
    System.out.println(res);
David Lilljegren
  • 1,799
  • 16
  • 19