I have gone through the Project Reactor documentation for the map() and flatMap() methods, and also a good explanation in this answer. My question, however, is about using them with the Reactor KafkaReceiver. I have the following code example:
// start of consumption
public Disposable consumeMessage() {
    return processKafkaRecord().subscribe(
            record -> log.info("success"),
            error -> log.error("error logged" + error));
}

public Flux<String> processKafkaRecord() {
    Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(() -> inputEventReceiver.receive());
    return receiverRecord
            .doOnNext(record -> log.info("Input Event receiver record {}", record.toString()))
            .flatMap(this::processMessage)
            // note the space before "for" so the topic and the rest of the message do not run together
            .doOnComplete(() -> log.info("MSG=Completed consuming messages from topic={} "
                    + "for Cancel Validation processing.", inputEventKafkaConfig.getTopic()));
}

private Flux<String> processMessage(final ReceiverRecord<String, String> receiverRecord) {
    // logic
    .flatMap(this::processOne)
    .flatMap(this::processTwo)
    .flatMap(this::processThree);
}
My doubt, in short: if I use the .map() method in processMessage instead of .flatMap(), will it make any difference in performance for the KafkaReceiver?
Doubt explained: When consuming a stream of data with KafkaReceiver, we are already using a Flux for consumption, and in the processKafkaRecord method the call happens through flatMap(), so each individual record should already be processed asynchronously. Once we reach processMessage(), it is actually processing a single record. Now, if processOne, processTwo and processThree have to run synchronously, one after another, for each individual event, does it make sense to use flatMap() instead of map()?
Once flatMap() has been called in processKafkaRecord(), the inner method will already run asynchronously for each event. So wouldn't map() make more sense if each step in processMessage has to happen synchronously? Or am I wrong in this conclusion, and should we use flatMap() even in the inner method for performance?
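
For reference, here is a minimal sketch of the distinction being asked about. It assumes reactor-core is on the classpath. The names MapVsFlatMap, processOneSync and processTwoAsync are hypothetical stand-ins, not the real processOne/processTwo/processThree, whose signatures are not shown above. The sketch only illustrates that map() takes a function returning a plain value, while flatMap() takes a function returning a Publisher, so which one fits depends on what each step returns.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapVsFlatMap {

    // Hypothetical synchronous step: takes a value and returns a value.
    static String processOneSync(String value) {
        return value + "-one";
    }

    // Hypothetical asynchronous-style step: returns a Mono that produces the result.
    static Mono<String> processTwoAsync(String value) {
        return Mono.fromCallable(() -> value + "-two");
    }

    // map() applies the synchronous function in line;
    // flatMap() subscribes to the Mono returned by the second step and emits its result.
    static List<String> run(List<String> records) {
        return Flux.fromIterable(records)
                .map(MapVsFlatMap::processOneSync)
                .flatMap(MapVsFlatMap::processTwoAsync)
                .collectList()
                .block();
    }

    // Using map() with a function that returns a Mono only nests the type:
    // the result is Flux<Mono<String>>, and nothing subscribes to the inner Monos.
    static Flux<Mono<String>> nested(List<String> records) {
        return Flux.fromIterable(records).map(MapVsFlatMap::processTwoAsync);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b")));
    }
}
```

If the three steps in processMessage return plain values, map() would be the natural fit under this reading. If they return Mono or Flux, map() would leave a nested publisher that something still has to subscribe to.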