There are 3 services which are all REST calls - serviceOne, serviceTwo and serviceThree. serviceTwo processing is dependent on the output of serviceOne and serviceThree processing is dependent on the output of serviceTwo. However, serviceThree needs to run in a loop.
I want a way to get the final response after the serviceThreeRespCF completable future completes. Currently, the plansResponse get returned before the service3 finishes processing.
I tried using join() for serviceThreeRespCFbut it didn't help. Is there a better way to do this, i.e., without using get() or join()?
public CompletionStage<PlansResponse> process() {
final PlansResponse plansResponse = new PlansResponse();
final CompletableFuture<PlansResponse> plansResp = new CompletableFuture<>();
final CompletionStage<ServiceOneResp> serviceOneRespCF = getResp1(); <A REST CALL>
return serviceOneRespCF.thenCompose(serviceOneResp -> {
// function to validate service one resp
if (condition based on validation)
{
List<Plans> plans = serviceOneResp.getPlans();
if (condition) {
plansResponse.setSysMsgs());
}
else {
final CompletionStage<ServiceTwoResp> serviceTwoRespCF = getResp2(); <A REST CALL>
serviceTwoRespCF.thenCompose(serviceTwoResp -> {
plans.parallelStream().forEach(plan -> {
if (Optional.ofNullable(plan.getPlanId()).isPresent()) {
final CompletionStage<ServiceThreeResp> serviceThreeRespCF = getResp3(); <A REST CALL>
serviceThreeRespCF.thenAcceptAsync(serviceThreeResp -> {
Set<PlanDetail> planDetails = new HashSet<>();
planDetails = populatePlanDetails(plan, serviceTwoResp, serviceThreeResp, planDetails);
planDetailsList.addAll(planDetails.stream().distinct().collect(Collectors.toList()));
plansResponse.setPlanDetails(planDetailsList);
planDetails.clear();
});
}
});
return serviceThreeRespCF;
});
}
}
plansResp.complete(plansResponse);
return plansResp;
});
}
EDIT 1: Tried join (the return statement is continuation from the code above)
directly on serviceThreeRespCF
return serviceThreeRespCF; }).toCompletableFuture().join();
using thenRunAsync
return serviceThreeRespCF; }).thenRunAsync(() -> { serviceThreeRespCF.toCompletableFuture().join(); });
EDIT 2: I tried using allOf to get the values of service3 completion stages in a completable future. (this piece is after the service 2 thenCompose). As can be seen, the service2 response is not available to this completable future and it doesn't work. Besides this involves having to iterate through the list of plans 2 times. Is there another way to do this?
CompletableFuture<List<Service3Response>> service3ResponseList = allAsList(futures);
service3ResponseList.thenAccept(service3Responses -> {
AtomicInteger i = new AtomicInteger(0);
plans.parallelStream().forEach(plan -> {
Set<PlanDetail> planDetails = new HashSet<>();
planDetails = populatePlanDetails(acctSetFromRequest, plan, service3Responses.get(i.get()), service2Response <NOT AVAILABLE>, planDetails, acctMapFromResponse.get(plan.getPlanId()));
planDetailsList.addAll(planDetails.stream().distinct().collect(Collectors.toList()));
plansResponse.setDistributionPlanDetails(planDetailsList);
planDetails.clear();
i.addAndGet(1);
});
});
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()])
).thenApply(ignored ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
}