
Problem description:

Our Kafka consumers (developed with Spring Boot 2.x) run for several days at a time. When we restart those consumers, all messages in the topic are consumed again, but only under specific conditions.

Conditions:

We suppose that the combination of broker/topic config (log.retention.*, offsets.retention.*) and consumer config (auto.offset.reset = earliest) is causing this behavior.
Obviously we can't set the consumer to "latest", because if the consumer is stopped and new messages arrive, those messages won't be consumed when the consumer starts again.

Question:

What is the correct setup to avoid this situation?
In the latest Kafka broker release (2.x), the default values for log.retention.* and offsets.retention.* are the same (https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days).

Could this new configuration setup solve the problem?

Consumer configuration (auto commit delegated to the Spring Cloud Stream framework):

           auto.commit.interval.ms = 100
           auto.offset.reset = earliest
           bootstrap.servers = [server1:9092]
           check.crcs = true
           client.id = 
           connections.max.idle.ms = 540000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = consumer_group1
           heartbeat.interval.ms = 3000
           interceptor.classes = null
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 305000
           retry.backoff.ms = 100
           value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Brokers configuration:

           log.retention.ms = 86400000
           log.retention.minutes = 10080
           log.retention.hours = 168
           log.retention.bytes = -1

           offsets.retention.ms = 864000000
           offsets.retention.minutes = 14400
           offsets.retention.hours = 240 

           unclean.leader.election.enable = false
           log.cleaner.enable = true
           auto.leader.rebalance.enable = true
           leader.imbalance.check.interval.seconds = 300
           log.retention.check.interval.ms = 300000
           log.cleaner.delete.retention.ms = 604800000

Thanks and regards

  • You are right, it's due to different values for `log.retention.*` and `offsets.retention.*`. Please take a look at https://stackoverflow.com/questions/50741783/why-does-kafka-streams-reprocess-the-messages-produced-after-broker-restart/52571074#52571074 – Vasyl Sarzhynskyi Dec 19 '18 at 12:35

2 Answers


You are right: you are experiencing this issue due to the different default values for log.retention.* and offsets.retention.* (7 days and 1 day respectively) in Kafka versions prior to 2.0, please check the description here. It happens because messages arrive in your topic only rarely, and the committed offsets have already expired by the time you restart.

Your phrase Obviously we can't set consumer to "latest" is not entirely correct. If you received the last messages less than 1 day ago (for example a few hours ago), you could safely update the auto.offset.reset value to latest while keeping the same group id (or application.id). In that case you will not lose messages.

As another option, you could change the log retention value for a specific topic to 1 day. You could also update the offsets.retention.* value, but you would need to test that from a performance point of view; it might degrade.
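
For illustration, overriding retention for a single topic can be done programmatically with the Kafka AdminClient. A minimal sketch, assuming the bootstrap server from your config; the topic name "my-topic" and the 1-day value are placeholders, and the same change can also be made with the kafka-configs command-line tool:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class TopicRetentionOverride {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // topic-level retention.ms overrides the broker-wide log.retention.* settings
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
                Config retention = new Config(
                        Collections.singleton(new ConfigEntry("retention.ms", "86400000"))); // 1 day
                admin.alterConfigs(Collections.singletonMap(topic, retention)).all().get();
            }
        }
    }

Note that alterConfigs replaces the whole set of dynamic configs for the topic; on newer brokers and clients, incrementalAlterConfigs is the preferred way to change a single entry.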

– Vasyl Sarzhynskyi
  • Ok, so if I understand correctly, setting for example `offsets.retention.minutes = 10080` and `log.retention.minutes = 10080` (they should be the same), with `auto.offset.reset` set to `earliest`, this problem should be solved when new topics are created, right? Are these values applied to the whole topic or to each partition? – Fonexn Dec 19 '18 at 21:34
  • 1
  • The configs `offsets.retention.minutes` and `log.retention.minutes` apply to all Kafka topics in your cluster. You can override log retention for a specific topic (it will be applied to each partition in that topic) by specifying `retention.ms`. Setting `offsets.retention.minutes` and `log.retention.minutes` to the same value will solve your problem in most cases, but there is still a very small probability of hitting it, because messages in Kafka are not removed immediately after the `log.retention.minutes` time expires. – Vasyl Sarzhynskyi Dec 19 '18 at 22:01
  • Perfect, but I have very sensitive data and I must never allow reprocessing it from the beginning. Can a Kafka broker/consumer configuration guarantee that this never happens, or does every config combination leave a small possibility of reprocessing all data with `earliest` and of losing some messages with `latest`? Any other idea to achieve it? Thanks! – Fonexn Dec 19 '18 at 23:20
  • Where exactly can I change this "log.retention.hours" configuration? I am using Spring Boot to consume a topic in Kafka, and that Kafka is running via docker-compose. Should I change anything in the Zookeeper configuration in docker-compose? What can I change and where? I need all messages to go away immediately after they are consumed and acked (manually). – Francisco Souza Dec 03 '20 at 19:20
  • @FranciscoSouza Messages cannot be removed immediately after being consumed. If you need this, then Kafka is not the correct tool... To answer your first question, you cannot set this in Spring. That is a Kafka server variable and can be set via env vars (depending on the image used), for example `KAFKA_LOG_RETENTION_HOURS` on the Confluent images. – OneCricketeer Sep 07 '22 at 22:34

If you keep your application running 24x7 (e.g. over the weekend when there is no data), one option would be to set an idleInterval and add an ApplicationListener (or @EventListener) to listen for ListenerContainerIdleEvents.

Then, if the idleTime property is approaching your log retention, you can re-commit the offsets using the Consumer in the event - get the assigned partitions, find their current position() and then re-commit.
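
A minimal sketch of such a listener, assuming the idle event is published on the consumer thread (which makes it safe to call the Consumer directly) and using an arbitrary 12-hour re-commit threshold; the class name and threshold are only placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.event.ListenerContainerIdleEvent;
    import org.springframework.stereotype.Component;

    @Component
    public class IdleOffsetRecommitter {

        // assumption: re-commit once the container has been idle for ~12 hours
        private static final long RECOMMIT_AFTER_MS = TimeUnit.HOURS.toMillis(12);

        @EventListener
        public void onIdle(ListenerContainerIdleEvent event) {
            if (event.getIdleTime() < RECOMMIT_AFTER_MS) {
                return;
            }
            Consumer<?, ?> consumer = event.getConsumer();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : event.getTopicPartitions()) {
                // committing the current position refreshes the offset record on the broker
                offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            if (!offsets.isEmpty()) {
                consumer.commitSync(offsets);
            }
        }
    }

The idle event is only published when the container's idle event interval (the idleInterval mentioned above) is configured.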

– Gary Russell