3

I am calling one async method inside for loop and adding future of it into a list. I am not sure why allOff is not waiting to complete all futures and returning partial result. Have a look of my code.

I have one overridden method

@Overide
@Async
CompletableFuture<someType> fetchData()
{
returns new CompletableFuture<someType>();
}

I am calling above method in a for loop with different instances.

get all beans which implments one interface which has mehod fetchData.

Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);

List<SomeClass> list=
    allBeans.values().stream().collect(SomeClass.toList());

for (SomeClass classInstance: list) {
  classInstance.fetchData().thenApply(x->{
    //Some DB persistence Call
    futureList.add(x);
  });
}

after that I am applying allOff so that all future can be completed but it is not waiting for all and main thread excuting rest of flow.

CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
      futureList.toArray(new CompletableFuture[futureList.size()]));

  CompletableFuture<List<futureResponse>> finalList=
      combinedFutures.thenApply(v -> {
        return futureList.stream().map(m -> 
        m.join()).collect(Collectors.toList());
       });

finalList- in this List I want all the completed futures returned by fetch invocation.

In finalList I am always getting 2 objects but fetchData is getting run 5 times( based on number of instances), I saw the log after all of remaining async call are getting completed. Could someone help here.

Observation:- After putting main thread on sleep for 30 sec, I could see I have all 5 objects in the list. Could some one please tell why main thread is not waiting at allOff for all futures to complete.

Deepesh Rathore
  • 281
  • 2
  • 4
  • 13
  • @daniu :- There could be any number fetchData invocation based on the classes extending fetchData interface. For now I have 5 invocations, in future it might increase. – Deepesh Rathore Oct 14 '19 at 13:37
  • What is `futureList`? In the first part of the question, it seems you are asynchronously adding `someType` instances into it (in the `thenApply`, which doesn't seem thread-safe except if `futureList` is some sort of concurrent collection) whereas in the second part, it seems to be a `List>`. – Didier L Oct 14 '19 at 15:43
  • Possible duplicate of [CompletableFuture is not getting executed. If I use the ExecutorService pool its work as expected but not with the default forkJoin common pool](https://stackoverflow.com/questions/51879659/completablefuture-is-not-getting-executed-if-i-use-the-executorservice-pool-its) – Didier L Oct 14 '19 at 15:51
  • @DidierL - futureList , I am collecting all the objects returned bu fetch invocation in a list so that after all async call gets over at last I should be haivng a List of object. I want all fetch invocation non blocking that why I have not joined it. – Deepesh Rathore Oct 14 '19 at 15:56
  • @DeepeshRathore curious about why you state that you are looking for a non blocking solution, and accept one that blocks the common fork-join pool ? Could you explain why ? – GPI Oct 22 '19 at 14:49
  • @GPI:- So my idea was is to give several db calls in parallel and once all completable futures returns me the result , I should be able to have result in a list and perform further logic. Not sure if I have made it complicated. Non blocking means I want each async call should returns a completable future and I can use allOff to wait for them to complete. Hope I am able to clarify what I am trying to do. So you mean to say thenAccept or thenApply will block the thread ? Please let me know. – Deepesh Rathore Oct 22 '19 at 19:01

2 Answers2

0

You have a race condition :

classInstance.fetchData().thenApply(x->{
    futureList.add(x);
});

This code means that only when the future x is completed, then x will be added to futureList. This might be in 10 milliseconds, or 2 hours, who knows ? (It might be never if the future fails exceptionnaly).

So, when the code reaches

CompletableFuture.allOf(futureList....

There is no "guarantee" that the thenApply have been called. futureList could even be empty.

One way you could correct this code is like so :

Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);


List<SomeClass> list=
    allBeans.values().stream().collect(SomeClass.toList());

 for (SomeClass classInstance: list) {
    futureList.add(classInstance.fetchData());
 }

Or if you actually need to do something in a thenApply:

 for (SomeClass classInstance: list) {
    futureList.add(
        classInstance.fetchData().thenApply(x -> whatever(x))
    );
 }

This way, your futureList is populated not when an async result returns (which is unknown, and might even fail with an exception an never occur), but as soon as the async call is created, which is what you actually want.

GPI
  • 9,088
  • 2
  • 31
  • 38
  • Why do the second loop at all? I'd do `allBeans.values().stream().map(SomeClass::fetchData).collect(toList())`, which directly gives you the list of `CompletableFuture`s. – daniu Oct 14 '19 at 13:18
  • @daniu you are probably right, but I am under the impression that the thenApply is necessary, or that `futureList` is a variable that hangs around (we never see it declared in the question), so I went with the minimum structural modification to the question code. – GPI Oct 14 '19 at 13:26
  • @GPI How can I stop all async process to stop their execution if any of the fetchData Invocation throws exception? – Deepesh Rathore Oct 15 '19 at 07:18
  • @DeepeshRathore this is a different question altogether. Have a look at https://stackoverflow.com/questions/33783561/completablefuture-aggregate-future-to-fail-fast or https://stackoverflow.com/questions/51621510/how-to-implement-completablefuture-allof-that-completes-exceptionally-once-any – GPI Oct 15 '19 at 09:02
0

IIUC, what you want to do can be done simpler by doing

CompletableFuture<List<FutureResponse>> = CompletableFuture.supplyAsync(() -> {
    // start fetches and collect the individual futures
    List<CompletableFuture> fetches = 
       allBeans.values().stream()
               .map(SomeClass::fetchData)
               .collect(toList());
    // join all futures and return the list of the results
    return fetches.stream()
               .map(CompletableFuture::join)
               .collect(toList());
}

I think you can't do it in a single stream (ie map to fetch, then immediately to join) because that might wait for the join before the next future is created.

daniu
  • 14,137
  • 4
  • 32
  • 53
  • This code is blocking as it is (the join will be performed by the calling thread). whereas in the question code, the join is intended to be performed once `combinedFutures` is done, which means that `join` calls would not block. As such, I think this answer performs differently than the original question intends it to work - bugs nonewithstanding. – GPI Oct 14 '19 at 13:35
  • @GPI no, the `join` will be performed asynchronously (namely in the `CompletableFuture` created by `supplyAsync`. I don't see how the `join`s blocking is an issue, you wait for the `fetch` completion either way. However, you're right this probably performs somewhat differently because the collector thread will also be run in the common thread pool. – daniu Oct 14 '19 at 13:40
  • Sorry, I did not register the `supplyAsync` line at all (my mind skipped it for some reason). The blocking might or might not be an issue, but it is a difference. The original question code is supposed to be totally non blocking, and here, we have a blocking call (not in the main thread as I erronously reported, but a blocking call nonetheless), just as you explained. – GPI Oct 14 '19 at 13:42