I am trying to build a method that makes repeated HTTP requests to an external endpoint for as long as one query parameter (the offset) is below 45000.

I need this because the external endpoint returns at most 100 items per request, and there are more than 44000 items to fetch.

private int offset = 0;

public Flux<List<Model>> getItems() {
    return Flux.from(
            webClientBuilder
                    .build()
                    .get()
                    .uri(uriBuilder -> uriBuilder
                            .path("/getItems")
                            .queryParam("limit", 100)
                            .queryParam("offset", getOffset())
                            .build())
                    .retrieve()
                    .bodyToMono(Model.class)
                    .doOnSuccess(System.out::println)
                    .flatMap(model -> {
                        setOffset(getOffset() + 100);
                        log.info("Offset: " + getOffset());
                        return repository.saveAll(model.getData().getResults()).collectList();
                    }).delayElement(Duration.ofSeconds(15)))
                    .repeat(() -> getOffset() <= 45000);
}

public int getOffset() {
    return offset;
}

public void setOffset(int offset) {
    this.offset = offset;
}

It seems to work, because the log shows the offset being incremented, but every HTTP request is sent with offset equal to 0. The method keeps returning the first 100 items instead of all 44566 items.

Luke
  • I'm only guessing, but I think repeat only re-subscribes to the Flux obtained from the WebClient; I don't think it repeats the entire call. I would take a recursive approach and call the method again in doOnSuccess. – Toerktumlare Jul 20 '19 at 22:32
  • That could be a good idea; I will test it too. Thanks for the answer. – Luke Jul 21 '19 at 19:59

1 Answer

The problem is that the WebClient request is built eagerly, before subscription, and is effectively "cached" with the initial offset value. After each call the Flux is resubscribed, but the prepared web service call keeps the offset it was built with. You must supply the request lazily (for example by wrapping it in a lambda), which forces all of its parameters to be recalculated on each subscription. There is a dedicated operator for that: Mono.defer().

Solution:

Mono<Model> response = Mono.defer(() -> webClientBuilder
        .build()
        .get()
        .uri(uriBuilder -> uriBuilder
                .path("/getItems")
                .queryParam("limit", 100)
                .queryParam("offset", getOffset())
                .build())
        .retrieve()
        .bodyToMono(Model.class)
);


Flux.from(response
        .doOnEach(System.out::println)
        .flatMap(model -> {
            setOffset(getOffset() + 100);
            log.info("Offset: " + getOffset());
            return repository.saveAll(model.getData().getResults()).collectList();
        }).delayElement(Duration.ofSeconds(15))
).repeat(() -> getOffset() <= 45000).subscribe();
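
To see why deferring matters without pulling in Reactor or WebClient, here is a minimal plain-Java sketch of the same effect. It is only an analogy: buildRequest, runEager and runDeferred are hypothetical names made up for this illustration, and a java.util.function.Supplier stands in for what Mono.defer() does on each subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVsDeferred {

    // Hypothetical stand-in for building the request URI from the current offset.
    public static String buildRequest(int offset) {
        return "/getItems?limit=100&offset=" + offset;
    }

    // Eager: the offset is read once, before the loop, so every repeat reuses the same URI.
    public static List<String> runEager(int repeats) {
        AtomicInteger offset = new AtomicInteger(0);
        String prepared = buildRequest(offset.get()); // evaluated exactly once
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < repeats; i++) {
            sent.add(prepared);
            offset.addAndGet(100); // the counter changes, the prepared request does not
        }
        return sent;
    }

    // Deferred: the Supplier is invoked on every repeat, so the offset is re-read each time.
    public static List<String> runDeferred(int repeats) {
        AtomicInteger offset = new AtomicInteger(0);
        Supplier<String> deferred = () -> buildRequest(offset.get());
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < repeats; i++) {
            sent.add(deferred.get());
            offset.addAndGet(100);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + runEager(3));
        System.out.println("deferred: " + runDeferred(3));
    }
}
```

In the eager variant all three entries carry offset=0, which matches the behaviour described in the question; in the deferred variant the offsets are 0, 100 and 200.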

Another question that demonstrates the same eager-execution problem: "Mono switchIfEmpty() is always called".

arap