I have gone through the Project Reactor documentation for the map() and flatMap() methods, and also a good explanation in this answer.

My question is about using them with the Reactor KafkaReceiver. I have the following code:

    // start of consumption
    public Disposable consumeMessage() {
        return processKafkaRecord().subscribe(record -> log.info("success"),
                error -> log.error("error logged", error));
    }

    public Flux<String> processKafkaRecord() {
        Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(() -> inputEventReceiver.receive());
        return receiverRecord.doOnNext(record -> log.info("Input Event receiver record {}", record))
            .flatMap(this::processMessage)
            .doOnComplete(() -> log.info("MSG=Completed consuming messages from topic={} for Cancel Validation processing.", inputEventKafkaConfig.getTopic()));
    }

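    private Flux<String> processMessage(final ReceiverRecord<String, String> receiverRecord) {
        // logic (elided in the original); starting from the record value is an assumption
        return Flux.just(receiverRecord.value())
            .flatMap(this::processOne)
            .flatMap(this::processTwo)
            .flatMap(this::processThree);
    }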

In short: if I use map() instead of flatMap() in processMessage, will it make any difference in performance for the KafkaReceiver?

To explain: when consuming a stream of data with KafkaReceiver we are already using a Flux, and in processKafkaRecord() the call goes through flatMap(), so each individual record should already be processed asynchronously.

Once we reach processMessage(), it is processing a single record. If processOne, processTwo and processThree have to run synchronously for each individual event, does it make sense to use flatMap() instead of map()?

Since flatMap() was already called in processKafkaRecord(), the inner method will already run asynchronously for each event. Would map() make more sense if each step in processMessage() has to happen synchronously? Or am I wrong, and we should use flatMap() even in the inner method for performance?

arqam

1 Answer

It really depends on your processing logic, but it looks like you are mixing up concurrency, parallelism and asynchronous/non-blocking execution. Asynchronous execution doesn't mean fire and forget; it is about non-blocking execution. The logic can still be sequential.
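To illustrate that non-blocking logic can still be sequential, here is a minimal sketch of a per-record chain like the one in the question. The `step` helper and its delay are hypothetical stand-ins for processOne/processTwo/processThree, assuming each of them returns a Mono.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class SequentialAsyncSketch {

    // Hypothetical async step: appends its name after a short non-blocking delay.
    static Mono<String> step(String input, String name) {
        return Mono.just(input + "|" + name).delayElement(Duration.ofMillis(20));
    }

    // Each flatMap subscribes to the next step only after the previous one has emitted,
    // so the three steps run one after another for this record without blocking a thread.
    static Mono<String> processMessage(String value) {
        return Mono.just(value)
                .flatMap(v -> step(v, "one"))
                .flatMap(v -> step(v, "two"))
                .flatMap(v -> step(v, "three"));
    }

    public static void main(String[] args) {
        // block() is used only to print the result in this demo.
        System.out.println(processMessage("rec").block()); // prints rec|one|two|three
    }
}
```

For a single record, a chain of flatMap calls is therefore already sequential; the concurrency question only arises where many records are flattened together.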

map vs flatMap

  • use flatMap to execute async/reactive logic that returns a Mono or Flux, such as HTTP requests, database reads/writes and other I/O-bound operations.
  • use map to execute synchronous logic such as object mapping.
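A small sketch of the two rules above. The `normalize` and `save` methods are hypothetical; `save` only simulates an I/O call by returning a Mono.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapVsFlatMapSketch {

    // Synchronous in-memory transformation: a plain function T -> R, so map fits.
    static String normalize(String raw) {
        return raw.trim().toUpperCase();
    }

    // Stand-in for an I/O call (HTTP, database): it returns a publisher, so flatMap fits.
    static Mono<String> save(String value) {
        return Mono.fromCallable(() -> "saved:" + value);
    }

    static List<String> run() {
        return Flux.just(" a ", " b ")
                .map(MapVsFlatMapSketch::normalize)  // still a Flux<String>
                .flatMap(MapVsFlatMapSketch::save)   // flattens each Mono<String> into the Flux<String>
                .collectList()
                .block();                            // demo only
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Using map with `save` would compile, but it would produce a Flux<Mono<String>> whose inner Monos are never subscribed, so the simulated I/O would not run.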

Concurrency

By default, flatMap subscribes to and processes up to Queues.SMALL_BUFFER_SIZE (256 by default) inner sequences concurrently.

You can limit concurrency with flatMap(item -> process(item), concurrency), or use the concatMap operator if you want to process items sequentially. See flatMap(..., int concurrency, int prefetch) for details.

There are different "flavors" of flatMap. If you need sequential processing, use concatMap, which is basically flatMap with concurrency = 1.
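The difference can be observed with a minimal sketch in which earlier items take longer to finish. The `process` method and its delays are hypothetical and chosen only to make any reordering visible.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapConcurrencySketch {

    // Hypothetical async task: item 1 takes 300 ms, item 2 takes 200 ms, item 3 takes 100 ms.
    static Mono<Integer> process(int i) {
        return Mono.just(i).delayElement(Duration.ofMillis(400L - 100L * i));
    }

    // Up to `concurrency` inner publishers are subscribed at the same time.
    static List<Integer> withFlatMap(int concurrency) {
        return Flux.range(1, 3)
                .flatMap(FlatMapConcurrencySketch::process, concurrency)
                .collectList()
                .block(); // demo only
    }

    // The next inner publisher is subscribed only after the previous one completes.
    static List<Integer> withConcatMap() {
        return Flux.range(1, 3)
                .concatMap(FlatMapConcurrencySketch::process)
                .collectList()
                .block(); // demo only
    }

    public static void main(String[] args) {
        System.out.println("flatMap(3): " + withFlatMap(3)); // results arrive in completion order
        System.out.println("flatMap(1): " + withFlatMap(1)); // source order
        System.out.println("concatMap:  " + withConcatMap()); // source order
    }
}
```

With a concurrency of 3 the faster items typically finish first, so the output order differs from the source order. With concatMap, or a concurrency of 1, the source order is kept, at the cost of waiting for each item in turn.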

Kafka ordered vs unordered processing

Depending on the use case, there are several options.

Ordered message processing

Use this when message order is important and messages must be processed in the same sequence in which the producer sent them. Kafka guarantees message order per partition.

In Reactor Kafka you can do this by grouping records per partition and then processing each group sequentially:

kafkaReceiver.receive()
        .groupBy(message -> message.receiverOffset().topicPartition())
        .flatMap(partitions -> partitions.concatMap(this::process));
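The same groupBy/concatMap pattern can be tried without a broker. The sketch below replaces ReceiverRecord with a hypothetical `Msg` class holding only a partition and an offset, and uses a made-up `process` method whose delays would reorder messages if ordering were not enforced.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerPartitionOrderSketch {

    // Simplified stand-in for a Kafka record: only partition and offset.
    static final class Msg {
        final int partition;
        final long offset;
        Msg(int partition, long offset) { this.partition = partition; this.offset = offset; }
        @Override public String toString() { return "p" + partition + "@" + offset; }
    }

    // Hypothetical async handler: earlier offsets are slower (60, 40, 20 ms).
    static Mono<Msg> process(Msg m) {
        return Mono.just(m).delayElement(Duration.ofMillis(60 - 20 * m.offset));
    }

    static List<Msg> run() {
        return Flux.just(new Msg(0, 0), new Msg(1, 0), new Msg(0, 1),
                         new Msg(1, 1), new Msg(0, 2), new Msg(1, 2))
                .groupBy(m -> m.partition)                      // one group per partition
                .flatMap(partition -> partition
                        .concatMap(PerPartitionOrderSketch::process)) // sequential within a group
                .collectList()
                .block(); // demo only
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Offsets stay in increasing order within each partition, while messages from different partitions may interleave. Note that the number of groups should stay below the concurrency of the flatMap that consumes them, otherwise groupBy can stall.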

Unordered message processing

If the sequence is not important and messages can be processed in any order, we can increase throughput by processing multiple messages in parallel.

kafkaReceiver.receive()
        .flatMap(message -> process(message), concurrency);

Unordered message processing supports much higher throughput on a small number of partitions. With ordered message processing, you need to increase the number of partitions to increase throughput.

Alex