2

This is a follow up on a previous question I sent regarding high latency in our Kafka Streams; (Kafka Streams rebalancing latency spikes on high throughput kafka-streams services).

As a quick reminder, our Stateless service has very tight latency requirements and we are facing too high latency problems (some messages consumed more than 10 secs after being produced) specially when a consumer leaves gracefully the group.

After further investigation we have found out that at least for small consumer groups the rebalance is taking less than 500ms. So we thought, where is this huge latency when removing one consumer (>10s) coming from?

We realized that it is the time between the consumer exiting Gracefully and the rebalance kicking in.

That previous tests were executed with all-default configurations in both Kafka and Kafka Streams application. We changed the configurations to:

properties.put("max.poll.records", 50); // defaults to 1000 in kafkastreams
properties.put("auto.offset.reset", "latest"); // defaults to latest
properties.put("heartbeat.interval.ms", 1000);
properties.put("session.timeout.ms", 6000);
properties.put("group.initial.rebalance.delay.ms", 0);
properties.put("max.poll.interval.ms", 6000);

And the result is that the time for the rebalance to start dropped to a bit more than 5 secs.

We also tested to kill a consumer non-gracefully by 'kill -9' it; the result is that the time to trigger the rebalance is exactly the same.

So we have some questions: - We expected that when the consumer is stopping gracefully the rebalance is triggered right away, should that be the expected behavior? why isn't it happening in our tests? - How can we reduce the time between a consumer gracefully exiting and the rebalance being triggered? what are the tradeoffs? more unneeded rebalances?

For more context, our Kafka version is 1.1.0, after looking at libs found for example kafka/kafka_2.11-1.1.0-cp1.jar, we installed Confluent platform 4.1.0. On the consumer side, we are using Kafka-streams 2.1.0.

Thank you!

jarias
  • 166
  • 7

1 Answers1

2

Kafka Streams does not sent a "leave group request" when an instance is shut down gracefully -- this is on purpose. The goal is to avoid expensive rebalances if an instance is bounced (eg, if one upgrades an application; or if one runs in a Kubernetes environment and a POD is restarted quickly automatically).

To achieve this, a non public configuration is used. You can overwrite the config via

props.put("internal.leave.group.on.close", true); // Streams' default is `false`
Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • Thank you Matthias! are you aware why is that a non-public config? is using that non-public config something we could rely on? – jarias Feb 04 '19 at 10:58
  • There are concerns making it public. Cf. https://issues.apache.org/jira/browse/KAFKA-6995 -- If you use it, you should double check before upgrading if the config is still there and if not, what the impact might be -- because it's non-public, it won't be mentioned in release notes obviously. – Matthias J. Sax Feb 04 '19 at 22:26
  • So, combined with this and incremental cooperative rebalancing, it would avoid rebalancing the assigned partitions (including heavy state store backing partitions) on the bounced pod? The reason why I ask is because I have seen some rebalance occurring in 2.7.0 KStream apps with heavy state stores. Also, I do wonder if it’s because those were standby replicas moving around… – Jin May 23 '21 at 13:28
  • Yes, I can reduce the risk, but it also depends on your configs. You might also want to checkout static group membership though, that should help even further. – Matthias J. Sax May 24 '21 at 00:37