
I have a Kafka Streams application (Kafka v1.1.0) with multiple (24) topics. Four of these are source topics and the remaining are destination topics. The application seems to have reprocessed data after the system time was changed to a previous date. I have the default broker configs, i.e.:

auto.offset.reset         = latest
offsets.retention.minutes = 1440 #1 day
log.retention.hours       = 168  #7 days
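
For context, the application itself is set up along these lines (a minimal sketch only; the application id, bootstrap servers, and topic names below are placeholders, and the real topology reads from all four source topics and writes to the remaining destination topics):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsApp {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        // application.id doubles as the consumer group id of the Streams app
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        // one of the four source topics; the real topology fans out to the destination topics
        final KStream<String, String> source = builder.stream("source-topic-1");
        source.to("destination-topic-1");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}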

I have looked into the following links in detail, as well as the sub-links posted in the answers:

1) Kafka Stream reprocessing old messages on rebalancing

2) How does an offset expire for an Apache Kafka consumer group?

3) https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days

The following JIRA discussion also states this issue:

https://issues.apache.org/jira/browse/KAFKA-3806

After reading up on this I have established an understanding of the cases in which stream consumers might reprocess data.

However, with the default configs mentioned above (the ones being used for my setup), if offsets are lost, i.e. offsets.retention.minutes has expired, then the consumer would rebalance and, finding no committed offset, start from the latest offset, and any new incoming data would be processed as is. In this scenario there shouldn't be any data reprocessing and hence no duplicates.
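
If I wanted to make that assumption explicit rather than rely on defaults, my understanding is that the reset policy can be pinned on the Streams application itself, roughly like this (a sketch; the helper name is mine and the policy value is just an example):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PinResetPolicy {

    // Adds an explicit auto.offset.reset for the consumers that Streams creates internally,
    // so that behaviour after committed offsets expire does not depend on library defaults.
    public static Properties pin(final Properties streamsProps, final String policy) {
        streamsProps.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), policy);
        return streamsProps;
    }
}

e.g. calling pin(props, "latest") on the Streams properties before constructing KafkaStreams.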

In the case of a system time change, however, there might be a possibility of offsets becoming inconsistent, i.e. it is possible for offsets of a source topic to have a CommitTime of an earlier date after a CommitTime of a later date. In this case, if a topic has low traffic and no data is received on it for more than offsets.retention.minutes, then its offset would no longer be available, while another topic with high traffic would still have its offset in the __consumer_offsets topic.
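
As a sanity check along those lines, this is roughly how one could verify whether a committed offset is still present for a partition of the low-traffic source topic (a sketch; the group id, topic name, partition, and servers are placeholders, with the group id being the Streams application.id):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CheckCommittedOffset {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the group id of a Streams application is its application.id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final TopicPartition tp = new TopicPartition("low-traffic-source-topic", 0);
            // returns null once the committed offset has been removed from __consumer_offsets
            final OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println(tp + " -> " + (committed == null ? "no committed offset" : committed.offset()));
        }
    }
}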

How would the stream consumer behave in this scenario? Is there a chance of duplication? I am really confused about it. Any help would be really appreciated.

el323
  • Kafka Streams uses `auto.offset.reset="earliest"` by default (in contrast to the consumer that uses `"latest"` as default). Hence, if your application is down or rebalances, and offsets are lost, it would reprocess data by default. – Matthias J. Sax Jun 11 '19 at 21:17
  • Thanks for the reply. Can you please explain why Kafka Streams uses `auto.offset.reset="earliest"` by default? Secondly, can you please explain a bit about how kafka streams would behave in the scenario of system time being set to a previous date? Thanks. – el323 Jun 12 '19 at 04:37
  • Not 100% sure _why_ the default is different: maybe it's a better default considering that Kafka Streams supports KTable, and for those you usually want to process all data to build up the table state (and not start with an empty state). --- For the second part, I am actually not 100% sure. AFAIK, Kafka Streams itself uses SystemTime only to compute latency metrics etc., but processing itself should not be affected. (btw: did you change system time broker side or client side, or both?) – Matthias J. Sax Jun 12 '19 at 07:11
  • I changed system time for both broker and client – el323 Jun 12 '19 at 09:53
  • Broker side change might mess with the configured retention time settings... Kafka Streams should not be affected by the client side change. Of course, the broker issue could affect Kafka Streams, in case it crashed and no committed offset is found. Kafka Streams would commit offsets regularly though, and if there is no error, new offsets should be committed, getting you back into a stable state. – Matthias J. Sax Jun 12 '19 at 17:21

0 Answers