12

We are facing a problem with Kafka. We have a topic with only one partition and only one consumer in a consumer group. The consumer was stopped for a month. In the meantime, producers kept sending messages to the topic.

When we start the consumer again, it is not able to consume messages. I assume that the previously committed offset has been lost, so the consumer has no way to find its starting point when it wakes up.

When we stop and start the consumer again, it can pick up new messages, but all the messages that were sent previously never get consumed.

Has the offset been corrupted? Does the retention period for Kafka's internal offsets topic mean that the last committed offsets have been removed?

Gray
Joey Trang
  • Possible duplicate of [How does an offset expire for an Apache Kafka consumer group?](http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group) – Matthias J. Sax Mar 02 '17 at 04:56
  • Hi @matthias-j-sax, presuming we are facing a problem with offset management, what is the best practice for maintaining offsets? Is it a good idea to maintain offsets outside the Kafka cluster, such as in an RDBMS, so that if offsets are lost we can restart the consumer from the offset stored in the DB? – Joey Trang Mar 02 '17 at 07:10
  • 1
    The simplest thing is to change the broker setting and increase the offset retention time. If you set it to INT_MAX you get multiple years... That should do. If you really need a longer retention time, you can store offsets anywhere you want, like a DB, too. But it is, of course, more code to maintain in your own code base. – Matthias J. Sax Mar 02 '17 at 07:20
  • I am a bit confused. I have set offset.retention.minutes to 10 minutes, and I was expecting the offsets of some particular groups to be removed because there is no active consumer group at present, but when I monitor the log no offsets have expired: `[2017-03-02 06:57:24,907] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)` @MatthiasJ.Sax – Joey Trang Mar 02 '17 at 07:23
  • Not sure... how long did you wait? The retention time is a guaranteed lower bound, and there is no strict enforcement to delete the offsets immediately -- there is also a cleanup interval that you can configure. – Matthias J. Sax Mar 02 '17 at 18:31
  • Actually, I stopped the consumer group for 12 hours, started the producer to send another set of messages, then started the consumer group again. Surprisingly, the consumer is still able to consume the messages. I was expecting the consumer to get only the messages sent after the consumer group started, as the last committed offset should have expired and been removed after 12 hours. @MatthiasJ.Sax – Joey Trang Mar 03 '17 at 02:18
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/137100/discussion-between-van-and-matthias-j-sax). – Joey Trang Mar 03 '17 at 03:23

3 Answers

10

The retention period for committed offsets can be configured on the Kafka broker using:

`offsets.retention.minutes`

The default was 24 hours before Kafka 2.0. See: Kafka consumer group offset retention

Edit: As of Kafka 2.0, the default value is 7 days.
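
To make the effect on the consumer concrete, here is a simplified Python model of what can happen once a group has been idle longer than `offsets.retention.minutes`. It is only a sketch, not the broker's actual logic: the function names and parameters are made up for illustration, and the idle time is measured from the last commit for simplicity. The `auto_offset_reset` argument mirrors the consumer setting `auto.offset.reset`, whose default of `latest` would explain why only new messages are picked up after the restart.

```python
from datetime import datetime, timedelta


def committed_offset_expired(last_commit, now, retention_minutes):
    """Return True once the idle time exceeds the retention window.

    The window is only a lower bound: in this model the offset is merely
    *eligible* for removal once it returns True.
    """
    return now - last_commit > timedelta(minutes=retention_minutes)


def starting_offset(committed, expired, log_start, log_end,
                    auto_offset_reset="latest"):
    """Pick the position a restarted consumer would begin reading from."""
    if committed is not None and not expired:
        return committed          # normal case: resume where we left off
    if auto_offset_reset == "earliest":
        return log_start          # replay everything still retained
    if auto_offset_reset == "latest":
        return log_end            # skip to the end, old messages are missed
    raise ValueError("no committed offset and auto.offset.reset=none")


# Hypothetical numbers: consumer stopped for 30 days, retention of 24 hours.
now = datetime(2017, 3, 2)
last_commit = now - timedelta(days=30)
expired = committed_offset_expired(last_commit, now, retention_minutes=24 * 60)
print(starting_offset(committed=500, expired=expired,
                      log_start=0, log_end=1200))  # prints 1200
```

In this model, raising the retention above the longest expected downtime, or setting `auto.offset.reset` to `earliest`, keeps the backlog readable as long as the messages themselves have not been deleted.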

Mansoor Siddiqui
Ashok Kumar Sahoo
  • Thanks for this answer. Can you edit your answer to explain a bit more what is happening for posterity? – Gray Apr 01 '19 at 21:06
  • The config name should be `offsets.retention.minutes` (note the plural on offsets) as per https://kafka.apache.org/documentation/#brokerconfigs – Dylan Hogg Jun 06 '19 at 01:12
  • As stated by the other commenter, the default value is no longer 24 hours; it seems to be 7 days, or 10080 minutes, according to https://kafka.apache.org/documentation/#brokerconfigs – Nap Jul 29 '19 at 05:58
3

We were facing the same issue, and I researched it a bit. The changes below resolved it.

Our message retention is 14 days, so we changed the retention of the Kafka offsets topic to 14 days as well.

We changed `cleanup.policy` from `compact` to `delete` by running the command below:

./kafka-configs.sh --alter --zookeeper localhost:2181 --entity-type topics \
              --entity-name __consumer_offsets --add-config cleanup.policy=delete

Updated config for topic: "__consumer_offsets".

Gray
  • 2
    __consumer_offsets is a special topic and you should not change cleanup policy of that topic – Bartosz Wardziński Dec 28 '18 at 10:35
  • Setting the cleanup policy to delete for __consumer_offsets is a bad idea. It normally uses the compact policy, which only keeps the last commit for a group:topic:partition combination. – ChrisBlom Oct 02 '19 at 13:50
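
The point made in these comments can be illustrated with a toy model of compaction. This is only a sketch in plain Python under the assumption that each commit is keyed by (group, topic, partition); the `compact` helper and the sample data are made up, and the broker's real implementation works on log segments rather than a dictionary.

```python
def compact(log):
    """Keep only the most recent value for each key, as compaction eventually does."""
    latest = {}
    for key, offset in log:
        latest[key] = offset  # later records overwrite earlier ones
    return latest


# Hypothetical commit history: (group, topic, partition) -> committed offset
commits = [
    (("group-a", "events", 0), 10),
    (("group-a", "events", 0), 25),
    (("group-b", "events", 0), 7),
    (("group-a", "events", 0), 40),
]

compacted = compact(commits)
print(compacted[("group-a", "events", 0)])  # prints 40
print(len(compacted))                       # prints 2
```

With `compact`, the last commit for every key survives no matter how old it is. With `delete`, records are dropped by age or size instead, so the only commit of an idle group can disappear, which is why the commenters advise against changing the policy.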
1

The default retention period for messages (and offsets) in Kafka is one week, i.e. 7 days.

After the retention period, the existing offsets are deleted from the broker to avoid running out of space.

So the consumer falls back to the latest offset, because the previously committed offset has been removed by Kafka / ZooKeeper.

If you want to keep your messages for a longer period, set the topic-level `retention.ms` config when creating the topic in Kafka (for example, `--config retention.ms=<milliseconds>` with `kafka-topics.sh`).
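
Retention settings take plain numbers in different units: the broker setting `offsets.retention.minutes` is in minutes, while the topic-level message retention `retention.ms` is in milliseconds. A small Python sketch for computing both from a number of days, assuming you want offsets kept as long as the messages; the helper names are made up for illustration.

```python
def days_to_minutes(days):
    """Convert days to minutes, the unit of offsets.retention.minutes."""
    return days * 24 * 60


def days_to_ms(days):
    """Convert days to milliseconds, the unit of retention.ms."""
    return days * 24 * 60 * 60 * 1000


print(f"offsets.retention.minutes={days_to_minutes(7)}")  # offsets.retention.minutes=10080
print(f"retention.ms={days_to_ms(7)}")                    # retention.ms=604800000
```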

Mani Maran