
I have a method like the one below in my Spring Boot app.

public Flux<Data> search(SearchRequest request) {
  Flux<Data> result = searchService.search(request);//this returns Flux<Data>
  Mono<List<Data>> listOfData = result.collectList();
//  doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
  return result;
}

//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
  //do some processing here
}

Currently, I'm using an @Async-annotated service class with doThisAsync, but I don't know how to pass it the List<Data> without calling block(). All I have is a Mono<List<Data>>.

My main problem is how to process this Mono separately while the search method still returns the Flux<Data>.

marc_s
Avinash Anand

3 Answers


1. If your fire-and-forget operation is already asynchronous and returns a Mono/Flux

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> doThisAsync(data).subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public Mono<Void> doThisAsync(List<Data> data) {
    // do some async/non-blocking processing here, like calling WebClient
    return Mono.empty(); // placeholder so the stub compiles
}
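A minimal, self-contained sketch of option 1 that runs outside Spring, assuming String in place of Data and a hypothetical searchService() stand-in that returns Flux.just(...). The hypothetical doThisAsync uses Mono.delay, which waits on Reactor's parallel scheduler without holding the caller's thread; the latch exists only so a test can wait for the side task. Errors are logged and swallowed with onErrorResume so a failing side task cannot affect the search response.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class NonBlockingFireAndForget {
    // Filled in by the side task so the result can be checked afterwards
    static final List<String> processed = new CopyOnWriteArrayList<>();
    // Counted down when the side task terminates (completion, error or cancel)
    static final CountDownLatch done = new CountDownLatch(1);

    // Hypothetical stand-in for searchService.search(request)
    static Flux<String> searchService() {
        return Flux.just("a", "b", "c");
    }

    static Flux<String> search() {
        return searchService()
                .collectList()
                .doOnNext(data -> doThisAsync(data)
                        // log and swallow errors so the side task never affects the response
                        .onErrorResume(e -> {
                            System.err.println("fire-and-forget failed: " + e);
                            return Mono.empty();
                        })
                        .subscribe())
                .flatMapMany(Flux::fromIterable);
    }

    // Hypothetical non-blocking task: Mono.delay schedules the wait on Schedulers.parallel(),
    // so subscribing to it returns immediately
    static Mono<Void> doThisAsync(List<String> data) {
        return Mono.delay(Duration.ofMillis(200))
                .doOnNext(tick -> processed.addAll(data))
                .then()
                .doFinally(signal -> done.countDown());
    }

    // Waits (up to 5 s) for the side task; returns false on timeout or interruption
    static boolean awaitSideTask() {
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The caller gets its items as soon as the list has been collected; the side task completes in the background afterwards.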

2. If your fire-and-forget operation does blocking I/O

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                                              .subscribeOn(Schedulers.boundedElastic())  // delegate to a suitable thread so the main flow is not blocked
                                              .subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public void doThisAsync(List<Data> data) {
    //do some blocking I/O on calling thread
}
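The same kind of sketch for option 2, again with String standing in for Data and a hypothetical inline search source. The hypothetical blocking doThisAsync just sleeps for 300 ms and records the name of the thread it ran on, which makes it easy to check that the work was moved off the subscribing thread onto a boundedElastic worker. It uses Schedulers.boundedElastic() because Schedulers.elastic() was deprecated in Reactor 3.4 and removed in 3.5.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class BlockingFireAndForget {
    // Name of the thread the blocking work ran on
    static final AtomicReference<String> workerThread = new AtomicReference<>();
    static final CountDownLatch done = new CountDownLatch(1);

    static Flux<String> search() {
        return Flux.just("a", "b", "c") // hypothetical stand-in for searchService.search(request)
                .collectList()
                .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                        // move the blocking call onto a boundedElastic worker thread
                        .subscribeOn(Schedulers.boundedElastic())
                        .onErrorResume(e -> {
                            System.err.println("fire-and-forget failed: " + e);
                            return Mono.empty();
                        })
                        .subscribe())
                .flatMapMany(Flux::fromIterable);
    }

    // Hypothetical blocking task: sleeps to simulate slow DB queries / reporting calls
    static void doThisAsync(List<String> data) {
        workerThread.set(Thread.currentThread().getName());
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            done.countDown();
        }
    }

    // Waits (up to 5 s) for the blocking task; returns false on timeout or interruption
    static boolean awaitSideTask() {
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```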

Note that in both of the above cases you lose backpressure support. If doThisAsync slows down for some reason, the data producer won't notice and will keep producing items. This is a natural consequence of the fire-and-forget mechanism.

Martin Tarjányi
  • In approach 2, will the search method not wait until .doOnNext is executed completely? – Avinash Anand Aug 20 '19 at 09:07
  • No, it won't if it is really async. – Martin Tarjányi Aug 20 '19 at 09:27
  • Sorry, I think I explained it wrong. `doThisAsync()` is a normal method that runs some long DB queries and calls some reporting services. All of these are heavy, blocking calls. So I want it to run separately while the `search` method returns its response. – Avinash Anand Aug 20 '19 at 09:42
  • But isn't it annotated with @Async as you described? If it is then you are fine with the above solution. Of course, you need to test it to make sure it works as expected. – Martin Tarjányi Aug 20 '19 at 09:48
  • That's what I was getting at. Instead of `@Async`, is there a method in Mono or Flux that does this? Using `@Async` didn't feel right here. – Avinash Anand Aug 20 '19 at 09:50
  • I see, I've added a third option to the answer. Please check it. It moves the blocking code to a separate thread pool. – Martin Tarjányi Aug 20 '19 at 09:55
  • After comments, I've simplified and narrowed down the possible options. – Martin Tarjányi Apr 15 '20 at 08:03
  • @MartinTarjányi doesn't `collectList()` convert the flux to a mono, and hence wait for all the events to complete? The flux that is returned is created from this Mono. Doesn't that mean any subscriber to the returned `Flux` will only get events once the mono completes? – tsar2512 Sep 04 '21 at 17:52
  • I would have thought something along the lines of `Flux result = searchService.search(request); result.collectList().doOnNext(...); return result;` would work fine? Again this is a question rather than a suggestion, since I'm a newbie in this as well. This solution basically doesn't change the result object at all for the async operation – tsar2512 Sep 04 '21 at 17:52
  • @tsar2512 no, that wouldn't work as intended. 1. A subscribe is missing on the second line, side effect wouldn't be executed without it. 2. Even if subscribe is there, the search would be executed twice due to two subscriptions... An alternative could be to use cache operator on the first line which would prevent the double execution. – Martin Tarjányi Sep 05 '21 at 08:10
  • @MartinTarjányi - can you give an example of a cache operator? Also for (2), maybe I misunderstand, but isn't the publish->subscribe principle a push based one here? i.e. I can have multiple subscribers for a single producer, that shouldn't force it to execute twice on the producer? In this case search response is basically the producer right? – tsar2512 Sep 06 '21 at 03:57
  • @MartinTarjányi - here is a gist which shows what I am doing. What am I missing? https://gist.github.com/nipunarora/a9bd3a3867fea61a6b227144067e045c – tsar2512 Sep 06 '21 at 04:01
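In the comments above, the cache operator is mentioned as a way to avoid running the search twice when both the side task and the caller subscribe to the same Flux. A minimal sketch of that idea, assuming a hypothetical searchService() built with Flux.defer so that every upstream subscription is counted; the helper searchRunsWith is also hypothetical and exists only to compare the two cases.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

class CacheDemo {
    // Counts how many times the (hypothetical) search actually runs
    static final AtomicInteger searchRuns = new AtomicInteger();

    // Hypothetical stand-in for searchService.search(request); Flux.defer re-runs
    // the supplier for every subscriber, like a cold database query would
    static Flux<String> searchService() {
        return Flux.defer(() -> {
            searchRuns.incrementAndGet();
            return Flux.just("a", "b", "c");
        });
    }

    // Subscribes twice (side task + caller) and reports how often the search ran
    static int searchRunsWith(boolean useCache) {
        searchRuns.set(0);
        Flux<String> source = searchService();
        Flux<String> result = useCache ? source.cache() : source;

        // side-task subscription (the fire-and-forget processing would go here)
        result.collectList().subscribe(list -> System.out.println("side task got " + list));
        // the caller's subscription, e.g. what the framework does with the returned Flux
        List<String> response = result.collectList().block();
        System.out.println("caller got " + response);
        return searchRuns.get();
    }
}
```

Flux.just is synchronous here, so the two subscriptions run one after the other; with a real asynchronous search, cache() still shares a single upstream subscription and replays the items to later subscribers.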

Have you considered running the processing in a separate thread using publishOn, as in the example below? It may not be exactly what you are asking for, but it lets you continue with other work while the results are processed on a thread from a dedicated scheduler (theFourThreadScheduler, a four-thread parallel scheduler in my example). Note that because collectList() emits the whole list as a single item, only one of those four threads ends up doing the work here.

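    import java.util.List;

    import org.junit.jupiter.api.Test; // JUnit 5 assumed; JUnit 4 would use org.junit.Test
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    @Test
    public void processingInSeparateThreadTest() {
        final Scheduler theFourThreadScheduler = Schedulers.newParallel("FourThreads", 4);
        final Flux<String> theResultFlux = Flux.just("one", "two", "three", "four", "five", "six", "seven", "eight");

        theResultFlux.log()
            .collectList()
            // hand the collected list over to a thread of the dedicated scheduler
            .publishOn(theFourThreadScheduler)
            .subscribe(theStringList -> {
                doThisAsync(theStringList);
            });

        System.out.println("Subscribed to the result flux");

        // keep the test thread alive long enough for the background processing to finish
        for (int i = 0; i < 20; i++) {
            System.out.println("Waiting for completion: " + i);
            try {
                Thread.sleep(300);
            } catch (final InterruptedException theException) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        theFourThreadScheduler.dispose();
    }

    private void doThisAsync(final List<String> inStringList) {
        for (final String theString : inStringList) {
            System.out.println("Processing in doThisAsync: " + theString);
            try {
                Thread.sleep(500); // simulate slow, blocking work per item
            } catch (final InterruptedException theException) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }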

Running the example produces the following output, which shows that the processing in doThisAsync() is performed in the background.

Subscribed to the result flux
Waiting for completion: 0
Processing in doThisAsync: one
Waiting for completion: 1
Processing in doThisAsync: two
Waiting for completion: 2
Waiting for completion: 3
Processing in doThisAsync: three
Waiting for completion: 4
Waiting for completion: 5
Processing in doThisAsync: four
Waiting for completion: 6
Processing in doThisAsync: five
Waiting for completion: 7
Waiting for completion: 8
Processing in doThisAsync: six
Waiting for completion: 9
Processing in doThisAsync: seven
Waiting for completion: 10
Waiting for completion: 11
Processing in doThisAsync: eight
Waiting for completion: 12
Waiting for completion: 13
Waiting for completion: 14
Waiting for completion: 15
Waiting for completion: 16
Waiting for completion: 17
Waiting for completion: 18
Waiting for completion: 19

References: Reactor 3 Reference: Schedulers


UPDATE 2023/01/31

Actually, you should use .subscribeOn() in any case. Even if the fire-and-forget function you call returns a Mono<Void>, there is no guarantee that its reactive chain will switch to another thread rather than run immediately on the current one; that depends on the code inside the fire-and-forget function, more specifically on the operators used in its chain.

So you may run into a situation where your fire-and-forget function executes on the same thread that called it, and your method will not return until that function has completed.
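A small sketch of the pitfall described above, assuming a hypothetical fire-and-forget operation built with Mono.fromRunnable, which contains no operator that switches threads. The hypothetical helper threadOfFireAndForget reports which thread the body ran on: without subscribeOn it runs synchronously inside subscribe() on the caller's thread; with subscribeOn(Schedulers.boundedElastic()) it runs on a worker thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SameThreadPitfall {
    // Returns the name of the thread on which the fire-and-forget body ran
    static String threadOfFireAndForget(boolean useSubscribeOn) {
        AtomicReference<String> thread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Hypothetical fire-and-forget operation: nothing in it switches threads
        Mono<Void> fireAndForget = Mono.fromRunnable(() -> {
            thread.set(Thread.currentThread().getName());
            done.countDown();
        });

        Mono<Void> toSubscribe = useSubscribeOn
                ? fireAndForget.subscribeOn(Schedulers.boundedElastic())
                : fireAndForget;
        // Without subscribeOn, the body runs synchronously inside subscribe() on this thread
        toSubscribe.subscribe();

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.get();
    }
}
```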

The case when the fire-and-forget function returns a Publisher<Void>:

public Flux<Data> search(SearchRequest request) {
    return searchService.search(request)
            .collectList()
            .doOnNext(data ->
                    // call subscribeOn(...) in any case
                    fireAndForgetOperation(data)
                            .subscribeOn(...)
                            .subscribe()
            )
            .flatMapMany(Flux::fromIterable);
}

public Mono<Void> fireAndForgetOperation(List<Data> list) {
    ...
}

The case when the fire-and-forget function is an ordinary void method:

public Flux<Data> search(SearchRequest request) {
    return searchService.search(request)
            .collectList()
            .doOnNext(data ->
                    Mono.fromRunnable(() -> fireAndForgetOperation(data))
                            .subscribeOn(...)
                            .subscribe()
            )
            .flatMapMany(Flux::fromIterable);
}

public void fireAndForgetOperation(List<Data> list) {
    ...
}

You should also consider which Scheduler to provide, depending on the nature of your fire-and-forget function.

Basically there are two scenarios:

1) If your fire-and-forget function does CPU-bound work, specify Schedulers.parallel() in subscribeOn().

2) If your fire-and-forget function does I/O work (whether the I/O is blocking or non-blocking does not matter in this case), specify Schedulers.boundedElastic() in subscribeOn().

With this approach, your method truly returns immediately after firing the fire-and-forget function.
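The scheduler choice above can be captured in a small helper. This is a sketch under stated assumptions: the Workload enum, schedulerFor, fireAndForget and threadUsedFor are hypothetical names, not part of Reactor. It picks Schedulers.parallel() for CPU-bound tasks and Schedulers.boundedElastic() for I/O tasks, logs and swallows errors, and returns without waiting for the task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class SchedulerChoice {
    // Hypothetical classification of the fire-and-forget workload
    enum Workload { CPU_BOUND, IO_BOUND }

    static Scheduler schedulerFor(Workload workload) {
        // CPU-bound work -> parallel(); I/O work -> boundedElastic()
        return workload == Workload.CPU_BOUND ? Schedulers.parallel() : Schedulers.boundedElastic();
    }

    // Hypothetical helper: runs the task in the background and returns immediately
    static void fireAndForget(Runnable task, Workload workload) {
        Mono.fromRunnable(task)
                .subscribeOn(schedulerFor(workload))
                .onErrorResume(e -> {
                    System.err.println("fire-and-forget failed: " + e);
                    return Mono.empty();
                })
                .subscribe();
    }

    // Demo helper: reports which thread a task of the given workload ran on
    static String threadUsedFor(Workload workload) {
        AtomicReference<String> thread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        fireAndForget(() -> {
            thread.set(Thread.currentThread().getName());
            done.countDown();
        }, workload);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.get();
    }
}
```

Inside search, the doOnNext body would then become something like `data -> SchedulerChoice.fireAndForget(() -> fireAndForgetOperation(data), SchedulerChoice.Workload.IO_BOUND)`.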

kerbermeister