I have the following consumer in my Spring Boot (WebFlux/R2DBC/Reactor Kafka) application:

    @EventListener(ApplicationStartedEvent::class)
    fun onMyEvent() {
        kafkaReceiver
            .receive()
            .doOnNext { record ->
                val myEvent = record.value()
                myService.deleteSomethingFromDbById(myEvent.myId)
                .thenEmpty {
                    record.receiverOffset().acknowledge()
                }.subscribe()
            }
            .subscribe()
    }

I would like to add transaction synchronisation for Kafka and DB transactions. After reading the docs and some Stack Overflow questions, it seems like ChainedKafkaTransactionManager would be the way to go.

But the following code won't work, because ChainedKafkaTransactionManager expects transaction managers of type PlatformTransactionManager, whereas R2dbcTransactionManager is a ReactiveTransactionManager. So the r2dbcTransactionManager parameter is not accepted:

    @Bean(name = ["chainedTransactionManager"])
    fun chainedTransactionManager(
        r2dbcTransactionManager: R2dbcTransactionManager,
        kafkaTransactionManager: KafkaTransactionManager<*, *>
    ) = ChainedKafkaTransactionManager(kafkaTransactionManager, r2dbcTransactionManager)

Is there another way to achieve this?

Pawel

1 Answer

There is no point in chaining transactions for Kafka consumers. Chaining only makes sense for publishers, i.e. for outgoing messages.

You should, however, make sure not to process the same message more than once:

    @EventListener(ApplicationStartedEvent::class)
    fun onMyEvent() {
        kafkaReceiver.receive()
            // Make sure to have unique index on (topic, partition, offset)
            // so you receive a ConstraintViolationException
            .flatMap { r ->
                val msg = ConsumedMessage(r.topic(), r.partition(), r.offset())
                consumedMessagesRepository.save(msg).thenReturn(r)
            }
            .onErrorContinue { ex, r -> log.warn("Duplicate msg") }
            .flatMap { r ->
                myService.deleteSomethingFromDbById(r.value().myId)
                    .thenReturn(r)
            }
            .flatMap { r ->
                r.receiverOffset().commit()
            }
            .subscribe()
    }
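
To make the deduplication idea above easier to see in isolation, here is a minimal sketch in plain Kotlin with no Spring or Reactor involved. Everything in it is a hypothetical stand-in: `InMemoryConsumedMessages` plays the role of the table with the unique index, `DuplicateMessageException` plays the role of the constraint violation, and `processOnce` is an illustrative helper, not a library function.

```kotlin
// Hypothetical stand-ins: none of these types come from Spring or Reactor Kafka.
data class ConsumedMessage(val topic: String, val partition: Int, val offset: Long)

class DuplicateMessageException(msg: ConsumedMessage) :
    RuntimeException("Already consumed: $msg")

// Plays the role of a table with a unique index on (topic, partition, offset).
class InMemoryConsumedMessages {
    private val seen = HashSet<ConsumedMessage>()

    fun save(msg: ConsumedMessage) {
        // HashSet.add returns false when the element is already present,
        // which mirrors a unique-constraint violation on insert.
        if (!seen.add(msg)) throw DuplicateMessageException(msg)
    }
}

// Runs the handler only the first time a given coordinate is seen.
// Returns true if the handler ran, false if the message was a duplicate.
fun processOnce(
    store: InMemoryConsumedMessages,
    msg: ConsumedMessage,
    handler: () -> Unit
): Boolean =
    try {
        store.save(msg)
        handler()
        true
    } catch (e: DuplicateMessageException) {
        false
    }

fun main() {
    val store = InMemoryConsumedMessages()
    var deletions = 0
    val first = ConsumedMessage("my-topic", 0, 42L)

    println(processOnce(store, first) { deletions++ })        // first delivery
    println(processOnce(store, first.copy()) { deletions++ }) // redelivery of the same offset
    println(deletions)                                        // how often the handler ran
}
```

In this sketch the redelivered message is skipped because its (topic, partition, offset) is already recorded. In the real flow, the insert into the consumed-messages table and the delete would presumably need to run in the same database transaction for that guarantee to survive a crash between the two steps.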
Mantas K.