1

I have a Flux stream. For each element processed I want to have an action triggered which is an asynchronous/non-blocking one. For example, a method returning back a Mono from a db update. I want this action to be done on the doOnNext block. I don't want to affect the Flux, the processing and the back pressure implemented there.

Supposing Mono method to be called is

Mono<Integer> dbUpdate();

should my Flux be like this?

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data));
}

Or should be as mentioned on a stack overflow example.

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data).subscribe());
}

Won't the above cause blocking issues inside doOnNext?

Also which is the most appropriate scheduler to use for this type of action?

lkatiforis
  • 5,703
  • 2
  • 16
  • 35
gkatzioura
  • 2,655
  • 2
  • 26
  • 39

1 Answers1

1

dbUpdate() will be ignored if you do not subscribe to it. The following snippet doesn't print anything because Mono.just("db update") doesn't get subscribed.

Mono<String> dbUpdate() {
    return Mono.just("db update")
        .doOnNext(System.out::println);
}

public Flux<String> processData() {
    return Flux.just("item 1", "item 2")
        .doOnNext(data -> dbUpdate());
}

Note that .subscribe() doesn't block your thread, it kicks off the work and returns immediately.

lkatiforis
  • 5,703
  • 2
  • 16
  • 35