
I have an input stream of queries which are executed asynchronously. I want to make sure that when I use CompletableFuture::join, the results of those queries are collected in the order of the input query stream.

This is what my code looks like:

List<List<Result>> results = queries.stream()
        .map(query -> CompletableFuture.supplyAsync(() -> {
            try {
                return SQLQueryEngine.execute(query);
            } catch (InternalErrorException e) {
                // a Supplier cannot throw checked exceptions, so wrap it
                throw new RuntimeException(e);
            }
        }))
        .map(CompletableFuture::join)
        .collect(Collectors.toList());

SQLQueryEngine.execute(query) returns a List<Result>, so the output is a List<List<Result>>. I want to flatten and combine all the results into one single List. If I use .flatMap(List::stream) before collecting, will it maintain the ordering?

premprakash
  • Misha has already provided the answer. As a side note, keep in mind that you are executing the queries _sequentially_, as you wait for the response of each `CompletableFuture` one after the other (join in the map). If you want to process them _concurrently_, map the stream to a `List` of `CompletableFuture`s and call `allOf` on them. Example [here](http://stackoverflow.com/questions/30025428/listfuture-to-futurelist-sequence) – Ruben Oct 27 '15 at 08:31
  • Thanks @Ruben for pointing this out :) – premprakash Oct 27 '15 at 19:06
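Ruben's allOf suggestion can be sketched roughly as follows. This is a minimal illustration, not the code from the linked answer: sequence is a hypothetical helper name, and the queries are replaced by trivial string suppliers. All futures are created before anything waits, allOf completes once every one of them is done, and the final list is read in the original list order, not in completion order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSequence {

    // Hypothetical helper: turns a List<CompletableFuture<T>> into a
    // CompletableFuture<List<T>> whose list keeps the input order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // allOf completes only after every future is done, so join() here never blocks.
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> queries = Arrays.asList("q1", "q2", "q3");
        // Submit every task before waiting on any of them.
        List<CompletableFuture<String>> futures = queries.stream()
                .map(q -> CompletableFuture.supplyAsync(() -> "result of " + q))
                .collect(Collectors.toList());
        System.out.println(sequence(futures).join()); // [result of q1, result of q2, result of q3]
    }
}
```

Because allOf returns a CompletableFuture<Void>, further stages can be attached with thenApply or thenAccept instead of blocking; the single join() in main is only there to print the result.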

1 Answer


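You probably meant .flatMap, and yes, it will retain the ordering: List.stream() returns an ordered stream, and flatMap(List::stream) emits each inner list's elements in place of that list, so collect(toList()) sees the results in the order of the input queries.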
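A quick way to convince yourself, sketched with hard-coded strings standing in for the per-query result lists (the class and method names are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapOrder {

    // Flattens a list of per-query result lists into one list.
    // The outer stream is ordered, and flatMap keeps each inner list's order,
    // so the output follows the input order.
    static <T> List<T> flatten(List<List<T>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> perQuery = Arrays.asList(
                Arrays.asList("q1-row1", "q1-row2"),
                Arrays.asList("q2-row1"),
                Arrays.asList("q3-row1", "q3-row2"));
        System.out.println(flatten(perQuery));
        // [q1-row1, q1-row2, q2-row1, q3-row1, q3-row2]
    }
}
```

Collectors.toList() also respects encounter order on a parallel stream, so adding .parallel() before collect would not change the output order either.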

Consider explicitly passing an Executor to supplyAsync to avoid scheduling your IO-bound SQL queries on ForkJoinPool.commonPool().
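A minimal sketch of that, assuming a fixed-size pool is acceptable for your database; runQuery is a hypothetical stand-in for SQLQueryEngine.execute, and the pool size of 4 is arbitrary. The second argument of supplyAsync(Supplier, Executor) is what keeps the task off the common pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedExecutor {

    // Hypothetical stand-in for a blocking, IO-bound call such as SQLQueryEngine.execute.
    static String runQuery(String query) {
        try {
            Thread.sleep(50); // simulate waiting on the database
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "result of " + query;
    }

    // Schedules the query on the given executor instead of ForkJoinPool.commonPool().
    static CompletableFuture<String> queryOn(ExecutorService pool, String query) {
        return CompletableFuture.supplyAsync(() -> runQuery(query), pool);
    }

    public static void main(String[] args) {
        // Pool size 4 is an arbitrary assumption; size it to what your database can handle.
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(queryOn(ioPool, "SELECT 1").join()); // result of SELECT 1
        } finally {
            ioPool.shutdown(); // pool threads are non-daemon and would keep the JVM alive
        }
    }
}
```

Shutting the pool down matters here because threads created by Executors.newFixedThreadPool are non-daemon by default.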

As @Ruben pointed out, you are joining each task in the current thread immediately after submitting it and before submitting the next query, which is likely a bug. You should submit all queries first and only then start joining.

You can do it like this (with a static import of Collectors.toList):

queries.stream()
    .map(query -> CompletableFuture.supplyAsync(...))
    .collect(toList())              // terminal: every query is submitted here
    .stream()
    .map(CompletableFuture::join)   // joined in input order, not completion order
    .collect(toList());
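Putting the pieces together for the original question (submit everything, join in input order, then flatten) could look like the sketch below. execute is a hypothetical stand-in for SQLQueryEngine.execute that returns two fake rows per query, and the delays are chosen so that the first query finishes last; the output still follows the input order, because the futures are joined in list order rather than completion order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SubmitAllThenJoin {

    // Hypothetical stand-in for SQLQueryEngine.execute: two fake rows per query.
    static List<String> execute(String query, long delayMillis) {
        try {
            Thread.sleep(delayMillis); // simulate query latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return Arrays.asList(query + "-row1", query + "-row2");
    }

    static List<String> runAll(List<String> queries, ExecutorService pool) {
        int n = queries.size();
        // Step 1: submit every query; collect(toList()) is terminal, so all tasks start here.
        // Earlier queries get longer delays, so they complete last.
        List<CompletableFuture<List<String>>> futures = IntStream.range(0, n)
                .mapToObj(i -> CompletableFuture.supplyAsync(
                        () -> execute(queries.get(i), 50L * (n - i)), pool))
                .collect(Collectors.toList());
        // Step 2: join in list order (not completion order), then flatten.
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            System.out.println(runAll(Arrays.asList("q1", "q2", "q3"), pool));
            // [q1-row1, q1-row2, q2-row1, q2-row2, q3-row1, q3-row2]
        } finally {
            pool.shutdown();
        }
    }
}
```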
Misha
  • The code in its current form won’t pollute the common pool too much, as it `join`s each future right after its submission. – Holger Oct 27 '15 at 08:46
  • True, assuming that this is the only thread and nothing else is happening that might be scheduling tasks into the common pool. Even if for a particular application there's no harm in running blocking tasks in the common pool, it's just a bad habit to get into. – Misha Oct 27 '15 at 08:53
  • Right, especially as joining each task immediately doesn’t seem to be intentional. So from this point of view, my comment was meant more as sarcasm… – Holger Oct 27 '15 at 08:55
  • Apparently, my sarcasm detector was offline due to insufficient caffeine intake. I think it's best I incorporate @Ruben's comment into the answer. – Misha Oct 27 '15 at 09:06
  • Thanks @misha. I didn't realize that I was executing the queries sequentially. And yes, I meant flatMap. This is very insightful. I believe your solution of doing .collect().stream() before .map(CompletableFuture::join) has the same effect as using CompletableFuture::allOf. Also, one more question about your suggestion of passing an Executor to supplyAsync to avoid polluting the common pool: is there a recommended way to do this? http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream suggests creating a new fork/join pool and passing it along to supplyAsync. – premprakash Oct 27 '15 at 19:00
  • Yes, it will have the same effect as `allOf`, but it's shorter and is good enough because you are joining right there in the current thread. `allOf` would let you queue up further computations without joining, but you don't need that. As for which `Executor` to pass to `supplyAsync`, `java.util.concurrent.Executors` has factory methods to construct various Executors. If you have questions about choosing the most appropriate `Executor`, submit a new question. – Misha Oct 27 '15 at 19:12
  • Thanks @misha, I posted a new question here http://stackoverflow.com/questions/33377177/how-to-chose-an-executor-for-completablefuturesupplyasync – premprakash Oct 27 '15 at 19:49
  • @Misha, isn't this blocking one by one? Then we would get the sum of all latencies rather than max(latencies). – so-random-dude Apr 08 '20 at 04:26
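Regarding the last comment: in the answer's version, the first collect(toList()) is a terminal operation, so every query is submitted before the first join runs; only the original version (join inside the same map chain) runs them one at a time. The sketch below compares the two with a hypothetical slowQuery that sleeps for 200 ms. Assuming the executor has a free thread per query, the second timing should be close to a single delay (roughly the max of the latencies) rather than the sum; with fewer threads than queries, for example the common pool on a small machine, tasks queue up and the total grows accordingly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class LatencyComparison {

    static final long DELAY_MS = 200;

    // Hypothetical slow query: every call takes at least DELAY_MS.
    static String slowQuery(String q) {
        try {
            Thread.sleep(DELAY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return q;
    }

    // join in the same lazy pipeline: each task is submitted only after the previous one finished.
    static long joinImmediatelyMillis(List<String> queries, ExecutorService pool) {
        long start = System.nanoTime();
        queries.stream()
                .map(q -> CompletableFuture.supplyAsync(() -> slowQuery(q), pool))
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return (System.nanoTime() - start) / 1_000_000;
    }

    // collect first: all tasks are in flight before the first join.
    static long submitAllFirstMillis(List<String> queries, ExecutorService pool) {
        long start = System.nanoTime();
        queries.stream()
                .map(q -> CompletableFuture.supplyAsync(() -> slowQuery(q), pool))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<String> queries = Arrays.asList("q1", "q2", "q3", "q4");
        ExecutorService pool = Executors.newFixedThreadPool(queries.size()); // one thread per query
        try {
            System.out.println("join in map:       " + joinImmediatelyMillis(queries, pool) + " ms"); // sum of delays
            System.out.println("submit, then join: " + submitAllFirstMillis(queries, pool) + " ms"); // about one delay
        } finally {
            pool.shutdown();
        }
    }
}
```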