
I am using the Async Http Client library (with Netty) to make asynchronous HTTP GET requests to a RESTful API. Since I want to preserve the non-blocking behavior, I am returning instances of CompletableFuture<T> as the result of the HTTP GET requests. So, where a RESTful API endpoint returns a JSON array, I am returning a CompletableFuture<T[]>.

Yet, according to Erik Meijer's classification in The Four Essential Effects In Programming, I consider Stream<T> better suited as the result type of a Java method that makes an asynchronous HTTP GET request and returns a JSON array. In this case, Stream<T> can be seen as the equivalent of Observable<T>, which is the result of an asynchronous computation that returns many values.

So, given that resp holds the response, I can get a CompletableFuture<Stream<T>> as follows:

 CompletableFuture<T[]> resp = …
 return resp.thenApply(Arrays::stream);
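A minimal runnable sketch of this non-blocking conversion follows. The method fetchArray() is hypothetical: it stands in for the Async Http Client request (whose details are elided above) and simply completes asynchronously with a fixed String[]. The Stream is created only inside thenApply, once the array is available, so nothing blocks until main deliberately calls join() to keep the JVM alive long enough to print.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AsyncArrayToStream {

    // Hypothetical stand-in for the HTTP request: completes asynchronously with a
    // fixed array instead of a deserialized JSON response.
    static CompletableFuture<String[]> fetchArray() {
        return CompletableFuture.supplyAsync(() -> new String[] {"a", "b", "c"});
    }

    // Same conversion as above: the Stream is created only once the array is available.
    static <T> CompletableFuture<Stream<T>> toStream(CompletableFuture<T[]> resp) {
        return resp.thenApply(Arrays::stream);
    }

    public static void main(String[] args) {
        toStream(fetchArray())
            .thenApply(s -> s.map(String::toUpperCase).collect(Collectors.toList()))
            .thenAccept(System.out::println) // prints [A, B, C]
            .join(); // only so that main does not exit before the callback has run
    }
}
```

Note that the result is still a CompletableFuture<Stream<T>>; consumers keep working inside callbacks rather than receiving a plain Stream<T>.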

However, how can I convert the CompletableFuture<Stream<T>> resp into a Stream<T> without waiting for the computation to complete (i.e., I do NOT want to block on a get() invocation)?

I would like to have the same result as the following expression, but WITHOUT blocking on get():

return resp.thenApply(Arrays::stream).get();
rodolfino

2 Answers


You can build a Stream<T> that will defer the call to the Future<T> get() method, just like this:

CompletableFuture<T[]> resp = ...
return Stream
        .of(resp)                               // Stream<CompletableFuture<T[]>>
        .flatMap(f -> Arrays.stream(f.join())); // Stream<T>

To simplify usage, instead of get() I am using join() to avoid checked exceptions.
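To check that this really defers join(), here is a small sketch that uses a CompletableFuture completed manually instead of by an HTTP call (an assumption made only to keep it self-contained). The pipeline is built while the future is still pending, and join() runs only once collect() traverses the stream. Had the stream construction itself blocked, the program would hang before complete() is ever reached.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DeferredFlatMap {

    // The flatMap-based approach: join() is called only when the stream is traversed.
    static <T> Stream<T> deferred(CompletableFuture<T[]> resp) {
        return Stream.of(resp).flatMap(f -> Arrays.stream(f.join()));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer[]> resp = new CompletableFuture<>(); // not yet completed

        // Building the pipeline does not call join(), so this line does not block.
        Stream<Integer> doubled = deferred(resp).map(i -> i * 2);

        // Stand-in for the HTTP response arriving later.
        resp.complete(new Integer[] {1, 2, 3});

        // The terminal operation triggers join(); the future is complete, so it returns at once.
        List<Integer> result = doubled.collect(Collectors.toList());
        System.out.println(result); // [2, 4, 6]
    }
}
```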

Miguel Gamboa
  • That doesn’t change the fact that the stream operation will be blocked until the entire completion of the `CompletableFuture`. It just delays it a bit, at a high price. – Holger May 23 '16 at 15:00
  • Of course. But only when traversed. In the OP, the author is not traversing the `T[]` either. – Miguel Gamboa May 23 '16 at 15:18
  • Indeed. I only wanted to emphasize the limitations (as I’m not sure whether the OP is aware of them). I added an answer going into more detail about them. – Holger May 23 '16 at 15:34

As long as the result of the asynchronous computation is passed as an array, you can’t benefit from the Stream API here, as the Stream operation can’t start processing elements until the array has been handed off, which implies the entire completion of the asynchronous job.

Unless you rewrite your asynchronous job to publish individual elements of the array, e.g. via a queue, you can only defer the synchronization up to the point when the terminal operation of the stream commences. In other words, you can chain intermediate operations to the Stream before you have to wait for the completion of the asynchronous job. Since chaining is not an expensive operation, the gain will be very small.
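The queue-based alternative could look roughly like the following sketch. It is an illustration under assumptions, not part of any library: the producer thread (standing in for the asynchronous job) puts elements into a LinkedBlockingQueue and finishes with a hypothetical END marker, while the consumer's Spliterator blocks only until the next element arrives. Downstream stages can therefore start before the job completes, and short-circuiting operations such as findFirst stop early. Error propagation is deliberately omitted: if the producer fails without putting END, the consumer blocks forever.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class QueueBackedStream {

    // Hypothetical end-of-data marker; the producer must put it after its last element.
    static final Object END = new Object();

    // Returns a Stream that takes elements from the queue as they are published.
    // tryAdvance blocks only until the next element arrives, not until the whole job ends.
    static <T> Stream<T> fromQueue(BlockingQueue<Object> queue) {
        Spliterator<T> split = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            private boolean done;

            @Override
            @SuppressWarnings("unchecked")
            public boolean tryAdvance(Consumer<? super T> action) {
                if (done) {
                    return false;
                }
                Object next;
                try {
                    next = queue.take(); // waits for the next published element
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                if (next == END) {
                    done = true;
                    return false;
                }
                action.accept((T) next);
                return true;
            }
        };
        return StreamSupport.stream(split, false);
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

        // Simulated asynchronous job publishing elements one by one.
        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                queue.add(i);
            }
            queue.add(END);
        }).start();

        Stream<Integer> s = fromQueue(queue);
        System.out.println(s.map(i -> i * i).collect(Collectors.toList())); // [1, 4, 9, 16, 25]
    }
}
```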

If you still want to defer the join operation, Miguel Gamboa’s solution, Stream.of(resp).flatMap(f -> Arrays.stream(f.join())), will do, and it’s concise. Unfortunately, it may have performance drawbacks that outweigh any benefit of deferring the join operation. Streaming over arrays works smoothly, as arrays have a predictable length and support balanced splitting, whereas the nested stream lacks these features; moreover, the current (Java 8) implementation of flatMap does not even support short-circuiting.
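The lost size information can be observed directly. This sketch compares the Spliterator of a plain array stream with that of the flatMap-based stream; the already completed future is an assumption that only keeps the example short.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class SizedComparison {

    // The flatMap-based deferred stream, repeated here so the example is self-contained.
    static <T> Stream<T> nested(CompletableFuture<T[]> resp) {
        return Stream.of(resp).flatMap(f -> Arrays.stream(f.join()));
    }

    public static void main(String[] args) {
        CompletableFuture<String[]> resp =
                CompletableFuture.completedFuture(new String[] {"x", "y", "z"});

        Spliterator<String> direct = Arrays.stream(resp.join()).spliterator();
        Spliterator<String> viaFlatMap = nested(resp).spliterator();

        // The array-based spliterator knows its exact size...
        System.out.println(direct.hasCharacteristics(Spliterator.SIZED)); // true
        System.out.println(direct.getExactSizeIfKnown());                  // 3
        // ...while flatMap discards the SIZED characteristic.
        System.out.println(viaFlatMap.hasCharacteristics(Spliterator.SIZED)); // false
        System.out.println(viaFlatMap.getExactSizeIfKnown());                  // -1
    }
}
```

Without SIZED, operations such as toArray cannot preallocate, and parallel splitting has no reliable size estimate to balance the work.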

So instead of using flatMap to defer the Stream creation, it is better to go one level deeper, where deferred Stream creation is directly supported:

static <T> Stream<T> getStream(CompletableFuture<T[]> resp) {
    // The Supplier is invoked only after the terminal operation commences, so join() is deferred
    return StreamSupport.stream(() -> Arrays.spliterator(resp.join()),
        Spliterator.ORDERED|Spliterator.SIZED|Spliterator.SUBSIZED|Spliterator.IMMUTABLE,
        false);
}

This creates a Stream that defers the join operation until the terminal operation starts but still has the performance characteristics of an array-based Stream. The code is obviously more complicated, though, and the benefit is still, as said, only the possibility to chain intermediate operations while the asynchronous operation providing the array is still running.
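A usage sketch of getStream: the intermediate operations are chained while the future is still pending, and the supplier (hence join()) runs only when collect() starts. Completing the future via runAsync is merely a stand-in for the real HTTP response arriving on another thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class DeferredArrayStream {

    // getStream as given above, repeated so the example is self-contained.
    static <T> Stream<T> getStream(CompletableFuture<T[]> resp) {
        return StreamSupport.stream(() -> Arrays.spliterator(resp.join()),
            Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.IMMUTABLE,
            false);
    }

    public static void main(String[] args) {
        CompletableFuture<String[]> resp = new CompletableFuture<>();

        // Intermediate operations are chained while the future is still pending.
        Stream<String> pipeline = getStream(resp)
                .filter(s -> !s.isEmpty())
                .map(String::toUpperCase);

        // Stand-in for the HTTP response arriving on another thread.
        CompletableFuture.runAsync(() -> resp.complete(new String[] {"a", "", "b"}));

        // Terminal operation: the supplier runs now, and join() waits for the array if needed.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println(result); // [A, B]
    }
}
```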

Holger
  • this looks so obvious when you read the answer, but so complicated to think of. thank you yet again for saving my day. – Eugene Jan 11 '21 at 04:13