
I was running some tests against an old topic when I noticed some strange behaviour. Reading Kafka's log, I found this "Removed 8 expired offsets" message:

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

I have two questions:

  1. How does this offset expiration work for a consumer group?

  2. Can these expired offsets explain the following behaviour: my consumer did not poll anything when it had auto.offset.reset = latest, but it polled from the last committed offset when it had auto.offset.reset = earliest?

Gray
Enzo

2 Answers


Update

Since Apache Kafka 2.1, offsets won't be deleted as long as the consumer group is active, regardless of whether the consumers commit offsets or not. That is, the offsets.retention.minutes clock only starts ticking when the group becomes empty (in older releases, the clock started ticking as soon as the commit happened).

Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
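To make the post-2.1 rule concrete, here is a minimal sketch of it as a pure function. It is a simplified model for a group that uses group management, not the broker's actual code; `GroupState`, `offsets_expired_kip211` and the 7-day retention are hypothetical names and example values chosen for illustration. The point it shows: while the group has members the clock is not running at all, and once the group is empty the clock starts from the moment the last member left.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GroupState:
    """Hypothetical, simplified view of a consumer group on the broker."""
    member_count: int
    empty_since_ms: Optional[int] = None  # when the group last became empty


def offsets_expired_kip211(group: GroupState, now_ms: int, retention_ms: int) -> bool:
    """Simplified Kafka 2.1+ (KIP-211) expiration rule for a group using group management."""
    if group.member_count > 0 or group.empty_since_ms is None:
        # Active group: the retention clock is not running, however old the last commit is.
        return False
    # Empty group: the clock started when the last member left the group.
    return now_ms - group.empty_since_ms >= retention_ms


DAY_MS = 24 * 60 * 60 * 1000
RETENTION_MS = 7 * DAY_MS  # example value only

print(offsets_expired_kip211(GroupState(member_count=3), 30 * DAY_MS, RETENTION_MS))              # -> False
print(offsets_expired_kip211(GroupState(0, empty_since_ms=0), 6 * DAY_MS, RETENTION_MS))          # -> False
print(offsets_expired_kip211(GroupState(0, empty_since_ms=0), 8 * DAY_MS, RETENTION_MS))          # -> True
```

Note that the time of the last commit does not appear in the function at all; that is exactly the change KIP-211 made for active groups.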

Original Answer

By default, Kafka deletes committed offsets after a configurable period of time; see the broker parameter offsets.retention.minutes. That is, if a consumer group is inactive (i.e., does not commit any offsets) for this amount of time, its offsets get deleted. Thus, even if the consumer is running, if it does not commit offsets for some partitions, those offsets are subject to offsets.retention.minutes.
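The older (pre-2.1) behaviour is per partition and keyed on the last commit time, which is why a running consumer can still lose offsets for partitions it has stopped committing. The sketch below is a simplified model under that assumption; the function name, the topic-partition strings and the 24-hour retention are hypothetical example values. On a real broker this check runs periodically (see offsets.retention.check.interval.ms), and each cleanup pass is what logs a line like "Removed 8 expired offsets".

```python
from typing import Dict, Set


def expired_partitions_pre21(last_commit_ms: Dict[str, int], now_ms: int, retention_ms: int) -> Set[str]:
    """Simplified pre-2.1 rule: each partition's committed offset expires once
    retention_ms has passed since its last commit, whether or not the consumer
    is still running."""
    return {tp for tp, committed_at in last_commit_ms.items() if now_ms - committed_at >= retention_ms}


HOUR_MS = 60 * 60 * 1000
RETENTION_MS = 24 * HOUR_MS  # example value (1440 minutes)

# Hypothetical partitions: my-topic-0 was committed recently, my-topic-1 has been idle.
commits = {"my-topic-0": 30 * HOUR_MS, "my-topic-1": 2 * HOUR_MS}
print(sorted(expired_partitions_pre21(commits, now_ms=31 * HOUR_MS, retention_ms=RETENTION_MS)))
# -> ['my-topic-1']
```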

If you start a consumer, the following happens:

  1. look for a (valid) committed offset (for the consumer group)
    1. if valid offset is found, resume from there
    2. if no valid offset is found, reset offset according to auto.offset.reset parameter
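The start-up steps above can be sketched as a small decision function. This is a simplified model, not the consumer's implementation: it treats a committed offset as "valid" if it still lies between the partition's log-start and log-end offsets, and `NoValidOffsetError` is a hypothetical stand-in for the error a real consumer raises when auto.offset.reset = none.

```python
from typing import Optional


class NoValidOffsetError(Exception):
    """Hypothetical stand-in for the consumer's error when auto.offset.reset=none."""


def resolve_start_offset(committed: Optional[int], log_start: int, log_end: int,
                         auto_offset_reset: str = "latest") -> int:
    """Return the offset a consumer would start fetching from (simplified)."""
    # Step 1: a committed offset that is still inside the log is used as-is.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Step 2: otherwise fall back to auto.offset.reset.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    if auto_offset_reset == "none":
        raise NoValidOffsetError("no valid committed offset and auto.offset.reset=none")
    raise ValueError(f"unknown auto.offset.reset value: {auto_offset_reset!r}")


print(resolve_start_offset(None, 0, 120, "latest"))    # -> 120
print(resolve_start_offset(None, 0, 120, "earliest"))  # -> 0
print(resolve_start_offset(57, 0, 120, "latest"))      # -> 57
```

When a committed offset survives, auto.offset.reset is never consulted; it only matters once the committed offset is gone or out of range.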

Thus, if your offsets got deleted and auto.offset.reset = latest, your consumer will not poll anything until new data is added to the topic. If auto.offset.reset = earliest, it should consume the whole topic.
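The symptom from the question can be reproduced with a toy in-memory partition. `PartitionLog` is a hypothetical illustration, not a Kafka API, and it assumes the log-start offset is 0. After the committed offset is gone, "latest" places the position at the log end, so fetches return nothing until a new record is appended, while "earliest" replays everything still in the log.

```python
from typing import List


class PartitionLog:
    """Tiny in-memory stand-in for one topic partition (hypothetical, for illustration)."""

    def __init__(self, records: List[str]):
        self.records = list(records)

    @property
    def end_offset(self) -> int:
        # Offset that the next appended record will receive.
        return len(self.records)

    def append(self, record: str) -> None:
        self.records.append(record)

    def fetch(self, position: int) -> List[str]:
        # Return every record at or after `position`.
        return self.records[position:]


log = PartitionLog(["a", "b", "c"])

# The committed offset was deleted, so the position comes from auto.offset.reset.
latest_position = log.end_offset  # auto.offset.reset = latest
earliest_position = 0             # auto.offset.reset = earliest (log-start offset assumed to be 0)

print(log.fetch(latest_position))    # -> []
print(log.fetch(earliest_position))  # -> ['a', 'b', 'c']

log.append("d")
print(log.fetch(latest_position))    # -> ['d']
```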

See this JIRA for a discussion about this https://issues.apache.org/jira/browse/KAFKA-3806 and https://issues.apache.org/jira/browse/KAFKA-4682

Matthias J. Sax
  • Thanks. So, even with an active consumer, if there isn't any new offset commit within this retention time, the offsets would be deleted? – Enzo Aug 25 '16 at 14:09
  • As long as the group is active, offsets should not get deleted. – Matthias J. Sax Aug 26 '16 at 17:56
  • This is not necessarily true. If you set enable.auto.commit=false and there's no new data (no commit) - the commit will expire. – b2zw2a Jan 31 '17 at 17:16
  • OK, fair enough. But it would be a very unusual pattern for a consumer group not to commit for longer than the offsets.retention.minutes setting... – Matthias J. Sax Jan 31 '17 at 22:50
  • Currently this affects Kafka Streams, which sets `enable.auto.commit=false` and has `auto.offset.reset=earliest`. By default, if a Kafka Streams application does not process data for 24 hours, and then it restarts, its offsets are deleted and it reprocesses data from the beginning. – Dmitry Minkovsky Jul 24 '17 at 20:21
  • @DmitryMinkovsky Yes. That is correct. There is a corresponding JIRA: https://issues.apache.org/jira/browse/KAFKA-5510 – Matthias J. Sax Jul 26 '17 at 12:45
  • And if a topic was idle. – Ravindranath Akila Jul 06 '18 at 09:31
  • I have a similar question: when I restart the Kafka Streams app, the internal repartition topic has incoming traffic, but no messages are produced to the related changelog topic. I have to switch to a new groupId and create new internal topics to make it work. Does anyone know the reason? Thanks a lot! – thinktwice Sep 04 '19 at 20:17
  • Restarting a Kafka Streams application should not have any impact. It should just continue where it left off. – Matthias J. Sax Sep 05 '19 at 03:16
  • This seems to happen for a regular Kafka consumer too, when it is configured to use manual commits and no data is being ingested into the assigned topic/partition. Can someone confirm? – V1666 Mar 05 '22 at 22:54
  • It depends on the version. As pointed out in the answer, if you use Kafka 2.1 or newer, offsets won't expire as long as a consumer group is active, even if there is no input data and no commits happen; the offset-retention clock only starts ticking when the consumer group (i.e., all consumers of the group) goes offline. – Matthias J. Sax Mar 07 '22 at 18:17

Check my answer here. Don't forget about log segment rolling: it affects when offset files are removed.

yuranos