
Consider the following code under these conditions:

getOneResponePage(int) produces a Flux<Integer>. It simulates a request that fetches a page of results from another service. Its implementation can be treated as a black box for my questions. (See below for an explanation of its actual purpose.)

getOneResponePage(int) will eventually return an empty Flux<Integer>, which is its way of signalling that no more results will come. (But it will keep on emitting empty Flux<Integer>s.)

package ch.cimnine.test;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class PaginationTest {
    @Test
    public void main() {
        final Flux<Integer> finalFlux = getAllResponses();

        finalFlux.subscribe(resultItem -> {
            try {
                Thread.sleep(200); // Simulate heavy processing
            } catch (InterruptedException ignore) {
            }

            System.out.println(resultItem);
        });
    }

    private Flux<Integer> getAllResponses() {
        Flux<Flux<Integer>> myFlux = Flux.generate(
            () -> 0, // initial page
            (page, sink) -> {
                var innerFlux = getOneResponePage(page); // eventually returns a Flux.empty()

                // my way to check whether the `innerFlux` is now empty
                innerFlux.hasElements().subscribe(
                    hasElements -> {
                        if (hasElements) {
                            System.out.println("hasElements=true");
                            sink.next(innerFlux);
                            return;
                        }

                        System.out.println("hasElements=false");
                        sink.complete();
                    }
                );

                return page + 1;
            }
        );

        return Flux.concat(myFlux);
    }

    private Flux<Integer> getOneResponePage(int page) {
        System.out.println("Request for page " + page);
        
        // there's only content on the first 3 pages
        if (page < 3) {
            return Flux
                .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                .map(i -> (1000 * page) + i);
        }

        return Flux.empty();
    }
}

Goal

The goal is to have one method that is called getAllResponses() which returns a continuous Flux<T> of results. The caller should not know – or care – that some pagination logic happens internally. All other methods will be hidden from the caller.

Questions

  1. Since I'm new to reactive programming, am I thinking this right?
  2. IntelliJ warns me that "Calling 'subscribe' in non-blocking context is not recommended". How do I do it right?

What is getOneResponePage(int) actually?

In my actual code, getOneResponePage(int) sends a request using org.springframework.web.reactive.function.client.WebClient. It connects to a service that returns results. The service returns at most 1000 results per call; an offset parameter must be sent to get more results.

The API is a bit weird in the sense that the only way to know for sure whether you have all the results is to query it repeatedly with an ever-increasing offset until you get an empty result set. It will happily return more empty result sets for a still-increasing offset (… until some internal maximum for offset is reached and a 400 Bad Request is returned.)
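To make that stopping rule concrete, here is the same protocol written as a plain blocking loop, outside Reactor. It is only a sketch: `BlockingPagination`, `fetchAll` and the lambda standing in for the remote service are hypothetical, and `LIMIT = 1000` is assumed from the per-call maximum described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

public class BlockingPagination {
    static final int LIMIT = 1000; // assumed page size ("max of 1000 results per call")

    // Asks for ever-increasing offsets until the service answers with an empty page.
    public static <T> List<T> fetchAll(IntFunction<List<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        for (int offset = 0; ; offset += LIMIT) {
            List<T> page = fetchPage.apply(offset);
            if (page.isEmpty()) {
                return all; // first empty page: stop, no further requests are sent
            }
            all.addAll(page);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the remote service: content only for the first 3 offsets.
        List<Integer> result = fetchAll(offset ->
            offset < 3 * LIMIT ? List.of(offset + 1, offset + 2) : List.of());
        System.out.println(result); // [1, 2, 1001, 1002, 2001, 2002]
    }
}
```

The reactive versions discussed below have to express exactly this "stop at the first empty page" rule without blocking.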

The actual implementation of getOneResponePage(int) is almost identical to this:

private Flux<ResponseItem> getOneResponePage(int page) {
    return webClientInstance
        .get()
        .uri(uriBuilder -> {
            uriBuilder.queryParam("offset", page * LIMIT);
            uriBuilder.queryParam("limit", LIMIT);
            // …
            return uriBuilder.build(); // uri(Function<UriBuilder, URI>) needs the built URI
        })
        .retrieve()
        .bodyToFlux(ResponseItem.class);
}
cimnine

2 Answers

  1. Try to avoid Flux<Flux<T>>. Another anti-pattern is to subscribe explicitly (innerFlux.hasElements().subscribe). Ideally you subscribe only once, typically in the framework layer (e.g. WebFlux subscribes in the underlying HTTP server).

Querying data with a continuously increasing pointer (page number, offset, etc.) is a very common pattern, and you can use the expand operator to implement it. On a Flux, expand tries to expand every element, so for pagination a Mono<List<T>> per page is usually more useful: expand then expands every page and stops when getOneResponePage returns Mono.empty().

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

private Flux<Integer> getAllResponses() {
    var page = new AtomicInteger(0); // initial page
    return getOneResponePage(page.get())
            .expand(list -> getOneResponePage(page.incrementAndGet()))
            .flatMapIterable(Function.identity());
}

private Mono<List<Integer>> getOneResponePage(int page) {
    System.out.println("Request for page " + page);

    // there's only content on the first 3 pages
    if (page < 3) {
        return Flux
                .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                .map(i -> (1000 * page) + i)
                .collectList();
    }

    // an empty Mono (not a Mono of an empty list) is what stops expand()
    return Mono.empty();
}
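A variant of the expand approach, as a sketch under a few assumptions: it needs Java 16+ for the record, and `ExpandPagination`, `Page`, `fetchPage` and `nonEmptyPage` are hypothetical names. The page number travels with its items, so no shared `AtomicInteger` is needed. It also adds a `filter` step, because with the real WebClient call `bodyToFlux(...).collectList()` emits an *empty list* for an empty result set rather than completing empty, and only an empty Mono stops expand.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ExpandPagination {
    // Hypothetical carrier: the page number travels together with the page's items.
    record Page(int number, List<Integer> items) {}

    // Simulated remote call; like the real API it answers with an EMPTY LIST, not with Mono.empty().
    static Mono<List<Integer>> fetchPage(int page) {
        List<Integer> items = page < 3 ? List.of(1000 * page + 1, 1000 * page + 2) : List.of();
        return Mono.just(items);
    }

    // Turns an empty list into Mono.empty(), which is the signal expand() needs to stop.
    static Mono<Page> nonEmptyPage(int page) {
        return fetchPage(page)
            .filter(items -> !items.isEmpty())
            .map(items -> new Page(page, items));
    }

    public static Flux<Integer> getAllResponses() {
        return nonEmptyPage(0)
            .expand(p -> nonEmptyPage(p.number() + 1)) // one request per page, ends at the first empty page
            .flatMapIterable(Page::items);
    }

    public static void main(String[] args) {
        System.out.println(getAllResponses().collectList().block()); // [1, 2, 1001, 1002, 2001, 2002]
    }
}
```

Each page expands into at most one next page, so the breadth-first traversal of `expand` degenerates into a simple chain and page order is preserved.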

  2. If your flow is non-blocking, subscribe on the parallel scheduler: use .subscribeOn(Schedulers.parallel()). boundedElastic should be used to "offload" blocking tasks. For more details, see "Difference between boundedElastic() vs parallel() scheduler".
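One thing to watch in the question's JUnit test: it calls `subscribe` and returns immediately, so once `subscribeOn(Schedulers.parallel())` moves the work to another thread, the test method may finish before anything is processed. A sketch of one way to consume the stream in a test, assuming the single subscription happens at that edge: `SubscribeOnDemo` and `consume` are hypothetical names, the `Thread.sleep` is the question's simulated heavy processing moved to `boundedElastic`, and the 30-second timeout is arbitrary. reactor-test's `StepVerifier` is another common option for tests.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnDemo {
    // The one subscription happens here, at the edge (a test or main), not inside the pipeline.
    public static List<Integer> consume(Flux<Integer> allResponses) {
        return allResponses
            .subscribeOn(Schedulers.parallel())     // subscribe to the non-blocking source on parallel
            .publishOn(Schedulers.boundedElastic()) // hand each item to boundedElastic for blocking work
            .doOnNext(item -> {
                try {
                    Thread.sleep(200); // simulated heavy (blocking) processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(item);
            })
            .collectList()
            .block(Duration.ofSeconds(30)); // wait here so the test does not return before the items arrive
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for getAllResponses().
        consume(Flux.just(1, 2, 3, 1001, 1002));
    }
}
```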
Alex
  • With your example, when does it stop increasing the page? In other words, how will the Flux returned from `getAllResponses()` complete? I ask because `getOneResponsePage` will just return an empty collection up to page Integer.MAX_VALUE or even Long.MAX_VALUE, and that would be many unnecessary calls to the remote service. – cimnine Jun 17 '22 at 06:52
  • Oh, I didn't notice that it was a `Flux`. In that case it will try to expand every element. For pagination it's better to use `Mono<List<T>>` and then flatten it using `flatMapIterable`. Let me update the example. – Alex Jun 17 '22 at 15:02

There is no direct way of stopping the outer flow from the inner flow. The closest thing would be to use switchIfEmpty with a Flux.error(NoSuchElementException) on the inner sequence, then on the outer sequence use onErrorResume and return an empty Flux if the error is that NoSuchElementException.

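import java.util.List;
import java.util.NoSuchElementException;
import reactor.core.publisher.Flux;

Flux<Integer> result = Flux.just(List.of(1, 2, 3), List.<Integer>of(), List.of(4, 5, 6))
    .flatMap(list ->
        Flux.fromIterable(list)
            // an empty inner sequence becomes an error...
            .switchIfEmpty(Flux.error(new NoSuchElementException()))
    )
    // ...which the outer sequence turns back into a normal completion
    .onErrorResume(e ->
        e instanceof NoSuchElementException ?
            Flux.empty() : Flux.error(e)
    );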
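Applied to the question's pagination, the same idea might look like the sketch below (an adaptation under assumptions, not code from the answer). `Flux.range` supplies page numbers, `concatMap` requests one page at a time in order (plain `flatMap` would subscribe to many pages concurrently), and the first empty page ends the stream. `SwitchIfEmptyPagination` and the `requests` counter are hypothetical and only show how many pages get fetched. Caveat: a `NoSuchElementException` raised for any other reason would be swallowed too.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class SwitchIfEmptyPagination {
    public static final AtomicInteger requests = new AtomicInteger(); // counts simulated calls, illustration only

    // Same simulation as in the question: content only on pages 0..2, empty pages afterwards.
    static Flux<Integer> getOneResponePage(int page) {
        requests.incrementAndGet();
        if (page < 3) {
            return Flux.just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31).map(i -> (1000 * page) + i);
        }
        return Flux.empty();
    }

    public static Flux<Integer> getAllResponses() {
        return Flux.range(0, Integer.MAX_VALUE)
            // concatMap subscribes to one page at a time, in page order
            .concatMap(page -> getOneResponePage(page)
                .switchIfEmpty(Flux.error(new NoSuchElementException()))) // first empty page -> error
            // the error cancels the outer range; turn it back into a normal completion
            .onErrorResume(NoSuchElementException.class, e -> Flux.empty());
    }

    public static void main(String[] args) {
        System.out.println(getAllResponses().count().block()); // 33
        System.out.println(requests.get()); // 4 (pages 0, 1, 2 and the first empty page 3)
    }
}
```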
akarnokd