
In my Kafka consumer I have turned off auto commit. When processing fails, for example on three invalid messages, the manual ack is not sent and the lag increases to three.

TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
orders  0          35              38              3

After that, if a new valid message arrives and is processed successfully, it is manually acked, and the consumer group then looks like this:

TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
orders  0          39              39              0

Why does the consumer set CURRENT-OFFSET to 39 when the messages at offsets 36, 37, and 38 were not successfully processed, and why are they never read again by the same consumer?

Can anyone please explain this behavior? Thanks in advance!

1 Answer


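In Kafka, consumers don't ack every single message. Instead they commit a single offset that marks how far they have processed; by convention this is the offset of the next message to read, that is, the last processed offset plus one.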

For example, if you commit offset 15, it implicitly means you've processed all messages before it, from 0 to 14. Also, committing 15 overwrites any previous commit, so there is no record of whether you committed 13 or 14 before.
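To make the numbers from the question concrete, here is a minimal sketch of that bookkeeping in plain Python. It is a toy model, not the Kafka client API: `PartitionState` and its methods are hypothetical names, and it assumes the usual convention that the committed offset is the next offset to read.

```python
# Toy model of one partition's offsets; not the Kafka client API.

class PartitionState:
    def __init__(self, log_end_offset, committed):
        self.log_end_offset = log_end_offset  # offset the next produced record will get
        self.committed = committed            # offset the group would resume from

    def produce(self):
        """Append one record and return the offset it was given."""
        offset = self.log_end_offset
        self.log_end_offset += 1
        return offset

    def commit(self, next_offset):
        # A commit stores a single number and overwrites the previous one.
        self.committed = next_offset

    def lag(self):
        return self.log_end_offset - self.committed


state = PartitionState(log_end_offset=35, committed=35)

# Three invalid records arrive; processing fails, so nothing is committed.
bad_offsets = [state.produce() for _ in range(3)]
lag_after_failures = state.lag()

# One valid record arrives and is processed; committing "its offset + 1"
# moves the committed offset past the three failed records as well.
good_offset = state.produce()
state.commit(good_offset + 1)
lag_after_success = state.lag()

print(bad_offsets, lag_after_failures)
print(good_offset, state.committed, lag_after_success)
```

Because the commit is one number per partition, there is no way to express "38 is done but the three before it are not"; committing past them marks them as consumed.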

I suggest you read the Consumer Position section in the docs, which goes over this concept.

Regarding reprocessing, you have a few options. When you hit a processing failure, you can try to reprocess the message before polling for more records. Another option is to skip it as invalid and carry on (what you are currently doing).
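As a rough illustration of the first option, the sketch below retries a record a few times before giving up and skipping it. Everything here is hypothetical: `process_with_retry`, `handle`, the choice of `ValueError` as the failure signal, and the sample batch are assumptions for the example, and no Kafka client is involved.

```python
# Hypothetical sketch: retry a record a few times before skipping it.

def process_with_retry(record, handler, max_attempts=3):
    """Return True if handler(record) succeeds within max_attempts tries."""
    for _ in range(max_attempts):
        try:
            handler(record)
            return True
        except ValueError:
            # Assumption: ValueError means "processing failed".
            # A real handler may raise other exception types.
            continue
    return False


def handle(record):
    # Stand-in for real processing: only digit strings are valid.
    if not record.isdigit():
        raise ValueError("invalid record: " + record)


batch = ["10", "oops", "42"]
skipped = [r for r in batch if not process_with_retry(r, handle)]
print(skipped)
```

Retrying only helps with transient failures. A record that is permanently invalid, as in the question, will fail every attempt and still has to be skipped or routed elsewhere.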

Alternatively, you could ensure the data is good by running a Streams job that pipes valid messages into a checked topic and forwards bad messages to a dead-letter queue (DLQ). Then consume from the checked topic, where you know you only have good messages. See "validation for kafka topic messages".
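The routing idea can be sketched without Kafka at all. In the example below, plain lists stand in for the checked topic and the DLQ, and the validation rule (JSON object with an `id` field) is an assumption made up for illustration; a real Streams job would apply whatever rule fits your messages.

```python
import json

# Hypothetical sketch: lists stand in for the "checked" topic and the DLQ.

def route(raw_messages):
    """Split raw messages into (checked, dead_letter) by a validation rule."""
    checked, dead_letter = [], []
    for raw in raw_messages:
        try:
            order = json.loads(raw)
            # Assumed rule: a valid order is a JSON object with an "id" field.
            if not isinstance(order, dict) or "id" not in order:
                raise ValueError("missing id")
            checked.append(order)
        except ValueError as err:
            # json.JSONDecodeError is a subclass of ValueError, so malformed
            # JSON and rule violations both end up in the dead-letter list.
            dead_letter.append({"raw": raw, "error": str(err)})
    return checked, dead_letter


checked, dead_letter = route(['{"id": 1}', 'not json', '{"qty": 2}'])
print(checked)
print([entry["raw"] for entry in dead_letter])
```

Keeping the raw payload and the error together in the dead-letter entry makes it possible to inspect or replay bad messages later.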

Mickael Maison
  • Thanks Mickael, this is very useful. I have a batch consumer. From my understanding, after a couple of bad messages that were not acked, when a valid message comes through, the consumer should try to reprocess all 3 messages (the 2 unacked bad messages and the latest valid message). But when I checked the batch, it only had the latest message. The consumer should always read from CURRENT-OFFSET (last committed), right? Why is it reading from LOG-END-OFFSET (latest offset)? – Shanmugarajan Vijayakumar Dec 19 '18 at 00:33
  • The committed position is only used when consumers restart. While running, the consumer advances its own in-memory position every time poll is called, whether or not anything was committed. If you want to return to a previous offset, you can use the various seek methods to do so. See http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html – Mickael Maison Dec 19 '18 at 09:45
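
The difference between the in-memory position and the committed offset can be sketched with a toy consumer in plain Python. `ToyConsumer` is hypothetical; it borrows the method names `poll`, `seek`, and `commit` for readability, but the signatures and behavior are simplified and are not those of the real `KafkaConsumer`.

```python
# Toy consumer: position advances on every poll, committed only on commit.

class ToyConsumer:
    def __init__(self, log, committed):
        self.log = log                # list index plays the role of the offset
        self.committed = committed    # where a restarted consumer would begin
        self.position = committed     # where the next poll reads from

    def poll(self, max_records=10):
        batch = self.log[self.position:self.position + max_records]
        self.position += len(batch)   # advances even if processing later fails
        return batch

    def seek(self, offset):
        self.position = offset        # rewind (or skip) explicitly

    def commit(self):
        self.committed = self.position


log = ["bad-0", "bad-1", "bad-2"]
consumer = ToyConsumer(log, committed=0)

first = consumer.poll()               # the three bad records; nothing committed
log.append("good-3")
second = consumer.poll()              # only the new record: position is already 3

consumer.seek(consumer.committed)     # explicit rewind to the last commit
replay = consumer.poll()              # now the bad records are delivered again

print(first, second, replay)
```

This mirrors what the batch consumer in the question saw: without a seek, the second poll continues from the in-memory position rather than from the last committed offset.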