
I'm trying to use the idle between polls mentioned here to slow down the consumption rate. I also set max.poll.interval.ms to double the idle between polls, but it keeps triggering partition rebalances. Any idea what the problem is?

[Edit] I have 5 hosts and I'm setting the concurrency level to 1.

[Edit 2] I was setting the idle between polls to 5 min and max.poll.interval.ms to 10 min. I also noticed this log: "About to close the idle connection from 105 due to being idle for 540012 millis". I decreased the idle between polls to 10 sec and the issue disappeared. Any idea why?

    private ConsumerFactory<String, GenericRecord> dlqConsumerFactory() {
        Map<String, Object> configurationProperties = commonConfigs();

        DlqConfiguration dlqConfiguration = kafkaProperties.getConsumer().getDlq();

        final Integer idleBetweenPollInterval = dlqConfiguration.getIdleBetweenPollInterval()
                .orElse(DLQ_POLL_INTERVAL);

        final Integer maxPollInterval = idleBetweenPollInterval * 2; // two times the idleBetweenPoll, to prevent re-balancing
        logger.info("Setting max poll interval to {} for DLQ", maxPollInterval);

        overrideIfRequired(DQL_CONSUMER_CONFIGURATION, configurationProperties, ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);

        dlqConfiguration.getMaxPollRecords().ifPresent(maxPollRecords ->
                overrideIfRequired(DQL_CONSUMER_CONFIGURATION, configurationProperties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
        );


        return new DefaultKafkaConsumerFactory<>(configurationProperties);
    }
Gary Russell
user1928473

2 Answers

<time to process last polled records> + <idle between polls> must be less than max.poll.interval.ms.
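The rule can be expressed as a one-line check. This is a minimal sketch, not Spring Kafka code; `PollBudget` and `pollsInTime` are hypothetical names, and the 50 ms processing time is an assumed value standing in for "the record takes milliseconds to get processed".

```java
// Hypothetical helper (not part of Spring Kafka): encodes the rule
// processing time + idle between polls < max.poll.interval.ms.
public class PollBudget {

    /** True if the consumer polls again before max.poll.interval.ms expires. */
    static boolean pollsInTime(long processingMs, long idleBetweenPollsMs, long maxPollIntervalMs) {
        return processingMs + idleBetweenPollsMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Question's settings: 5 min idle, 10 min max.poll.interval.ms, assumed 50 ms processing.
        System.out.println(pollsInTime(50, 300_000, 600_000)); // true
        // Idle alone exceeds the interval: a rebalance would be expected.
        System.out.println(pollsInTime(50, 700_000, 600_000)); // false
    }
}
```

By this rule alone, the question's 5 min / 10 min configuration has plenty of headroom, which is why the rebalances point to something else.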

EDIT

There is logic in the container to make sure we never exceed the max poll interval:

idleBetweenPolls = Math.min(idleBetweenPolls,
        this.maxPollInterval - (System.currentTimeMillis() - this.lastPoll)
                - 5000); // NOSONAR - less by five seconds to avoid race condition with rebalance
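To see what that clamp does with concrete numbers, here is a standalone sketch that mirrors the expression above with the container's fields turned into parameters. It is not the actual container class; `IdleClamp` and `effectiveIdle` are hypothetical names, and the 100 ms processing time is an assumed value.

```java
// Standalone mirror of the clamp expression above (hypothetical class, not Spring Kafka's).
public class IdleClamp {

    static final long RACE_MARGIN_MS = 5000; // the 5 s margin from the snippet

    /** Idle actually used: never more than what is left of max.poll.interval.ms minus the margin. */
    static long effectiveIdle(long idleBetweenPolls, long maxPollInterval, long now, long lastPoll) {
        return Math.min(idleBetweenPolls, maxPollInterval - (now - lastPoll) - RACE_MARGIN_MS);
    }

    public static void main(String[] args) {
        long lastPoll = 0;
        // 5 min idle, 10 min interval, assumed 100 ms since the last poll: no clamping needed.
        System.out.println(effectiveIdle(300_000, 600_000, 100, lastPoll)); // 300000
        // Misconfigured 15 min idle: cut down to 600000 - 100 - 5000.
        System.out.println(effectiveIdle(900_000, 600_000, 100, lastPoll)); // 594900
    }
}
```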

I can't reproduce the issue with this...

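import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication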
public class So63411124Application {

    public static void main(String[] args) {
        SpringApplication.run(So63411124Application.class, args);
    }

    @KafkaListener(id = "so63411124", topics = "so63411124")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
                    KafkaTemplate<String, String> template) {

        // Idle 5 minutes between polls; max.poll.interval.ms is 10 minutes (set in the properties)
        factory.getContainerProperties().setIdleBetweenPolls(300000L);
        return args -> {
            while (true) {
                template.send("so63411124", "foo");
                Thread.sleep(295000); // send a record just under every 5 minutes
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63411124").partitions(1).replicas(1).build();
    }

}

application.properties

logging.level.org.springframework.kafka=debug
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=600000

If you can provide a small example like this that exhibits the behavior you describe, I will take a look to see what's wrong.

Gary Russell
  • That's what we have: max.poll.interval.ms = idle between polls * 2. Each record takes milliseconds to process, and we also set max.poll.records to 10. – user1928473 Aug 14 '20 at 13:11
  • Also, as I said in Edit 2, when I decreased the idle between polls to 10 sec, the rebalancing stopped. – user1928473 Aug 14 '20 at 13:15
  • Hmmm - something else must be going on - there is logic in the container that ensures that the idle between polls is never greater than `max.poll.interval.ms - timeSinceLastPoll - 5000` (5 seconds less to avoid a race). I will see if I can reproduce your condition. – Gary Russell Aug 14 '20 at 14:19
  • I edited the question with the code snippet I'm using. Also, the problem doesn't show up in the test environment. I think that might be because testing has only 1 host, while prod has 5 hosts. – user1928473 Aug 14 '20 at 16:39
  • Are you sure the rebalance is caused by this? I don't see how it can happen. As I said, even if the interval is incorrectly set at a too-large value, we adjust it down to ensure we don't exceed the `max.poll.interval.ms`. – Gary Russell Aug 14 '20 at 16:59
  • I noticed this log before the rebalance: "About to close the idle connection from 105 due to being idle for 540012 millis". After searching, I found that connections.max.idle.ms is set to 9 min, but I don't know why the connection was idle for that long. – user1928473 Aug 14 '20 at 17:06
  • Looking at the `kafka-clients`, it looks like that is controlled by `connections.max.idle.ms`. There's a comment in `ConsumerConfig` `/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */` - it is set to 9 minutes by default, so with an idle between polls of only 5 minutes, that shouldn't kick in. – Gary Russell Aug 14 '20 at 17:19
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/219828/discussion-between-user1928473-and-gary-russell). – user1928473 Aug 14 '20 at 17:34
  • @GaryRussell I've got the same continuous rebalancing problem due to idleBetweenPolls param with 4 pods in OpenShift. Have you found the possible cause of this problem? – VAG Dec 18 '21 at 15:11
  • Ask a new question showing code, configuration and logs. – Gary Russell Dec 18 '21 at 15:15
  • @GaryRussell asked new question: https://stackoverflow.com/questions/70404868/spring-for-apache-kafka-idlebetweenpolls-param-leads-to-continuous-rebalance – VAG Dec 18 '21 at 16:07
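The numbers from the log line in Edit 2 can be checked with plain arithmetic. A small sketch, assuming the 9-minute `connections.max.idle.ms` value quoted in the comments is the one in effect: the logged idle time is 12 ms past that limit, so the closed connection carried no traffic for longer than one 5-minute idle between polls. The sketch does not show which connection that was or why it stayed quiet; the class name is hypothetical.

```java
import java.time.Duration;

// Arithmetic check of the question's log line against the 9-minute idle-connection limit.
public class IdleConnectionCheck {

    // connections.max.idle.ms as quoted in the comments above (9 minutes).
    static final Duration CLIENT_DEFAULT = Duration.ofMinutes(9);
    // Idle time reported in "idle for 540012 millis".
    static final Duration OBSERVED_IDLE = Duration.ofMillis(540_012);
    // The question's idle between polls.
    static final Duration IDLE_BETWEEN_POLLS = Duration.ofMinutes(5);

    public static void main(String[] args) {
        System.out.println(CLIENT_DEFAULT.toMillis());                       // 540000
        System.out.println(OBSERVED_IDLE.minus(CLIENT_DEFAULT).toMillis());  // 12
        System.out.println(OBSERVED_IDLE.compareTo(IDLE_BETWEEN_POLLS) > 0); // true
    }
}
```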

Though Gary Russell's answer is correct, I want to elaborate on it a bit.

You see, max.poll.interval.ms is the maximum amount of time that may pass between two consecutive poll() calls of a consumer. Suppose consumer A in consumer group B polls records from the broker every 15 s, but max.poll.interval.ms is set to 12 s on the client. Once 12 s pass without a poll, consumer A is considered failed: it proactively leaves consumer group B, and that, of course, triggers a partition rebalance.

So that is why

time to process last polled records + idle between polls must be less than max.poll.interval.ms.

is a valid statement: time to process last polled records + idle between polls is the poll period, and in plain English the rule just means you must poll again before the timeout expires.
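The 15 s / 12 s example can be played out as a toy timeline. This is not Kafka code: time advances in 1 ms ticks, the consumer polls exactly every `pollPeriodMs`, and a gap of max.poll.interval.ms or more since the last poll counts as a missed deadline. The class name and the 10 s "fixed" period are hypothetical.

```java
// Toy timeline (no Kafka involved) for the poll-deadline rule.
public class PollTimeline {

    /** Tick at which the poll deadline is first missed, or -1 if it never is within horizonMs. */
    static long firstTimeoutMs(long pollPeriodMs, long maxPollIntervalMs, long horizonMs) {
        long lastPoll = 0; // first poll() at t = 0
        for (long t = 1; t <= horizonMs; t++) {
            if (t - lastPoll >= maxPollIntervalMs) {
                return t; // no poll within the interval: the consumer is considered failed
            }
            if (t - lastPoll == pollPeriodMs) {
                lastPoll = t; // the next scheduled poll() happens in time
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The example above: polling every 15 s with max.poll.interval.ms = 12 s.
        System.out.println(firstTimeoutMs(15_000, 12_000, 60_000)); // 12000
        // Hypothetical fix: poll every 10 s instead.
        System.out.println(firstTimeoutMs(10_000, 12_000, 60_000)); // -1
    }
}
```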

As a side note, max.poll.interval.ms is not the only setting used to detect consumer failures. There is also session.timeout.ms, used in conjunction with heartbeat.interval.ms. Beware that misconfiguring these two settings can also lead to continuous partition rebalancing. That's off-topic here, so I won't explain it, but FYI there is a very good answer on SO about how all these settings work together, and I highly suggest you take a look at it.
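For the heartbeat side, the Kafka consumer documentation says heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than one third of it. A minimal sketch of that sanity check over a plain `Properties` object; the class name and the sample values are hypothetical, and it assumes both properties are set.

```java
import java.util.Properties;

// Hypothetical sanity check: heartbeat.interval.ms should be at most 1/3 of session.timeout.ms.
public class LivenessConfigCheck {

    static boolean heartbeatLooksSane(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "45000");
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("max.poll.interval.ms", "600000"); // independent of this check
        System.out.println(heartbeatLooksSane(props)); // true

        props.setProperty("heartbeat.interval.ms", "20000");
        System.out.println(heartbeatLooksSane(props)); // false: more than a third of the session timeout
    }
}
```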

Have a nice day)

Mikhail2048