I have a simple Flink application that consumes alerts from one Kafka topic and publishes them to another Kafka topic.
I have set the exactly-once delivery guarantee on the data sink. But my consumer does not consume the data when that guarantee is set on the sink.
Here are the versions: JDK 11, Flink 1.15.2.
Producer -
KafkaSink<String> mySink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("Msg_Offset_MGMG_Tx_2")
        .setKafkaProducerConfig(producerConfig)
        .setRecordSerializer(new OffsetSerializer("MySink"))
        .build();
I tried passing the properties below to the producer:
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 600000);
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
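I also tried setting transaction.timeout.ms explicitly, since a transactional producer's timeout must not exceed the broker's transaction.max.timeout.ms (900000 ms, i.e. 15 minutes, by default). This is just a sketch: the 10-minute value is illustrative, and I use literal string keys here instead of the ProducerConfig constants so the snippet stands alone.

```java
import java.util.Properties;

public class ProducerProps {
    // Sketch of my producer config with an explicit transaction timeout.
    // 600000 (10 min) is illustrative; it must stay at or below the broker's
    // transaction.max.timeout.ms (900000 = 15 min by default).
    static Properties producerConfig() {
        Properties producerConfig = new Properties();
        producerConfig.put("max.block.ms", "600000");
        producerConfig.put("enable.idempotence", "true");
        producerConfig.put("transaction.timeout.ms", "600000");
        return producerConfig;
    }
}
```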
Consumer -
Properties prop = new Properties();
prop.put("commit.offsets.on.checkpoint", "true");
prop.put("enable.auto.commit", "false");
prop.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaSource<Long> dataSource = KafkaSource.<Long>builder()
        .setBootstrapServers(brokers)
        .setTopics("Alerts")
        .setGroupId("alertss-1")
        .setProperties(prop)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setClientIdPrefix("Alerts_01_temp_2")
        .setDeserializer(myCustomDeserializer) // a KafkaRecordDeserializationSchema<Long>
        .build();
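For context: as I understand it, an EXACTLY_ONCE KafkaSink only commits its Kafka transactions when a checkpoint completes, so a read_committed consumer sees nothing between checkpoints. Checkpointing is enabled in my job roughly like this (a sketch; the 10-second interval is illustrative, not my exact setting):

```java
// Sketch: checkpointing must be enabled for the transactional sink to commit.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // transactions are committed on checkpoint completion
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```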
However, when I run this, the sink fails to publish any records even though there are messages present in the data source. But when I remove the lines below from the data sink, it works:
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("Msg_Offset_MGMG_Tx_2")
Can anyone please tell me what I am doing wrong?