Problem description:
Our Kafka consumer (developed in Spring Boot 2.x) are executing along several days. When we restart those consumer all messages of the topic are consumed again, but only under especific conditions.
Conditions:
We supose that the combination broker/topic config (log.retention.*, offsets.retention.*) and consumer config (auto.offset.reset = earliest) are causing this behavior.
Obviously we can't set consumer to "latest", because if the consumer is stopped and new messages arrives, when the consumer start again, those messages won't be consumed.
Question:
What is the correct setup to avoid this situation?
In last Kafka Broker release (2.x) the default values for log.retention.* and offsets.retention.* are the same (https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days)
Could this new configuration setup solve the problem?
Consumer configuration (auto.commit delegated on Spring Cloud Stream Framework):
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [server1:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = consumer_group1
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Brokers configuration:
log.retention.ms = 86400000
log.retention.minutes = 10080
log.retention.hours = 168
log.retention.bytes = -1
offsets.retention.ms = 864000000
offsets.retention.minutes = 14400
offsets.retention.hours = 240
unclean.leader.election.enable = false
log.cleaner.enable = true
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.retention.check.interval.ms = 300000
log.cleaner.delete.retention.ms = 604800000
Thanks and regards