10

I have to make Rest API invocation using RestTemplate multiple time with different parameters. API is same but it is the parameter that is getting changed. Number of times is also variable. I want to use AsyncRestTemplate but my main Thread should wait until all API calls have been successfully completed. I also want to work with responses that each API call returned. Currently I am using RestTemplate. In basic form it is as following.

List<String> listOfResponses = new ArrayList<String>();
for (Integer studentId : studentIdsList) {
    String respBody;
    try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
    respBody = requestEntity.getBody();
    listOfResponses.add(respBody);          
}

How can I implement AsyncRestTemplate in this situation?

Didier L
  • 18,905
  • 10
  • 61
  • 103
TV Nath
  • 499
  • 5
  • 12
  • 35
  • Note that the code you provide in the question does not compile and does not seem to be correct (`studentId` is unused, `respBody` = request body). A good question should include an [mcve] as well as what you have tried. – Didier L Jun 12 '17 at 09:01
  • 1
    May be I'm missing something here. Have you thought about updating your web service to accept collection of ids as a argument ? – s7vr Jun 12 '17 at 12:39
  • The web service I consume is out of my scope – TV Nath Jun 13 '17 at 02:39

3 Answers3

12

The main idea when using AsyncRestTemplate (or any asynchronous API, in fact), is to send all you requests in a first time, keeping the corresponding futures, then process all responses in a second time. You can simply do this with 2 loops:

List<ListenableFuture<ResponseEntity<String>>> responseFutures = new ArrayList<>();
for (Integer studentId : studentIdsList) {
    // FIXME studentId is not used
    ListenableFuture<ResponseEntity<String>> responseEntityFuture = restTemplate.exchange(url, method, requestEntity, String.class);
    responseFutures.add(responseEntityFuture);
}
// now all requests were send, so we can process the responses
List<String> listOfResponses = new ArrayList<>();
for (ListenableFuture<ResponseEntity<String>> future: responseFutures) {
    try {
        String respBody = future.get().getBody();
        listOfResponses.add(respBody);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
}

Note: if you need to pair the responses with the original requests, you can replace the list of futures with a map or a list of request+response objects.

I also noted that studentId is not used in your question.

Didier L
  • 18,905
  • 10
  • 61
  • 103
  • but how to make sure all requests have been processed? when get() called on each future, may be request is still being processed? – TV Nath Jun 12 '17 at 09:52
  • 2
    `get()` is blocking, it will only return after the request completes – while the other requests are still being processed as well. So if the second loop completes normally, all requests will have been processed. – Didier L Jun 12 '17 at 09:56
  • is there a way to do this in unblocking way? – TV Nath Jun 13 '17 at 02:39
  • 2
    Your question said “_my main Thread should wait until all API calls have been successfully completed_” so this basically means blocking on each until it is completed. The important point is that the requests are performed in parallel so the time it will take is roughly the duration of the slowest request. – Didier L Jun 13 '17 at 08:48
6

You could use Java 8 Stream API, if that's feasible for you:

List<String> listOfResponses = studentIdsList.stream()
    .parrallel()
    .map({studentId ->
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, studentId, String.class);
        return responseEntity.getBody();
    })
    .collect(Collectors.toList());

This code will basically perform 2 things:

  1. Performs requests in parallel;
  2. Collect the results of the requests into a List.

UPDATE: Agree with @Didier L - this solution may not work properly when you need to do a lot of requests. Here is an updated version:

List<String> listOfResponses  = studentIdsList.stream()
                .map(studentId -> asyncRestTemplate.exchange(url, method, studentId, String.class)
                .collect(Collectors.toList()).stream()
                .map(this::retrieveResult)
                .collect(Collectors.toList());

    /**
     * Retrieves results of each request by blocking the main thread. Note that the actual request was performed on the previous step when
     * calling asyncRestTemplate.exchange(url, method, studentId, String.class)
     */
    private String retrieveResult(ListenableFuture<ResponseEntity<String>> listenableFuture) {
        try {
            return listenableFuture.get().getBody();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
Danylo Zatorsky
  • 5,856
  • 2
  • 25
  • 49
  • 1
    By default, parallel streams are configured for CPU-intensive tasks. If you use a parallel stream for I/O intensive tasks, you'd have to configure a custom fork-join pool to run it. However this won't scale very well compared to an asynchronous API, since your I/O tasks will consume many threads. – Didier L Jun 12 '17 at 12:56
  • @DidierL thanks for pointing this out. I have updated my answer. – Danylo Zatorsky Jun 12 '17 at 20:44
  • Unfortunately your second solution wouldn't work as expected because sequential stream process elements one by one: this will create each future and immediately call `get()` on it, before creating the next one. You should collect all created futures first before trying to retrieve their results. – Didier L Jun 13 '17 at 09:50
  • @DidierL That's exactly what it does. First 'map' operation initiates requests and collects all the futures. The second 'map' operation retrieves results from these futures one-by-one. You can think of asyncRestTemplate.exchange(..) as of Thread Pool which submits a task for a request to the external resource and returns a Future as its result. BTW, just checked it on my machine and it works just fine. – Danylo Zatorsky Jun 13 '17 at 18:42
  • 2
    It will work, but it is sequential, not parallel. The first `map()` will not collect all the futures – you'd need a `collect()` call for that. Instead, the first request will be sent (first `map()`), then wait for the response (second `map()`), then second request will be sent (first `map()`) etc. See [How to properly submit and get several Futures in the same Java stream?](https://stackoverflow.com/questions/44298581/how-to-properly-submit-and-get-several-futures-in-the-same-java-stream). – Didier L Jun 13 '17 at 20:05
  • Yeah, you are right again. I thought collection on the last step would trigger a chain of operations one be one (map for all elements, then again map for all elements). But now I realise that it wold be silly if to speak about performance :) Thanks for your comment, I updated my answer. – Danylo Zatorsky Jun 13 '17 at 20:25
2

Here is another solution I would like to suggest which uses Spring's RestTemplate rather than AsyncRestTemplate. It is also using Java 8 CompletableFuture.

public void sendRequestsAsync(List<Integer> studentList) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(studentList.size()); //List to hold all the completable futures
    List<String> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Integer studentId : studentList) { //Iterate student list
        CompletableFuture<Void> requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () -> restTemplate.exchange("URL/" + studentId, HttpMethod.GET, null, String.class),
                        yourOwnExecutor
                )//Supply the task you wanna run, in your case http request
                .thenApply((responseEntity) -> {
                    responses.add(responseEntity.getBody());
                    return responseEntity;
                })//now you can add response body to responses
                .thenAccept((responseEntity) -> {
                    doSomeFinalStuffWithResponse(responseEntity);
                })//here you can do more stuff with responseEntity (if you need to)
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return null;
                });//do something here if an exception occurs in the execution;

        completableFutures.add(requestCompletableFuture);
    }

    try {
        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).get(); //Now block till all of them are executed by building another completablefuture with others.
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

I like this solution more because I can chain as much business logic I want and don't have to depend on Spring's internals for Async Sending. Obviously you can clean up the code more, I haven't paid much attention into that for now.

Sneh
  • 3,527
  • 2
  • 19
  • 37
  • In fact you can easily [convert `ListenableFuture` into a `CompletableFuture` and back](https://dzone.com/articles/converting-listenablefutures). Also the `AsyncRestTemplate` is part of the public API as much as `RestTemplate`, it is not a "Spring internal". If you want to do asynchronous stuff it is probably better to rely on what is provided by the API as much as possible. – Didier L Jun 13 '17 at 11:50
  • Yeah I agree my bad on saying its spring internal. What I was trying to convey was we can get it done via Java's own api rather for async stuff rather than relying on async template and listenablefuture. – Sneh Jun 13 '17 at 11:52