
I am new to WebFlux and have not been able to find the right material to continue with my implementation.

I want to issue a request and process the response asynchronously. In this case the service call takes about 8-10 ms to respond, so we want to issue the request, continue doing other work, and pick up the response when it is needed for further processing.

Mono<Map<String,Price>> resp = webClient.post()
.uri("/{type}",isCustomerPricing ? "customer" : "profile")
.body(Mono.just(priceDetailsRequest),PriceDetailsRequest.class)
.retrieve().bodyToMono(customerPriceDetailsType);

How do we make this call execute asynchronously on a different thread? I tried subscribeOn with Schedulers.single() and Schedulers.parallel(), but the call was not executed until Mono.block() was called.

How do we achieve the following?

  1. The call executes in parallel on a separate thread, so the current thread can continue with other work
  2. When processing completes, the response is set on the context
  3. When the current thread looks for the response and the service call has not completed, it blocks until the call completes
basu76
  • I was able to get it to publish asynchronously and receive the response by doing this: .subscribeOn(Schedulers.elastic()).subscribe(x -> LOGGER.info("Received Response"+x.getClass())). Is this the right way? When the response needs to be used and is not yet available, how can I have the current thread block (much like Future.get())? – basu76 Jan 09 '19 at 20:44
  • It already automatically waits, but in a non-blocking way, in that the event-loop is released until the response is returned. – Rajesh J Advani Jan 21 '19 at 11:10

1 Answer


You don't need to block to consume the response. Just add an operator to the same chain that consumes the response. An example is given below.

// subscribe() returns a Disposable, not a Mono, so the result cannot be assigned to Mono<Map<String,Price>>.
Disposable subscription = webClient.post()
        .uri("/{type}",isCustomerPricing ? "customer" : "profile")
        .body(Mono.just(priceDetailsRequest),PriceDetailsRequest.class)
        .retrieve()
        .bodyToMono(CustomerPriceDetailsType.class)
        .map(processor::responseToDatabaseEntity) // Create a persistable entity from the response
        .map(priceRepository::save)               // Save the entity to the database
        .subscribe();                             // Nothing runs until the Mono is subscribed to; this triggers the request.

Alternatively you can provide a consumer as a parameter of the subscribe() method.
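
For the Future.get()-style wait asked about in the question, one option is to hold a CompletableFuture for the in-flight call and join on it only when the value is needed. Reactor's Mono offers toFuture() for this bridge, which subscribes right away and returns a CompletableFuture. The sketch below uses only the JDK so that it runs on its own: fetchPrices(), startLookup(), the class name and the "sku-1" entry are hypothetical stand-ins for the real WebClient call, not part of the original code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncPriceLookup {

    // Hypothetical stand-in for the remote pricing call (not the real WebClient request).
    static Map<String, Double> fetchPrices() {
        try {
            TimeUnit.MILLISECONDS.sleep(10); // simulate the 8-10 ms service latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Map.of("sku-1", 9.99);
    }

    // Starts the call on another thread and returns immediately with a handle to the result.
    // With Reactor, the assumed equivalent would be calling toFuture() on the Mono.
    static CompletableFuture<Map<String, Double>> startLookup() {
        return CompletableFuture.supplyAsync(AsyncPriceLookup::fetchPrices);
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, Double>> pending = startLookup();

        // The current thread is free to do other work while the call is in flight.
        System.out.println("doing other work");

        // Waits only if the response has not arrived yet, much like Future.get().
        Map<String, Double> prices = pending.join();
        System.out.println("price for sku-1: " + prices.get("sku-1"));
    }
}
```

join() is used here instead of get() only to avoid checked exceptions; either call returns at once if the result is already available. This kind of wait belongs on an ordinary application thread, since blocking a Reactor event-loop thread defeats the purpose of the non-blocking client.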

Rajesh J Advani
  • I am using subscribeOn with Schedulers.elastic(), and it does execute the call while the thread that created it continues to do other work. The issue is what to do when the response is needed: what is the best way to wait for it if the consumer passed to subscribe() has not yet been executed? – basu76 Jan 21 '19 at 10:57
  • Are there two separate calls? One making the request and the second getting the response? Or is it a single request-response cycle? If the latter, then the code above is sufficient. The operators on the response don't execute until the response actually is ready. – Rajesh J Advani Jan 21 '19 at 11:05