1

I have a problem implementing kafka "exactly once" semantics and Micronaut framework.

I have two servers with same code that are sending messages to a kafka broker, and I have this error:

org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId

In the documentation of kafka says:

"Zombie fencing
We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts.
The API requires that the first operation of a transactional producer should be to explicitly register its transactional.id with the Kafka cluster. When it does so, the Kafka broker checks for open transactions with the given transactional.id and completes them. It also increments an epoch associated with the transactional.id. The epoch is an internal piece of metadata stored for every transactional.id.
Once the epoch is bumped, any producers with same transactional.id and an older epoch are considered zombies and are fenced off, ie. future transactional writes from those producers are rejected."

But the transactional.id of kafka producer in Micronaut does not support unique ID, so I have to create a name with random value and to associated with the producer:

In application.yml:

kafka:
    exactly-once:
      transfer-payment-sender-transactionalid: payment-sendto
      transfer-payment-sender-suffix: ${random.shortuuid}
      transfer-payment-sender-name: ${transfer-payment-sender-transactionalid}_${transfer-payment-sender-suffix}

In the service:

@Value("${kafka.exactly-once.transfer-payment-sender-name}")
    String producer_name;

    public TransferPaymentSender(@KafkaClient(
            id="payment-transactional-client-endpoint",
            transactionalId = "${kafka.exactly-once.transfer-payment-sender-name}",
    ) Producer<String, PaymentRequest> producer) {
        this.producer = producer;
        this.producer.initTransactions();
    }

Is this correct? Is this the correct way of instantiate Kafka producer in Micronaut for implementing exactly once semantics?

This post says that Spring handle this under the hood.

marc_s
  • 732,580
  • 175
  • 1,330
  • 1,459
Agustin
  • 41
  • 4

0 Answers0