20

I don't understand how to use AsyncRestTemplate effectively for making external service calls. For the code below:

class Foo {

    public void doStuff() throws InterruptedException, ExecutionException {
        Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(
                url1, String.class);
        String response1 = future1.get().getBody();

        Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
                url2, String.class);
        String response2 = future2.get().getBody();

        Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(
                url3, String.class);
        String response3 = future3.get().getBody();
    }
}

Ideally I want to execute all 3 calls simultaneously and process the results once they're all done. However, each external service call does not seem to be made until get() is called, and get() blocks. So doesn't that defeat the purpose of AsyncRestTemplate? I might as well use RestTemplate.

So I don't understand how I can get them to execute simultaneously.

Glide
  • 1
    please see the example in this [link](http://www.concretepage.com/spring-4/spring-4-asyncresttemplate-listenablefuture-example), if that would help – vineeth sivan Feb 05 '16 at 04:27
  • @vineethsivan the link doesn’t answer my question. The link only shows how `future.get()` is called. – Glide Feb 07 '16 at 22:52
  • 1
    It blows my mind that not a single answer properly converts to a non-blocking CompletableFuture in a non-blocking way (sleeps and get are inappropriate). I'm not answering because I can't take credit for merely pointing to an appropriate solution. https://dzone.com/articles/converting-listenablefutures – Richard Collette Aug 11 '18 at 18:12

6 Answers

17

Simply don't call the blocking get() until you have dispatched all of your asynchronous calls:

class Foo {
  public void doStuff() throws InterruptedException, ExecutionException {
    // Dispatch all three calls first; none of these lines blocks
    ListenableFuture<ResponseEntity<String>> future1 = asyncRestTemplate
        .getForEntity(url1, String.class);
    ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate
        .getForEntity(url2, String.class);
    ListenableFuture<ResponseEntity<String>> future3 = asyncRestTemplate
        .getForEntity(url3, String.class);

    // Only now wait for the results
    String response1 = future1.get().getBody();
    String response2 = future2.get().getBody();
    String response3 = future3.get().getBody();
  }
}

You can do both the dispatch and the get() calls in loops, but note that gathering results this way is inefficient, as it gets stuck on the next unfinished future.

Instead, you could add all the futures to a collection and iterate through it, testing each future with the non-blocking isDone(). When that call returns true, you can then call get().

This way the bulk gathering of results is optimised: you collect each result as it becomes available rather than waiting on the next slow future in the order the get() calls were made.
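
A minimal sketch of that polling idea, using plain java.util.concurrent futures as stand-ins for the ListenableFutures returned by AsyncRestTemplate. The class name PollingGather and the helper fakeCall are hypothetical and only simulate a slow remote call:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class PollingGather {

    // Hypothetical stand-in for a remote call: yields `name` after delayMillis.
    static Future<String> fakeCall(String name, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name;
        });
    }

    // Collects results in completion order rather than dispatch order.
    static List<String> gather(List<Future<String>> futures)
            throws InterruptedException, ExecutionException {
        List<Future<String>> pending = new ArrayList<>(futures);
        List<String> results = new ArrayList<>();
        while (!pending.isEmpty()) {
            Iterator<Future<String>> it = pending.iterator();
            while (it.hasNext()) {
                Future<String> f = it.next();
                if (f.isDone()) {          // non-blocking check
                    results.add(f.get());  // returns at once because the future is done
                    it.remove();
                }
            }
            if (!pending.isEmpty()) {
                Thread.sleep(10);          // brief pause between polling rounds
            }
        }
        return results;
    }
}
```

Note that the calling thread is still occupied while it polls; the gain is only that a slow future no longer holds up results that are already available.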

Better still, you can register callbacks with each ListenableFuture returned by AsyncRestTemplate, so you don't have to worry about repeatedly polling for results.

diginoise
  • Can you confirm when getForEntity() is called, it will start the connection? I can see my web traffic via Charles Proxy and I do not see the connections made until future.get() is called. – Glide Feb 20 '16 at 01:53
  • While the three calls are going to be made, the calling thread is still going to block (not return to a thread pool if the current thread is from something like an http request thread pool). That will impact the ability to scale an application, if scalability is a concern. – Richard Collette Aug 11 '18 at 17:59
  • @RichardCollette the REST calls via `asyncRestTemplate` are not blocking... the `Future.get()` calls are when the future has not finished. – diginoise Aug 13 '18 at 08:57
  • `.get()` is a blocking call. – Richard Collette Aug 13 '18 at 20:41
  • @RichardCollette sure `get()` is... it says it in the first line of the answer – diginoise Aug 14 '18 at 08:08
8

If you don't have to use AsyncRestTemplate, I would suggest using RxJava instead. RxJava's zip operator is what you are looking for. See the code below:

private rx.Observable<String> externalCall(String url, int delayMilliseconds) {
    return rx.Observable.create(
            subscriber -> {
                try {
                    Thread.sleep(delayMilliseconds); //simulate long operation
                    subscriber.onNext("response(" + url + ") ");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
            }
    );
}

public void callServices() {
    rx.Observable<String> call1 = externalCall("url1", 1000).subscribeOn(Schedulers.newThread());
    rx.Observable<String> call2 = externalCall("url2", 4000).subscribeOn(Schedulers.newThread());
    rx.Observable<String> call3 = externalCall("url3", 5000).subscribeOn(Schedulers.newThread());
    rx.Observable.zip(call1, call2, call3, (resp1, resp2, resp3) -> resp1 + resp2 + resp3)
            .subscribeOn(Schedulers.newThread())
            .subscribe(response -> System.out.println("done with: " + response));
}

All requests to external services are executed in separate threads. When the last call finishes, the transformation function (simple string concatenation in this example) is applied, and the result (the concatenated string) is emitted from the zip observable.

lkz
6

As I understand your question, you have a predefined asynchronous method and you are trying to call it asynchronously using the RestTemplate class.

I have written a method that should help you call it asynchronously.

 public void testMyAsynchronousMethod(String... args) throws Exception {
        // Start the clock
        long start = System.currentTimeMillis();

        // Kick off multiple asynchronous lookups
        Future<ResponseEntity<String>> future1 = asyncRestTemplate
        .getForEntity(url1, String.class);
        Future<ResponseEntity<String>> future2 = asyncRestTemplate
        .getForEntity(url2, String.class);
        Future<ResponseEntity<String>> future3 = asyncRestTemplate
        .getForEntity(url3, String.class);

        // Wait until they are all done
        while (!(future1.isDone() && future2.isDone() && future3.isDone())) {
            Thread.sleep(10); // 10-millisecond pause between each check
        }

        // Print results, including elapsed time
        System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
        System.out.println(future1.get().getBody());
        System.out.println(future2.get().getBody());
        System.out.println(future3.get().getBody());
    }
Vikrant Kashyap
1

You might want to use the CompletableFuture class (javadoc).

  1. Transform your calls into CompletableFutures. For instance:

    final CompletableFuture<ResponseEntity<String>> cf = CompletableFuture.supplyAsync(() -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    });
    
  2. Next, call the CompletableFuture::allOf method with your 3 newly created completable futures.

  3. Call the join() method on the result. Once the resulting completable future has resolved, you can get the results from each of the separate completable futures you created in step 1.
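
The three steps above might be sketched as follows. This is an illustration under assumptions, not this answer's exact code: fakeCall is a hypothetical stand-in for wrapping one asynchronous call, and AllOfSketch is an invented class name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Step 1: hypothetical stand-in for one call wrapped in a CompletableFuture.
    static CompletableFuture<String> fakeCall(String url) {
        return CompletableFuture.supplyAsync(() -> "response(" + url + ")");
    }

    static List<String> fetchAll(List<String> urls) {
        List<CompletableFuture<String>> calls = urls.stream()
                .map(AllOfSketch::fakeCall)
                .collect(Collectors.toList());

        // Step 2: allOf completes once every given future has completed.
        CompletableFuture<Void> all =
                CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]));

        // Step 3: wait for the combined future, then read each individual result.
        all.join();
        return calls.stream()
                .map(CompletableFuture::join) // already complete, so this does not wait
                .collect(Collectors.toList());
    }
}
```

The results come back in the order of the input list, regardless of which call finished first.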

Kuvaldis
-1

I think you are misunderstanding a few things here. When you call the getForEntity method, the request is already fired. When the get() method of the future object is called, you are just waiting for the request to complete. So in order to fire all three requests within the same fraction of a second, you just have to do:

// Each of the lines below will fire an http request when it's executed
Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(url1, String.class);
Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(url2, String.class);
Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(url3, String.class);

Once these lines have run, all the requests have already been fired (most probably within the same fraction of a second). You can then do whatever you want in the meantime. When you call one of the get() methods, you wait for that particular request to complete. If it has already completed, get() returns immediately.

// do whatever you want in the meantime
// get the response of each http call, waiting if it has not completed yet
String response1 = future1.get().getBody();
String response2 = future2.get().getBody();
String response3 = future3.get().getBody();
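
To illustrate the timing this implies, here is a small sketch that uses a plain ExecutorService instead of AsyncRestTemplate. It assumes, as this answer states, that each call starts running as soon as it is dispatched; DispatchThenGet and the sleep-based tasks are hypothetical stand-ins for real requests:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DispatchThenGet {

    // Dispatches every simulated call first, then waits; returns the elapsed milliseconds.
    static long elapsedMillis(long... delaysMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.length);
        try {
            long start = System.nanoTime();
            List<Future<?>> futures = new ArrayList<>();
            for (long delay : delaysMillis) {
                // submit() returns at once; the simulated request starts running in the pool
                futures.add(pool.submit(() -> {
                    Thread.sleep(delay);
                    return null;
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // blocks only until this particular task has finished
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }
}
```

Under that assumption, calling elapsedMillis(3000, 2000, 3000) should take roughly as long as the slowest task (about 3 seconds), not the sum of all three.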
Rowanto
  • Can anyone confirm when `getForEntity()` is called, the requests are already fired? I can see my web traffic via Charles Proxy and I do not see the connections made until `future.get()` is called. – Glide Feb 18 '16 at 19:18
  • 1
    I have no idea why it is not working for you, but check out http://javattitude.com/2014/04/20/using-spring-4-asyncresttemplate/ they have the tutorial and a test for the stuff. So it should be working. – Rowanto Feb 25 '16 at 08:39
  • Yes, it should work. getForEntity calling execute inside. – Alex78191 Jul 02 '20 at 22:16
-2

I don't think any of the previous answers actually achieve parallelism. The problem with @diginoise's answer is that as soon as we call get(), we're blocked. Suppose the calls are really slow, such that future1 takes 3 seconds to complete, future2 takes 2 seconds and future3 takes 3 seconds again. With 3 get() calls one after another, we end up waiting 3 + 2 + 3 = 8 seconds. @Vikrant Kashyap's answer blocks as well, on while (!(future1 .isDone() && future2.isDone() && future3.isDone())). Besides, the while loop is a pretty ugly piece of code for 3 futures; what if you have more? @lkz's answer uses a different technology than you asked for, and even then, I'm not sure zip is going to do the job. From the Observable Javadoc:

zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by each of the source Observables; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by each of those Observables; and so forth.

Due to Spring's widespread popularity, they try very hard to maintain backward compatibility and in doing so, sometimes make compromises with the API. AsyncRestTemplate methods returning ListenableFuture is one such case. If they committed to Java 8+, CompletableFuture could be used instead. Why? Since we won't be dealing with thread pools directly, we don't have a good way to know when all the ListenableFutures have completed. CompletableFuture has an allOf method that creates a new CompletableFuture that is completed when all of the given CompletableFutures complete. Since we don't have that in ListenableFuture, we will have to improvise. I've not compiled the following code but it should be clear what I'm trying to do. I'm using Java 8 because it's end of 2016.

// Lombok FTW
@RequiredArgsConstructor
public final class CounterCallback implements ListenableFutureCallback<ResponseEntity<String>> {
  private final LongAdder adder;

  @Override
  public void onFailure(Throwable ex) {
    adder.increment();
  }

  @Override
  public void onSuccess(ResponseEntity<String> result) {
    adder.increment();
  }
}

ListenableFuture<ResponseEntity<String>> f1 = asyncRestTemplate
        .getForEntity(url1, String.class);
// f2 and f3 are created the same way

LongAdder adder = new LongAdder();
ListenableFutureCallback<ResponseEntity<String>> callback = new CounterCallback(adder);
Stream.of(f1, f2, f3)
  .forEach(f -> f.addCallback(callback));

// wait up to roughly 9 seconds for all three callbacks to fire
for (int counter = 1; adder.sum() < 3 && counter < 10; counter++) {
  Thread.sleep(1000);
}
// either all futures are done or we're done waiting
Map<Boolean, List<ListenableFuture<ResponseEntity<String>>>> futures = Stream.of(f1, f2, f3)
  .collect(Collectors.partitioningBy(Future::isDone));

Now we have a Map for which futures.get(Boolean.TRUE) gives us all the futures that have completed and futures.get(Boolean.FALSE) gives us the ones that haven't. We will want to cancel the ones that didn't complete.

This code does a few things that are important with parallel programming:

  1. It doesn't block.
  2. It limits the operation to some maximum allowed time.
  3. It clearly separates successful and failure cases.
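
As a variation on the counting idea, the bounded wait can also be written with a CountDownLatch instead of a sleep loop. This is a different technique from the code above, sketched with CompletableFuture as a stand-in for ListenableFuture so it runs without Spring; BoundedWait and awaitAll are hypothetical names:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class BoundedWait {

    // Waits at most timeoutMillis for all futures, then splits them by isDone().
    static <T> Map<Boolean, List<CompletableFuture<T>>> awaitAll(
            List<CompletableFuture<T>> futures, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(futures.size());
        // whenComplete plays the role of addCallback: it fires on success and on failure
        futures.forEach(f -> f.whenComplete((result, error) -> latch.countDown()));
        // Returns as soon as the count reaches zero, or when the timeout expires
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return futures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isDone));
    }
}
```

The calling thread still waits inside await(), and the partition is only a snapshot: a future listed under false may complete a moment later, so it should be cancelled or rechecked before it is discarded.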
Abhijit Sarkar
  • 1
    I don't think @diginoise's solution will cost "3 + 2 + 3 = 8 seconds" . All http requests are sent out when `getForEntity` is called, so getting real response for `future1`, `future2`, `future3` will cost max of three requests, it will be 3 seconds in this case (max(3,2,3)) – simomo Dec 01 '17 at 10:04
  • @simomo `Future`s are not resolved until `get` is called. It's async 101. – Abhijit Sarkar Dec 04 '17 at 17:12
  • Hi @Abhijit , the http requests will be submitted to a `taskExecutor` when `getForEntity` is called, and then taskExecutor fired the http request (immediately in most cases). Here is the [code](https://github.com/spring-projects/spring-framework/blob/a5b94f3a776c16ce3eb09ac92a9a7907910f5ff5/spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java#L79) . So my point is sending http requests is a async process of calling `getForEntity`, but it definitely has no business with calling `get`. – simomo Dec 05 '17 at 03:39
  • 1
    And " `Futures` are not resolved until `get` is called " is true, but http request is sent ***before*** `Futures` are resolved. – simomo Dec 05 '17 at 03:47
  • my answer **does indeed** mention how to streamline and not block during the results gathering by using non-blocking calls OR even better to use callbacks, which is what you did. OP's problem was dispatching and waiting for result per call, which is what I had focused my answer on. – diginoise Feb 28 '18 at 10:43
  • ... also you do actively spin waiting (`for() {Thread.sleep()}`) as well, which you had criticized. Final problem is that partitioning on `Future::isDone` makes no sense, as this partitioning is done once, while the futures can finalise computation after the partitioning. – diginoise Feb 28 '18 at 10:52