Consider the following code under these conditions:
- getOneResponePage(int) produces a Flux<Integer>. It simulates a request to fetch a page of results from another service. Its implementation can be treated as a black box for my questions. (See below for an explanation of its actual purpose.)
- getOneResponePage(int) will eventually return an empty Flux<Integer>, which is its way of signaling that no more results will come. (But it will keep on emitting an empty Flux<Integer> for every later page.)
package ch.cimnine.test;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class PaginationTest {
    @Test
    public void main() {
        final Flux<Integer> finalFlux = getAllResponses();
        finalFlux.subscribe(resultItem -> {
            try {
                Thread.sleep(200); // Simulate heavy processing
            } catch (InterruptedException ignore) {
            }
            System.out.println(resultItem);
        });
    }

    private Flux<Integer> getAllResponses() {
        Flux<Flux<Integer>> myFlux = Flux.generate(
            () -> 0, // initial page
            (page, sink) -> {
                var innerFlux = getOneResponePage(page); // eventually returns a Flux.empty()
                // my way to check whether the `innerFlux` is now empty
                innerFlux.hasElements().subscribe(
                    hasElements -> {
                        if (hasElements) {
                            System.out.println("hasElements=true");
                            sink.next(innerFlux);
                            return;
                        }
                        System.out.println("hasElements=false");
                        sink.complete();
                    }
                );
                return page + 1;
            }
        );
        return Flux.concat(myFlux);
    }

    private Flux<Integer> getOneResponePage(int page) {
        System.out.println("Request for page " + page);
        // there's only content on the first 3 pages
        if (page < 3) {
            return Flux
                .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                .map(i -> (1000 * page) + i);
        }
        return Flux.empty();
    }
}
Goal
The goal is to have one method, getAllResponses(), which returns a continuous Flux<T> of results.
The caller should not know – or care – that some pagination logic happens internally.
All other methods will be hidden from the caller.
Questions
- Since I'm new to reactive programming, am I thinking about this the right way?
- IntelliJ warns me that "Calling 'subscribe' in non-blocking context is not recommended". How do I do it right?
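For reference, here is a minimal sketch of one shape that might avoid the inner subscribe() call. It is an assumption about what could work, not a confirmed answer: it collects each page into a list, stops at the first empty list, and flattens the lists back into single items. The class name PaginationSketch is made up for illustration, and getOneResponePage(int) is the same stand-in as in the code above.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class PaginationSketch {

    // Assumed approach: let the operators drive the paging instead of
    // subscribing by hand inside a generator callback.
    static Flux<Integer> getAllResponses() {
        return Flux.range(0, Integer.MAX_VALUE)
                // concatMap works through the pages one at a time, in order
                .concatMap(page -> getOneResponePage(page).collectList())
                // complete and cancel upstream at the first empty page
                .takeWhile(items -> !items.isEmpty())
                // flatten each page back into individual items
                .flatMapIterable(items -> items);
    }

    // Same stand-in as in the question: content on the first 3 pages only
    static Flux<Integer> getOneResponePage(int page) {
        System.out.println("Request for page " + page);
        if (page < 3) {
            return Flux
                    .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                    .map(i -> (1000 * page) + i);
        }
        return Flux.empty();
    }

    public static void main(String[] args) {
        getAllResponses().subscribe(System.out::println);
    }
}
```

The trade-off in this sketch is that collectList() buffers a whole page in memory before anything from that page is emitted, which seems acceptable when a page is capped at a known size.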
What's getOneResponePage(int) actually?
In my actual code, getOneResponePage(int) sends a request using org.springframework.web.reactive.function.client.WebClient.
It connects to a service that returns results.
The service returns at most 1000 results per call.
An offset parameter must be sent to get more results.
The API is a bit weird in the sense that the only way to know for sure whether you have all the results is to query it repeatedly with an ever-increasing offset until you get an empty result set.
It will happily return more empty result sets for a still-increasing offset (… until some internal maximum for offset is reached and a 400 Bad Request is returned).
The actual implementation of getOneResponePage(int) is almost identical to this:
private Flux<ResponseItem> getOneResponePage(int page) {
    return webClientInstance
        .get()
        .uri(uriBuilder -> {
            uriBuilder.queryParam("offset", page * LIMIT);
            uriBuilder.queryParam("limit", LIMIT);
            // …
        })
        .retrieve()
        .bodyToFlux(ResponseItem.class);
}
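To make the termination rule of that API concrete, here is a plain-Java sketch of the offset protocol described above, without Reactor or HTTP. Everything in it is an illustrative assumption: fetchPage(int, int) is a hypothetical stand-in for the remote call, and the total of 2500 items is invented so that the last non-empty page is a partial one.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetPaginationSketch {
    static final int LIMIT = 1000; // maximum results per call

    // Hypothetical stand-in for the remote service: it holds 2500 items and
    // returns an empty list for any offset past the end.
    static List<Integer> fetchPage(int offset, int limit) {
        final int total = 2500;
        List<Integer> page = new ArrayList<>();
        for (int i = offset; i < Math.min(offset + limit, total); i++) {
            page.add(i);
        }
        return page;
    }

    // Keep requesting with a growing offset until a page comes back empty.
    static List<Integer> fetchAll() {
        List<Integer> all = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<Integer> items = fetchPage(page * LIMIT, LIMIT);
            if (items.isEmpty()) {
                break; // an empty page is the only end-of-data signal
            }
            all.addAll(items);
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll().size());
    }
}
```

With these assumed numbers the loop makes four calls (offsets 0, 1000, 2000 and 3000), and only the last one returns nothing.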