9

I have a Spring Flux application where at some point I need to execute some heavy task on the background, the caller (a HTTP request) does not need to wait until that task completes.

Without reactor, I would just probably use the Async annotation, executing that method on a different thread. With reactor, I am not sure if I should proceed with that approach or if there is already a built-in mechanism that allows me to accomplish this.

For example, given a Controller that accepts a Resource object:

@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
    processor.run(r); // the caller should not wait for the resource to be processed
    return repository.save(r);
}

And a Processor class:

@Async
void run(Resource r) { 
    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> result = webClient.get()
                                   .retrieve()
                                   .bodyToMono(String.class);
    String response = result.block(); //block for now
}

The HTTP caller for /create should not need to wait until the run method completes.

Jonathan Naguin
  • 14,526
  • 6
  • 46
  • 75

2 Answers2

8

If you are looking for the fire-and-forget pattern implementation, you could just subscribe your publisher

@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
    run(r).subscribe();
    return repository.save(r);
}

Mono<Void> run(Resource r) {
    WebClient webClient = WebClient.create("http://localhost:8080");
    return webClient.get()
            .retrieve()
            .bodyToMono(String.class)
            .then();
}

If your publisher executes blocking operations it should be subscribed on other thread with elastic or parallel scheduler.

Alexander Pankin
  • 3,787
  • 1
  • 13
  • 23
  • Will “run” always complete? Even if the caller stream/thread terminates a lot before? – Jonathan Naguin Mar 20 '19 at 20:28
  • It's not clear for me "thread terminates". If you use netty NIO threads, they terminates only on application shutdown/crash. In such case you could lose completion (with @Async situation is the same). I have [a small demo](https://github.com/alex-pumpkin/stackoverflow-demo-1) to illustrate that execution completes after controller returns result. Run StackoverflowDemo1ApplicationTests and see logs. – Alexander Pankin Mar 21 '19 at 12:14
5

I did some testing, and I think even using subscribe() as fire and forget will wait for request to complete before returning an answer to the webbrowser or REST-client (at least in my simple tests, it looks like that). So, you have to do the similar of @Async, create another thread:

@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
    return processor.run(r)
    .subscribeOn(Schedulers.elastic()) // put eveything above this line on another thread
    .doOnNext(string -> repository.save(r)); // persist "r", not changing it, though

}

And a Processor class:

Mono<String> run(Resource r) { 
    WebClient webClient = WebClient.create("http://localhost:8080");
    return webClient.get()
           .retrieve()
           .bodyToMono(String.class);
}
Frischling
  • 2,100
  • 14
  • 34