
I have a reactive Kafka application that reads data from a topic, transforms each message, and writes it to another topic. The topic has multiple partitions, so I create multiple consumers to read from it in parallel, each running on a different thread. But it looks like the Kafka send runs on the same thread even though it is called from different consumers. I tested this by logging the thread name to understand the thread workflow: the receive thread name is different for each consumer, but on send [kafkaProducerTemplate.send] the thread name [Thread name: producer-1] is the same for all consumers. I don't understand how that works; I would expect it to be different for each consumer on send as well. Can someone help me understand how this works?

@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
            .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
        KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}


public void run(String... args) {
    // One subscription per partition so records are consumed in parallel
    for (int i = 0; i < topicPartitionsCount; i++) {
        readWrite(destinationTopic).subscribe();
    }
}


public Flux<String> readWrite(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
                .doOnNext(consumerRecord -> sendToKafka(consumerRecord, destTopic))
                .map(ConsumerRecord::value)
                .onErrorContinue((exception,errorConsumer)->{
                    log.error("Error while consuming : {}", exception.getMessage());
                });
    }

public void sendToKafka(ConsumerRecord<String, String> consumerRecord, String destTopic) {
    kafkaProducerTemplate.send(destTopic, consumerRecord.key(), transformRecord(consumerRecord))
            .doOnNext(senderResult -> log.info("Record sent from partition {} in thread {}",
                    consumerRecord.partition(), Thread.currentThread().getName()))
            .doOnSuccess(senderResult ->
                    log.debug("Sent record with key {} offset : {}",
                            consumerRecord.key(), senderResult.recordMetadata().offset()))
            .doOnError(exception ->
                    log.error("Error while sending message to destination topic : {}", exception.getMessage()))
            .subscribe();
}
perplexedDev

1 Answer


All sends for a producer are run on a single-threaded Scheduler (via .publishOn()).

See DefaultKafkaSender.doSend().

You should create a sender for each consumer.
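
The effect can be illustrated with plain JDK executors (this is only a sketch of the threading model, not reactor-kafka's actual implementation): a single-thread executor named `producer-1` plays the role of the one sender scheduler, and several "consumer" threads all hand their sends to it, so every send lands on the same thread no matter which consumer submitted it.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleSchedulerDemo {

    // Runs three "consumers" that all submit sends to one shared single-thread
    // "sender" scheduler; returns the set of thread names the sends ran on.
    static Set<String> runDemo() throws InterruptedException {
        // One shared sender scheduler, analogous to the single producer-1 thread.
        ExecutorService sharedSender = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "producer-1"));
        Set<String> sendThreads = new LinkedHashSet<>();

        List<ExecutorService> consumers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // Each consumer has its own thread (like each receive Flux)...
            ExecutorService consumer = Executors.newSingleThreadExecutor(
                    r -> new Thread(r, "consumer-" + i));
            consumers.add(consumer);
            // ...but every send is published onto the one shared scheduler.
            consumer.submit(() -> sharedSender.submit(() ->
                    sendThreads.add(Thread.currentThread().getName())));
        }

        for (ExecutorService c : consumers) {
            c.shutdown();
            c.awaitTermination(5, TimeUnit.SECONDS);
        }
        sharedSender.shutdown();
        sharedSender.awaitTermination(5, TimeUnit.SECONDS);
        return sendThreads;
    }

    public static void main(String[] args) throws InterruptedException {
        // All sends ran on the same thread, regardless of the submitting consumer.
        System.out.println(runDemo()); // [producer-1]
    }
}
```

Creating a sender per consumer corresponds to giving each consumer its own single-thread executor, which is why the send thread names would then differ.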

Gary Russell
  • Can you help me understand the workflow. Each consumer is calling kafkaProducerTemplate.send(), wouldn't that create a different sender instance? – perplexedDev Nov 09 '21 at 20:41
  • No; there is a single sender associated with each template. You need to create either a separate template for each consumer or create a pool and check out an instance and return it to the pool when you are finished with it. – Gary Russell Nov 09 '21 at 20:56
    Thanks for the explanation. Reiterating my understanding: in my code, the ReactiveKafkaConsumerTemplate ends up with multiple consumer instances since each .receive() subscription creates a different receiver, and each receiver is subscribed to different partitions. But only a single sender is associated with the ReactiveKafkaProducerTemplate, so a separate template has to be created for each consumer. I expected .send() to work similarly to .receive(), but it looks like I was wrong. Can you please confirm this is correct? – perplexedDev Nov 09 '21 at 21:06
  • The templates are very lightweight wrappers around the reactive sender and receiver; the actual functionality is provided by that library (reactor-kafka). Yes, it does seem a bit asymmetric but if you think about it, `receive()` is a long running operation whereas `send()` is typically very short (async). For most applications a single producer is all that is needed; see the javadocs for `KafkaProducer` `...a single producer instance across threads will generally be faster...`. If you are using transactions, you must use separate producers because only one transaction can be active at a time. – Gary Russell Nov 09 '21 at 21:38
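
The check-out/return pool mentioned above could be sketched with a `BlockingQueue` (this is a minimal illustration, not an established library class; in practice `T` would be `ReactiveKafkaProducerTemplate`):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Minimal check-out/return pool of senders.
public class SenderPool<T> {

    private final BlockingQueue<T> pool;

    public SenderPool(int size, Supplier<T> factory) {
        pool = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            pool.add(factory.get());
        }
    }

    // Blocks until a sender is free, so at most `size` senders are in use at once.
    public T checkOut() throws InterruptedException {
        return pool.take();
    }

    // Return the sender for reuse once the send completes.
    public void checkIn(T sender) {
        pool.offer(sender);
    }
}
```

A consumer would check out a sender before calling `send(...)` and check it back in from a completion callback, so each in-flight send owns a distinct producer (and hence a distinct send thread).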