The Problem
We use a StatefulSet to deploy a Scala Kafka Streams application on Kubernetes. The instances have separate applicationIds, so each one replicates the complete input topic for fault tolerance. They are essentially read-only services that only read a state topic into their state store, from which customer requests are served via REST. This means each consumer group consists of only a single Kafka Streams instance at any given time.
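As a rough illustration of this setup, the per-instance applicationId could be derived from the StatefulSet pod's stable hostname, so each pod forms its own single-member consumer group. The service name, environment variable, and bootstrap address below are assumptions for the sketch, not our actual values:

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// Hypothetical sketch: each StatefulSet pod derives its own application.id
// from its stable pod hostname (e.g. "state-service-0"), so every instance
// is its own consumer group and holds a full replica of the input topic.
val podName = sys.env.getOrElse("HOSTNAME", "state-service-0")

val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, s"state-reader-$podName")
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
```

Because the pod name is stable across restarts, the applicationId (and hence the consumer group) stays identical when the same pod comes back up.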
Our problem is that, when we trigger a rolling restart, each instance takes about five minutes to start up, and most of that time is spent waiting in the REBALANCING state. I've read here that Kafka Streams deliberately does not send a LeaveGroup request, so that it can come back quickly after a container restart without triggering a rebalance. Why does this not work for us, and why does the rebalance take so long even though the applicationId is identical? Ideally, to minimize downtime, the application should take over immediately from where it left off before the restart.
Config
Here are some configs we changed from the default values:
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), "300000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")
// RocksDB config, see https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])
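For completeness, our BoundedMemoryRocksDBConfig follows the pattern from the linked Confluent memory-management guide. The memory budgets below are placeholder values, not our production numbers, and the exact RocksDBConfigSetter interface may differ slightly between Kafka versions:

```scala
import java.util
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, Cache, LRUCache, Options, WriteBufferManager}

object BoundedMemoryRocksDBConfig {
  // Assumed budgets for the sketch: one shared off-heap block cache and a
  // write-buffer manager that charges memtable memory against that cache.
  private val TotalOffHeapMemory: Long = 512L * 1024 * 1024
  private val TotalMemtableMemory: Long = 128L * 1024 * 1024
  val cache: Cache = new LRUCache(TotalOffHeapMemory)
  val writeBufferManager: WriteBufferManager =
    new WriteBufferManager(TotalMemtableMemory, cache)
}

class BoundedMemoryRocksDBConfig extends RocksDBConfigSetter {
  override def setConfig(storeName: String,
                         options: Options,
                         configs: util.Map[String, AnyRef]): Unit = {
    val tableConfig =
      options.tableFormatConfig.asInstanceOf[BlockBasedTableConfig]
    tableConfig.setBlockCache(BoundedMemoryRocksDBConfig.cache)
    options.setWriteBufferManager(BoundedMemoryRocksDBConfig.writeBufferManager)
    options.setTableFormatConfig(tableConfig)
  }

  override def close(storeName: String, options: Options): Unit = {
    // Cache and WriteBufferManager are shared across all stores, so they are
    // intentionally not closed per store here.
  }
}
```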
Questions / Related configs
- Would it help to decrease session.timeout.ms? We set it to quite a large value because the Kafka brokers live in a different data center and network connections are at times not super reliable.
- This answer suggests decreasing max.poll.interval.ms, as it is tied to a rebalance timeout. Is that correct? I'm hesitant to change it, as it might have consequences on the normal operation of our app.
- There is mention of a config group.initial.rebalance.delay.ms to delay rebalancing during a deployment, but that would also cause delays after recovery from a crash, wouldn't it?
- I also stumbled upon KIP-345, which aims to eliminate consumer rebalancing entirely for static memberships via group.instance.id. That would be a good fit for our use case, but it does not seem to be available yet on our brokers.
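If our brokers are eventually upgraded to Kafka 2.3+, my understanding is that static membership would be enabled roughly like this (the environment variable and fallback name are assumptions for the sketch):

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig

// Sketch only, assuming Kafka 2.3+ brokers and clients: give each instance a
// stable group.instance.id (e.g. the StatefulSet pod name), so a restart that
// completes within session.timeout.ms should not trigger a rebalance.
properties.put(
  StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
  sys.env.getOrElse("HOSTNAME", "state-service-0"))
```

With static membership, the large session.timeout.ms we already use would work in our favor, since it defines the window within which a static member may rejoin without a rebalance.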
I'm confused by the multitude of configs and how to use them to enable fast recovery after an update. Can someone explain how they play together?