I have a following consumer in my Spring Boot (WebFlux/R2DBC/Reactor Kafka) application
@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver
.receive()
.doOnNext { record ->
val myEvent = record.value()
myService.deleteSomethingFromDbById(myEvent.myId)
.thenEmpty {
record.receiverOffset().acknowledge()
}.subscribe()
}
.subscribe()
}
I would like to add transaction synchronisation for Kafka and DB transactions. After reading docs and some stakoverflow questions
- Transaction Synchronization in spring boot with Database+ kafka example
- Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction
- Transaction Synchronization in Spring Kafka
- Spring @Transactional with a transaction across multiple data sources
seems like ChainedKafkaTransactionManager
would be the way to go.
But following code won't work as ChainedKafkaTransactionManager expects transaction managers of type PlatformTransactionManager
. So parameter r2dbcTransactionManager
is not accepted.
@Bean(name = ["chainedTransactionManager"])
fun chainedTransactionManager(
r2dbcTransactionManager: R2dbcTransactionManager,
kafkaTransactionManager: KafkaTransactionManager<*, *>
) = ChainedKafkaTransactionManager(kafkaTransactionManager, r2dbcTransactionManager)
Is there another way to achieve this?