1

There are 3 services which are all REST calls - serviceOne, serviceTwo and serviceThree. serviceTwo processing is dependent on the output of serviceOne and serviceThree processing is dependent on the output of serviceTwo. However, serviceThree needs to run in a loop.

I want a way to get the final response after the serviceThreeRespCF completable future completes. Currently, the plansResponse get returned before the service3 finishes processing.

I tried using join() for serviceThreeRespCFbut it didn't help. Is there a better way to do this, i.e., without using get() or join()?

public CompletionStage<PlansResponse> process() {

    final PlansResponse plansResponse = new PlansResponse();
    final CompletableFuture<PlansResponse> plansResp = new CompletableFuture<>();

    final CompletionStage<ServiceOneResp> serviceOneRespCF = getResp1(); <A REST CALL>

    return serviceOneRespCF.thenCompose(serviceOneResp -> {

        // function to validate service one resp

        if (condition based on validation)
        {
            List<Plans> plans = serviceOneResp.getPlans();
            if (condition) {
               plansResponse.setSysMsgs());
            }
            else {

                final CompletionStage<ServiceTwoResp> serviceTwoRespCF = getResp2(); <A REST CALL>
                serviceTwoRespCF.thenCompose(serviceTwoResp -> {

                    plans.parallelStream().forEach(plan -> {

                        if (Optional.ofNullable(plan.getPlanId()).isPresent()) {

                            final CompletionStage<ServiceThreeResp> serviceThreeRespCF = getResp3();  <A REST CALL>

                            serviceThreeRespCF.thenAcceptAsync(serviceThreeResp -> {

                                Set<PlanDetail> planDetails = new HashSet<>();
                                planDetails = populatePlanDetails(plan, serviceTwoResp, serviceThreeResp, planDetails);
                                planDetailsList.addAll(planDetails.stream().distinct().collect(Collectors.toList()));
                                plansResponse.setPlanDetails(planDetailsList);
                                planDetails.clear();

                            });
                        }
                    });
                    return serviceThreeRespCF;
                });
            }
        }
        plansResp.complete(plansResponse);
        return plansResp;
    });
}

EDIT 1: Tried join (the return statement is continuation from the code above)

  1. directly on serviceThreeRespCF

     return serviceThreeRespCF;
                }).toCompletableFuture().join();
    
  2. using thenRunAsync

    return serviceThreeRespCF;
                }).thenRunAsync(() -> {
                    serviceThreeRespCF.toCompletableFuture().join();
                });
    

EDIT 2: I tried using allOf to get the values of service3 completion stages in a completable future. (this piece is after the service 2 thenCompose). As can be seen, the service2 response is not available to this completable future and it doesn't work. Besides this involves having to iterate through the list of plans 2 times. Is there another way to do this?

CompletableFuture<List<Service3Response>> service3ResponseList = allAsList(futures);
service3ResponseList.thenAccept(service3Responses -> {
    AtomicInteger i = new AtomicInteger(0);
    plans.parallelStream().forEach(plan -> {
        Set<PlanDetail> planDetails = new HashSet<>();
        planDetails = populatePlanDetails(acctSetFromRequest, plan, service3Responses.get(i.get()), service2Response <NOT AVAILABLE>, planDetails, acctMapFromResponse.get(plan.getPlanId()));
        planDetailsList.addAll(planDetails.stream().distinct().collect(Collectors.toList()));
        plansResponse.setDistributionPlanDetails(planDetailsList);
        planDetails.clear();
        i.addAndGet(1);
    });
});

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
            futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
  • How did you try to join? Did you try to use the method thenRunAsync(Runnable action) in CompletionStage? – A_C Nov 30 '18 at 11:45
  • I used .toCompletableFuture().join() after serviceTwoRespCF.thenCompose() call. Not sure if this is the right way to do it. As for the suggestion you made, I experimented with join in thenRunAsync() after the serviceTwoRespCF as well; same result. – Ashish Singh Nov 30 '18 at 12:34
  • How exactly did you apply the join? Could you paste the code? – A_C Nov 30 '18 at 12:57
  • Updated the question description. – Ashish Singh Nov 30 '18 at 13:18
  • If I understand correctly, you need to transform a Collection of Futures into a Future of Collections. Have a look at `CompletableFuture#allOf`. If you need the results of Service3 in the resulting Future, see https://stackoverflow.com/a/36261808/1225328. – sp00m Nov 30 '18 at 13:26
  • I used allOf to get the results of all completion stages of service3(added in edit 2), however, the service2 response is not available here. Moreover, it requires another iteration of the List. Is there a different way you would do this? – Ashish Singh Dec 03 '18 at 06:20

0 Answers0