I am trying to set up exactly-once semantics using Kafka transactions with Micronaut Kafka.
I need to read records from topic A, map both the key and the value, and produce the result to topic B. I would like to implement this with exactly-once semantics using Kafka transactions.
According to the Micronaut Kafka documentation, when the @SendTo annotation is used on a method of a @KafkaListener bean, the offset commit can be forwarded to the producer transaction.
However, @SendTo only lets the method return a new record value; the forwarded record reuses the key of the original Kafka message received by the @KafkaListener bean. Is there a way to also change the key?
@KafkaListener(
    groupId = "mx-group",
    offsetReset = EARLIEST,
    offsetStrategy = SEND_TO_TRANSACTION,
    producerClientId = "prod-client-id",
    producerTransactionalId = "prod-client-tx"
)
class MyListener {
    @Topic("source.topic")
    @SendTo("target.topic")
    fun handle(
        @KafkaKey key: SourceKey,
        @MessageBody value: SourceValue
    ): TargetValue = convertValue(value)
    ...
}
If not, is there a way to forward the offset commit to a transaction of a producer bean created with the @KafkaClient annotation?
@KafkaListener(
    groupId = "my-group",
    offsetReset = EARLIEST,
    offsetStrategy = ???
)
class MyListener(private val producer: MyProducer) {
    @Topic("source.topic")
    fun handle(
        @KafkaKey key: SourceKey,
        @MessageBody value: SourceValue
    ) {
        LOGGER.info("Received record key {}, value {}", key, value)
        producer.send(convertKey(key), convertValue(value))
    }
    ...
}
@KafkaClient(
    id = "producer-id",
    transactionalId = "producer-tx",
)
interface MyProducer {
    @Topic("target.topic")
    fun send(@KafkaKey key: TargetKey, @MessageBody value: TargetValue)
}
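For context, the behaviour being asked for corresponds to the consume-transform-produce pattern of the plain Apache Kafka client API, where the consumed offsets are committed as part of the producer transaction. The following is only a rough sketch of that pattern, not Micronaut Kafka code: `forwardOnce`, `targetTopic`, `mapKey` and `mapValue` are hypothetical names, and it assumes the producer has a `transactional.id` configured with `initTransactions()` already called, and that the consumer runs with `enable.auto.commit=false`.

```kotlin
import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

// Hypothetical helper (not part of Micronaut Kafka): polls one batch, maps key
// and value of every record, and commits the consumed offsets inside the same
// producer transaction as the produced records.
// Assumptions: producer.initTransactions() was called once beforehand and the
// consumer is already subscribed with auto-commit disabled.
fun <K, V, K2, V2> forwardOnce(
    consumer: Consumer<K, V>,
    producer: Producer<K2, V2>,
    targetTopic: String,
    mapKey: (K) -> K2,
    mapValue: (V) -> V2
) {
    val records = consumer.poll(Duration.ofMillis(500))
    if (records.isEmpty()) return

    producer.beginTransaction()
    try {
        val offsets = HashMap<TopicPartition, OffsetAndMetadata>()
        for (record in records) {
            producer.send(
                ProducerRecord(targetTopic, mapKey(record.key()), mapValue(record.value()))
            )
            // The committed offset is the position of the next record to read.
            offsets[TopicPartition(record.topic(), record.partition())] =
                OffsetAndMetadata(record.offset() + 1)
        }
        // Offsets become visible only if the transaction commits.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata())
        producer.commitTransaction()
    } catch (e: Exception) {
        // Simplification: fatal errors such as a fenced producer require
        // closing the producer instead of aborting, and the consumer would
        // need to seek back to the last committed offsets before retrying.
        producer.abortTransaction()
        throw e
    }
}
```

The open question is whether Micronaut Kafka can apply the equivalent of the `sendOffsetsToTransaction` step when the record is sent through a `@KafkaClient` bean rather than through `@SendTo`.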
Any help is very welcome!
Regards, Lars