272

I am relatively new to Kafka. I have done a bit of experimenting with it, but a few things are unclear to me regarding consumer offset. From what I have understood so far, when a consumer starts, the offset it will start reading from is determined by the configuration setting auto.offset.reset (correct me if I am wrong).

Now say for example that there are 10 messages (offsets 0 to 9) in the topic, and a consumer happened to consume 5 of them before it went down (or before I killed the consumer). Then say I restart that consumer process. My questions are:

  1. If the auto.offset.reset is set to earliest, is it always going to start consuming from offset 0?

  2. If the auto.offset.reset is set to latest, is it going to start consuming from offset 5?

  3. Is the behavior regarding this kind of scenario always deterministic?

Please don't hesitate to comment if anything in my question is unclear.

Sнаđошƒаӽ
Asif Iqbal

3 Answers

397

It is a bit more complex than you described.
The auto.offset.reset config kicks in ONLY if your consumer group does not have a valid offset committed somewhere (the two supported offset stores are currently Kafka and ZooKeeper), and it also depends on what sort of consumer you use.

If you use the high-level Java consumer, consider the following scenarios:

  1. You have a consumer in a consumer group group1 that has consumed 5 messages and died. Next time you start this consumer it won't even use that auto.offset.reset config and will continue from the place it died because it will just fetch the stored offset from the offset storage (Kafka or ZK as I mentioned).

  2. You have messages in a topic (like you described) and you start a consumer in a new consumer group group2. There is no offset stored anywhere and this time the auto.offset.reset config will decide whether to start from the beginning of the topic (earliest) or from the end of the topic (latest)
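The two scenarios above can be sketched as a small toy model. This is not the Kafka client API: the function name `starting_offset` and its parameters are made up for illustration, and it assumes the consumer had committed offset 5 (the next offset to read) before it died.

```python
from typing import Optional


def starting_offset(committed: Optional[int],
                    log_start: int,
                    log_end: int,
                    auto_offset_reset: str = "latest") -> int:
    """Toy model of where a group consumer starts reading one partition.

    committed: offset stored for the group, or None if nothing is stored.
    log_start: first offset still available in the partition.
    log_end:   offset the next produced message will get.
    """
    # A stored offset wins; the reset policy is not consulted at all.
    if committed is not None:
        return committed
    # No stored offset: the reset policy decides.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported policy in this sketch: {auto_offset_reset}")


# Scenario 1: group1 committed offset 5, then restarted.
group1 = starting_offset(committed=5, log_start=0, log_end=10,
                         auto_offset_reset="earliest")

# Scenario 2: group2 is brand new, nothing committed.
group2_earliest = starting_offset(None, 0, 10, "earliest")
group2_latest = starting_offset(None, 0, 10, "latest")

print(group1, group2_earliest, group2_latest)  # 5 0 10
```

Note that in this model group1 resumes at 5 regardless of the policy, which matches scenario 1.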

One more thing that affects which offsets earliest and latest correspond to is the log retention policy. Imagine you have a topic with retention configured to 1 hour. You produce 5 messages, and then an hour later you produce 5 more. The latest offset will be the same as in the previous example, but the earliest one can no longer be 0, because Kafka will already have removed the first 5 messages, so the earliest available offset will be 5.
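The retention example can be worked through with a rough simulation. It is a simplification under stated assumptions: retention is applied per record here, the timestamps are invented, and `apply_retention` is a hypothetical helper, not a Kafka function.

```python
HOUR = 3600  # seconds


def apply_retention(log, now, retention_seconds):
    """Keep only records younger than the retention window (per-record toy model)."""
    return [(offset, ts) for offset, ts in log if now - ts < retention_seconds]


# 5 records produced at t=0, 5 more produced a bit over an hour later.
second_batch_time = HOUR + 60
log = [(offset, 0) for offset in range(5)]
log += [(offset, second_batch_time) for offset in range(5, 10)]

retained = apply_retention(log, now=second_batch_time, retention_seconds=HOUR)

earliest = retained[0][0]   # first offset still available
latest = log[-1][0] + 1     # offset the next produced record would get

print(earliest, latest)  # 5 10
```

Offsets are never reused, so deleting the first five records moves the earliest offset forward to 5 while the latest stays at 10.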

None of the above applies to SimpleConsumer: every time you run it, it decides where to start from using the auto.offset.reset config.

If you use a Kafka version older than 0.9, you have to replace earliest and latest with smallest and largest.

Ruben Bartelink
serejja
  • 9
    Thanks very much for the answer. So for the high-level consumer, once a consumer has something committed (either in ZK or Kafka), `auto.offset.reset` has no significance thereafter? The setting only matters when nothing is committed (ideally, at the first start-up of the consumer)? – Asif Iqbal Sep 04 '15 at 15:48
  • 4
    Exactly as you described – serejja Sep 05 '15 at 07:09
  • 1
    @serejja Hello - how about if I always have 1 consumer per group and scenario #1 of your answer occurs for me? Would it be the same? – ha9u63a7 Jul 05 '17 at 21:55
  • 2
    @ha9u63ar didn't quite understand your question. If you restart your consumer in the same group then yes, it won't use `auto.offset.reset` and will continue from the committed offset. If you always use a different consumer group (e.g. generate it when starting the consumer), then the consumer will always respect `auto.offset.reset` – serejja Jul 06 '17 at 12:59
  • @serejja yes and that's not working for me. could you please take a look at [this](https://stackoverflow.com/questions/44936513/kafka-effect-of-auto-offset-reset-when-same-consumer-group-is-subscribing?noredirect=1#comment76854763_44936513) - this is my issue – ha9u63a7 Jul 06 '17 at 13:24
  • 1
    _'The auto.offset.reset config kicks in ONLY if your consumer group does not have a valid offset'..._ That statement is half-right, the wrong bit being _'ONLY'_. The offset reset can also kick in if the consumer is not using a consumer group, but has called the `assign()` method on the consumer and has specified an invalid offset. – Emil Koutanov Oct 02 '19 at 22:59
  • 1
    Note that the offset values are `earliest` and `latest` https://docs.confluent.io/current/clients/consumer.html#id1 – pascalwhoop Feb 05 '20 at 13:15
  • what about auto.offset.reset=beginning. what's the difference to earliest ? – ugurkocak1980 Dec 23 '21 at 11:15
  • In the scenario #2, what if I don't specify the "auto.offset.reset"? What offset will the consumer start reading at? – emeraldhieu Apr 27 '23 at 17:22
107

Just an update: from Kafka 0.9 onward, Kafka uses a new Java version of the consumer, and the auto.offset.reset parameter values have changed. From the manual:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer's group

anything else: throw exception to the consumer.
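The manual excerpt above can be read as a small decision function. The following is only a sketch of that reading: `resolve_position`, `NoOffsetError`, and the parameter names are hypothetical and do not correspond to the real client's classes or exceptions.

```python
from typing import Optional


class NoOffsetError(Exception):
    """Hypothetical stand-in for the exception raised under the 'none' policy."""


def resolve_position(stored: Optional[int], log_start: int, log_end: int,
                     policy: str) -> int:
    """Toy reading of the documented auto.offset.reset values."""
    # A stored offset is usable only if it still points into the log.
    if stored is not None and log_start <= stored <= log_end:
        return stored
    # No initial offset, or it no longer exists on the server.
    if policy == "earliest":
        return log_start
    if policy == "latest":
        return log_end
    if policy == "none":
        raise NoOffsetError("no usable offset for this group")
    # "anything else: throw exception to the consumer"
    raise ValueError(f"invalid auto.offset.reset value: {policy!r}")


# Stored offset 2 was deleted by retention (log now starts at 5).
print(resolve_position(2, 5, 10, "earliest"))  # 5
print(resolve_position(2, 5, 10, "latest"))    # 10
print(resolve_position(7, 5, 10, "none"))      # 7, stored offset is still valid
```

With `policy="none"` and no usable stored offset, the sketch raises instead of silently picking a position, which is useful when skipping or re-reading data must be an explicit decision.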

I spent some time finding this after checking the accepted answer, so I thought it might be useful to post it for the community.

Community
Israel Zinc
  • 2
    The accepted answer is written in terms of the new names - this answer provides nothing unique, does it? (If it didn't have 90 upvotes at time of writing, I'd suggest deleting it ;) ) – Ruben Bartelink Jan 14 '21 at 13:09
  • 1
    Surprisingly a lot of people found it useful. – Israel Zinc Jan 14 '21 at 14:38
  • I agree an answer doesn't get that many upvotes completely by accident. But the point regarding the original answer no longer stands AFAICT so I can't think of a reason why I'd upvote it now? (I had also seen that specific bit of the manual before landing here too). Aside: [this answer](https://stackoverflow.com/a/58829964/11635) is also quite useful in this space – Ruben Bartelink Jan 14 '21 at 19:24
21

Furthermore, there's offsets.retention.minutes. If the time since the last commit exceeds offsets.retention.minutes, the committed offset is discarded and auto.offset.reset kicks in as well.
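A rough sketch of this interaction, following the answer's description (expiry measured from the last commit). The helper names are hypothetical, and the 10080-minute (7-day) retention is only an illustrative value.

```python
from typing import Optional


def surviving_commit(committed: int, minutes_since_commit: int,
                     offsets_retention_minutes: int) -> Optional[int]:
    """Return the committed offset, or None if it has expired (toy model)."""
    if minutes_since_commit > offsets_retention_minutes:
        return None  # offset discarded; the group looks brand new
    return committed


def restart_position(committed: int, minutes_since_commit: int,
                     offsets_retention_minutes: int,
                     log_start: int, log_end: int, policy: str) -> int:
    """Where a restarted consumer begins, given possible offset expiry."""
    stored = surviving_commit(committed, minutes_since_commit,
                              offsets_retention_minutes)
    if stored is not None:
        return stored
    # Expired commit: fall back to the reset policy (earliest/latest only here).
    return log_start if policy == "earliest" else log_end


RETENTION = 10080  # illustrative: 7 days in minutes

# Restart after 1 day: the commit survives, the policy is ignored.
print(restart_position(5, 1440, RETENTION, 0, 10, "earliest"))   # 5
# Restart after 8 days: the commit expired, the policy applies.
print(restart_position(5, 11520, RETENTION, 0, 10, "earliest"))  # 0
print(restart_position(5, 11520, RETENTION, 0, 10, "latest"))    # 10
```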

Sasa Ninkovic
  • 5
    Doesn't this seem redundant with log retention? Should offset retention be based on log retention? – mike01010 Feb 01 '18 at 02:53
  • @mike01010 that's right. It should be based on log retention, that's one of the proposed solutions in the ticket. `Prolong default value of offsets.retention.minutes to be at least twice larger than log.retention.hours.` https://issues.apache.org/jira/browse/KAFKA-3806 – saheb Mar 26 '18 at 14:35
  • 5
    That answer scared me for a while, until I checked [the documentation](http://kafka.apache.org/documentation.html#brokerconfigs) of `offsets.retention.minutes`: After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period. (This is for `Kafka 2.3`) – jumping_monkey Oct 21 '19 at 07:48
  • It is reasonable to remove the offset after no consumer is in a group for 7 days (the default value). If Kafka doesn't do that, we face orphaned offsets. – Mostafa Lavaei Aug 20 '22 at 20:29