
I have a simple Flink application which consumes alerts from one Kafka topic and publishes them to another Kafka topic.

I have set the exactly-once guarantee on the data sink. But my consumer does not receive that data when I set this guarantee on the sink.

Here are the versions - JVM: JDK 11, Flink: 1.15.2

Producer -

    KafkaSink<String> mySink = KafkaSink.<String>builder()
            .setBootstrapServers(brokers)
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("Msg_Offset_MGMG_Tx_2")
            .setKafkaProducerConfig(producerConfig)
            .setRecordSerializer(new OffsetSerializer("MySink"))
            .build();

I tried passing the properties below to the producer -

    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 600000);
    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

Consumer -

    Properties prop = new Properties();
    prop.put("commit.offsets.on.checkpoint", "true");
    prop.put("enable.auto.commit", "false");
    prop.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    KafkaSource<Long> dataSource = KafkaSource.<Long>builder()
            .setBootstrapServers(brokers)
            .setTopics("Alerts")
            .setGroupId("alertss-1")
            .setProperties(prop)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setClientIdPrefix("Alerts_01_temp_2")
            .setDeserializer(myCustomDeserializer) // custom KafkaRecordDeserializationSchema
            .build();

However, when I run this, my sink fails to push any record even though there are messages present in the data source. But when I remove the lines below from the data sink, it works -

    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("Msg_Offset_MGMG_Tx_2")

Can anyone please tell me what I am doing wrong?

  • There are two things I can see as an issue: 1. Have you enabled checkpointing? If not, that's a requirement for exactly-once to work. 2. You need to check the value of the `transaction.max.timeout.ms` setting at your Kafka broker and pass in a producer property `transaction.timeout.ms` which has a lower value than `transaction.max.timeout.ms`. – Martijn Visser Jun 20 '23 at 06:57
  • Yes, we have checkpointing enabled. Added the settings you mentioned. It's working. We also added `transaction.state.log.replication.factor=1` and `transaction.state.log.min.isr=1` to our Kafka broker, which enabled transaction support on the broker. It's a staging-env broker, that's why we have set the replication factor and min ISR to 1. – Chaitanya Kulkarni Jun 20 '23 at 10:00
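
    Putting the comments together, a minimal sketch of the fix (the 600000 ms value, the `env` variable, and the checkpoint interval are illustrative assumptions, not from the original post). Flink's Kafka sink defaults `transaction.timeout.ms` to 1 hour, which exceeds the broker's default `transaction.max.timeout.ms` of 15 minutes, so the transactional producer fails to initialize and the sink publishes nothing:

    ```java
    // Cap the producer's transaction timeout below the broker's
    // transaction.max.timeout.ms (default 900000 ms / 15 min).
    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000"); // 10 min

    // EXACTLY_ONCE delivery also requires checkpointing to be enabled,
    // since transactions are committed on checkpoint completion.
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
    ```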

0 Answers