
I have code like the following:

void testMethod(List<String> ids) {
    List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();

    for (String id : ids) {
        CompletableFuture<ResultOne> resultOne = AynchOne(id);
        CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);

        CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b));

        resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}

class ResultOne {
    boolean goodResult;
    String id;

    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultTwo {
    boolean goodResult;
    String id;

    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultThree {
    boolean goodResult;
    String id;
}

private ResultThree computeCombinedResultThree(ResultOne r1, ResultTwo r2) {
    ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult && r2.goodResult;

    return resultThree;
}

I need to be able to AND resultOne and resultTwo together so that, once all the asynchronous calls have completed, I have an array or map (I guess) that I can process afterwards. Each object in it should hold the corresponding id and a true or false for that id, representing the AND of the two booleans from the separate objects.

Based on feedback from readers, I have gotten the code completed to the point where I can merge the two original futures, and combine all the results from each iteration to get the entire loop of futures. At this point I just need to process the results.

I think maybe I need another CompletableFuture? This one would maybe be something like this (put above where I have "// PROCESS RESULTS HERE"):

CompletableFuture<Void> future = resultThreeList
  .thenRun(() -> forwardSuccesses(resultThreeList));

future.get();

forwardSuccesses() would iterate through resultThreeList, forwarding the successful ids to another process, but I am not sure that is how to do it. Grateful for any ideas. Thanks.

Timothy Clotworthy
  • You'll want `thenCombine` to combine each `resultOne` and `resultTwo`, and then `allOf` to combine all the resulting futures. – Louis Wasserman Sep 30 '20 at 18:08
  • @LouisWasserman, I appreciate the response. I get the basic idea that you can combine the results using thenCombine. I am trying: resultOne.thenCombine(resultTwo); which is just wrong. I am not following how to actually implement it. Thanks again. – Timothy Clotworthy Sep 30 '20 at 18:32
  • Yeah, create a third future in the loop, then store those in your resulting list. – daniu Sep 30 '20 at 18:32
  • You're looking for `resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b))`. Implement `computeCombinedResultThree` appropriately. – Louis Wasserman Sep 30 '20 at 18:35
  • @LouisWasserman , thanks. Maybe closer? I now have a private void computeCombinedResultThree(ResultOne r1, ResultTwo r2) { //do stuff here } but still not compiling because getting "Cannot infer functional interface type" from the above: resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b)) – Timothy Clotworthy Sep 30 '20 at 19:28
  • The return type shouldn't be void. It should be a new `ResultThree` class containing an ID and a boolean value. (Or you could reuse `ResultOne` or `ResultTwo`.) – Louis Wasserman Sep 30 '20 at 19:33
  • @LouisWasserman thanks. I have been updating my original post as I get your helpful advice on how to accomplish the original task. I believe (you would have to look at the updated post above) I now have the proper merged result in ResultThree? However, I am still uncertain how to perform the "allOf" based on the above. Thanks again so far! – Timothy Clotworthy Sep 30 '20 at 20:20
  • If you just want to return the final list of completed futures, you probably don't need `allOf`. Just create a `List<CompletableFuture<ResultThree>>`, and then call `get()` on each one in a loop. – Louis Wasserman Sep 30 '20 at 20:28
  • @LouisWasserman thanks, but I don't completely understand. I think you are saying to first create the list (I guess like List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();)? And then I am confused about how to add the ResultThree futures in my code. I don't have much experience with lambdas. This line: resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b)), was for combining one and two into the ResultThree, but I don't understand how to add that result to a list as you mention. Could you possibly answer in the context of my code above? – Timothy Clotworthy Sep 30 '20 at 21:56
  • `CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b)); resultThreeList.add(resultThree);` – Louis Wasserman Sep 30 '20 at 22:21
  • @LouisWasserman ok, I understand now. The only thing I have left to do is some additional processing against the resultThree list of the completed futures. Can I do that directly in the testMethod of my example (say, where I have the // PROCESS RESULTS HERE in the example above)? It would essentially be a loop over each object in the list that would assume the futures in the list are completed. Or am I overlooking something? Do I need to implement some listener that recognizes the futures are complete and do it there? This would be the last step. Thanks again. – Timothy Clotworthy Sep 30 '20 at 23:40
  • If you're not planning to return a `CompletableFuture`, then just call `.get()`, which will wait for the future to complete. – Louis Wasserman Oct 01 '20 at 03:35
  • @LouisWasserman do you still want to post this as a reply? Because I'd like to, but not steal your points since you're so far into explaining in the comments. – daniu Oct 01 '20 at 06:21
  • @LouisWasserman thanks. I have attempted to implement something like what I think you are suggesting, but I must be doing it wrong (it's in the updated post). I need to take the resultThreeList and process the results once all futures in the list have completed. The post above now has: CompletableFuture<Void> future = resultThreeList.thenRun(() -> forwardSuccesses(resultThreeList)); which is followed by get(). However, what I have must not make sense, as it's not compiling. I just want to call a method called forwardSuccesses() that processes the list when the futures are completed. – Timothy Clotworthy Oct 01 '20 at 12:33
  • @daniu I am stuck here. If you have another approach I would certainly appreciate your input. thank you. – Timothy Clotworthy Oct 01 '20 at 17:07
  • What is `forwardSuccesses`? At this point, it should just be `List<ResultThree> resultThrees = new ArrayList<>(); for (CompletableFuture<ResultThree> resultThreeFuture : resultThreeList) { resultThrees.add(resultThreeFuture.get()); }`, and then you don't have to worry about `CompletableFuture` anything at all anymore. – Louis Wasserman Oct 01 '20 at 18:40

3 Answers


So this is how far you got until now:

List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
    CompletableFuture<ResultOne> resultOne = aynchOne(id);
    CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);

    CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}

Now all you need to do is convert this List<CompletableFuture<ResultThree>> to a CompletableFuture<List<ResultThree>> that will get completed once all the results are finished calculating.

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> resultThreeList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

Or with something like

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));

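where safeGet is a method that just calls future.get() and catches the exceptions that may occur. You can't call get() directly in the lambda because it throws checked exceptions (InterruptedException and ExecutionException), which the Function interface expected by map() does not allow.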
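The answer names safeGet but does not show its body, so the following is only a minimal sketch of what such a helper might look like. The class name SafeGetSketch and the choice to rethrow failures as an unchecked CompletionException are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class SafeGetSketch {
    // Hypothetical helper: wraps the checked exceptions of get() in an
    // unchecked CompletionException so it can be used inside stream lambdas.
    static <T> T safeGet(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new CompletionException(e);
        } catch (ExecutionException e) {
            // Unwrap so callers see the original failure as the cause.
            throw new CompletionException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(safeGet(CompletableFuture.completedFuture("ok")));
    }
}
```

Note that CompletableFuture.join() already behaves much like this: it throws only unchecked exceptions, which is why the allOf variant can use CompletableFuture::join directly in map().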

Now you can process this list with thenAccept():

try {
    combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

Again, the exceptions being caught are due to the call to get().
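To see the pieces working together, here is a small self-contained sketch of the whole flow. It is not the asker's real code: the aynchOne and aynchTwo bodies are made-up stand-ins (the second one treats ids starting with "bad" as failures), a single Result class replaces the three result classes, and successfulIds stands in for whatever forwardSuccesses would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CombineAllSketch {
    // Simplified stand-in for the question's result classes.
    static class Result {
        final String id;
        final boolean goodResult;
        Result(String id, boolean goodResult) {
            this.id = id;
            this.goodResult = goodResult;
        }
    }

    // Hypothetical stand-ins; the real asynchronous calls are not shown in the question.
    static CompletableFuture<Result> aynchOne(String id) {
        return CompletableFuture.supplyAsync(() -> new Result(id, true));
    }

    static CompletableFuture<Result> aynchTwo(String id) {
        // Assumption for the demo: ids starting with "bad" fail the second check.
        return CompletableFuture.supplyAsync(() -> new Result(id, !id.startsWith("bad")));
    }

    static Result combine(Result a, Result b) {
        return new Result(a.id, a.goodResult && b.goodResult);
    }

    // Returns the ids for which both checks succeeded, in input order.
    static List<String> successfulIds(List<String> ids) {
        List<CompletableFuture<Result>> futures = new ArrayList<>(ids.size());
        for (String id : ids) {
            futures.add(aynchOne(id).thenCombine(aynchTwo(id), CombineAllSketch::combine));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join) // no waiting here: allOf has completed
                        .filter(r -> r.goodResult)
                        .map(r -> r.id)
                        .collect(Collectors.toList()))
                .join(); // block the caller until everything is done
    }

    public static void main(String[] args) {
        System.out.println(successfulIds(Arrays.asList("a1", "bad2", "c3"))); // prints [a1, c3]
    }
}
```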

Side note, I don't really see why there are three result classes since all you need - for this part of the code at least - is the id and the result status. I'd introduce an interface (Result?) for that and only work on that.

daniu

It seems to me you don't need three different classes (ResultOne, ResultTwo, ResultThree), as they all define the same type, so I shall replace them with a single Result class.

Assuming you want to forward only successes, I added a short isGoodResult() method to the Result class to be used as predicate with the streams:

class Result {
    public boolean goodResult;
    public String id;
// ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}

I'd also recommend getting rid of the loop and replacing it with a stream to make your code more fluent.

Should forwardSuccesses be strict, accepting a List<Result>, this is how I'd implement testMethod:

void testMethod(List<String> ids)  {
    final List<Result> results = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());

    // PROCESS RESULTS HERE
    forwardSuccesses(results);
}

Should the forwarding method be lazy (called forwardSuccessesAsync here), accepting a CompletableFuture<List<Result>>:

void testMethod(List<String> ids)  {
    final List<CompletableFuture<Result>> futures = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .collect(Collectors.toList());

    final CompletableFuture<List<Result>> asyncResults =
        CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -> futures.stream()
                .map(CompletableFuture::join)
                .filter(Result::isGoodResult)
                .collect(Collectors.toList()));

    // PROCESS RESULTS HERE
    forwardSuccessesAsync(asyncResults);
}
dbaltor
  • If I understand correctly, both `CompletableFuture` and `parallelStream()` run on the same pool (the common ForkJoinPool) by default, so using a stream this way should halve the threads available for parallelism. – daniu Oct 04 '20 at 19:04
  • Good point. However, we don't know how `aynchOne` and `aynchTwo` have been implemented, or whether they really use the common ForkJoinPool. We also don't know whether they invoke blocking APIs. If so, I'd recommend that those methods allocate separate executors. If they are not using blocking APIs, I'd prefer to keep the total number of threads equal to the number of cores. – dbaltor Oct 06 '20 at 18:49
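As a rough illustration of the "separate executor" idea from the comment above, the two-argument supplyAsync overload runs a task on a pool you supply instead of the common ForkJoinPool. The pool size and the thread name "io-worker" below are arbitrary choices for the demo, not something taken from the thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedExecutorSketch {
    // Runs a task on a dedicated pool and reports which thread executed it.
    static String runOnDedicatedPool() {
        // Assumption: a small fixed pool reserved for blocking calls.
        ExecutorService ioPool = Executors.newFixedThreadPool(4, runnable -> {
            Thread t = new Thread(runnable, "io-worker");
            t.setDaemon(true); // do not keep the JVM alive for the demo
            return t;
        });
        try {
            // The second argument selects the executor for this task.
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool()); // prints io-worker
    }
}
```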

Within the for loop, each call returns its CompletableFuture immediately. The actual work happens in the background, and you want to wait until both are complete.

So once both CompletableFutures have been returned, block by invoking CompletableFuture.get with a long value and a time unit. If you invoke get without any parameters, you'll wait indefinitely if a future never completes.
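A minimal sketch of a bounded wait might look like this. The helper name getOrFallback and the fallback-value policy are assumptions for illustration; real code may prefer to propagate the TimeoutException instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetSketch {
    // Waits at most timeoutMillis for the future; returns fallback if the wait times out.
    static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        System.out.println(getOrFallback(never, 100, "timed out"));
        System.out.println(getOrFallback(CompletableFuture.completedFuture("done"), 100, "timed out"));
    }
}
```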

Choose your timeout and JDK wisely. get with a timeout, get(long, TimeUnit), has been available since JDK 8, the release that introduced CompletableFuture; JDK 9 added the non-blocking alternatives orTimeout and completeOnTimeout. If you can, prefer a current long-term-support release such as JDK 11.

I really urge you to read up on the details of CompletableFuture and how it differs from Future, especially regarding thread control such as cancellation. Without knowing the underlying provider of the CompletableFuture, I also suspect that querying one ID at a time wastes resources and severely limits throughput. But that is a separate question.
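The cancellation difference can be demonstrated with a short sketch. It relies on documented behavior: for CompletableFuture.cancel, the mayInterruptIfRunning argument has no effect, so a task that is already running keeps going. The class name and the latch-based choreography are just scaffolding for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CancelSketch {
    // Returns true if the task body ran to its end even though its future was cancelled.
    static boolean taskFinishesAfterCancel() throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch proceed = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                proceed.await(); // would throw if cancel() interrupted this thread
            } catch (InterruptedException e) {
                return "interrupted";
            }
            finished.countDown(); // reached only if the task was not interrupted
            return "done";
        });

        started.await();       // make sure the task is really running
        future.cancel(true);   // marks the future cancelled, does not interrupt the task
        proceed.countDown();   // let the task continue
        return future.isCancelled() && finished.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(taskFinishesAfterCancel()); // prints true
    }
}
```

The future reports itself as cancelled, yet the background work still runs to completion, so any cancellation of the underlying job has to be arranged separately.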

motzmann
  • I appreciate the response but I see plenty of people using CompletableFuture . thanks anyway. – Timothy Clotworthy Sep 30 '20 at 18:19
  • Yes, and they expect CompletableFuture.cancel to also cancel the underlying job, but this is not the case. I only wanted to point out that there are differences between Future implementations. And receiving a CompletableFuture alone doesn't tell you anything about the underlying mechanisms, or whether it is bound to threads at all. Knowledge makes everything easier. – motzmann Sep 30 '20 at 19:03