9

I'm trying to implement reactive kafka consumer in my Spring boot application and I'm looking at these examples: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java

and it looks like there is no support for Spring in reactive kafka yet

I understand how kafka listeners work in non-reactive kafka API in Spring: simplest solution is to configure beans for ConcurrentKafkaListenerContainerFactory and ConsumerFactory, then use @KafkaListener annotation and voila

But I'm not sure how to properly use reactive kafka in Spring right now.

Basically I need a listener for topic. Should I create some kind of loop or scheduler of my own? Or maybe I'm missing something. Can anyone share their knowledge and best practices?

bobryash
  • 129
  • 1
  • 8
  • 1
    Did you see a `ReactiveKafkaConsumerTemplate` in Spring for Apache Kafka project: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java ? – Artem Bilan Dec 01 '20 at 19:42
  • 1
    Reactive support for `@KafkaListener` is on the road map for next year. Right now, all we have is the lightweight wrapper that Artem mentioned. That said, managing partition offsets for a reactive (or any async) consumer is particularly difficult. – Gary Russell Dec 01 '20 at 20:15
  • @ArtemBilan thanks for the link, will look into that – bobryash Dec 02 '20 at 19:15
  • @GaryRussell has the reactive support for `@kafkaListener` arrived yet? I am unable to find the same. – mohitsahu May 25 '22 at 20:54
  • No, sorry. Contributions are always welcome. – Gary Russell May 25 '22 at 21:59

1 Answers1

0

I don't have a ready solution yet but i'm trying this (Kotlin code, Spring Boot). Someone published part of this code snippet here https://github.com/reactor/reactor-kafka/issues/100

@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        .doOnNext { record ->
            val myEvent = record.value()
            processMyEvent(myEvent).thenEmpty {
                record.receiverOffset().acknowledge()
            }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}

Look into other stack overflow questions. There is not much there, but maybe will give you some ideas

Pawel
  • 466
  • 1
  • 7
  • 20