
We have a Kafka consumer (concurrency of 5) with manual acknowledgment. With the implementation below, we sometimes get the exception Commit cannot be completed since the group has already rebalanced ...

When the exception occurs, the message is not acknowledged and is consumed again.

Any suggestions on configuration changes that would fix this without significantly impacting consumer performance?

Consumer Factory

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /*
     * Configuration values read from the yml file
     */

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // SASL and JAAS properties
        if (kafkaTrustStoreFileLoc != null && !kafkaTrustStoreFileLoc.isEmpty() && isNotNullSslParams()) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            props.put(SaslConfigs.SASL_MECHANISM, kafkaSaslMechanism);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaTrustStoreFileLoc);
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaSslIdentifyAlg);

            String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
            String jaasCfg = String.format(jaasTemplate, kafkaUsername, kafkaPassword);
            props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
        }

        return new DefaultKafkaConsumerFactory<>(props);
    }

    protected boolean isNotNullSslParams() {
        return kafkaSecurityProtocol != null
                && kafkaSaslMechanism != null
                && kafkaSslIdentifyAlg != null
                && kafkaUsername != null
                && kafkaPassword != null;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(5);
        return factory;
    }
}

Consumer

@KafkaListener(topics = {"${kafka.topic}"}, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload final String message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) final String topic, Acknowledgment ack) {
    try {
        log.debug("Received '{}'-message {} from Kafka", topic, message);
        messageReceived(topic, message); // process the message
        ack.acknowledge(); // ack the message
    } catch (Exception e) {
        // pass the exception as the last argument so the stack trace is logged
        log.error("Kafka Listener Exception : {}", e.getMessage(), e);
    }
}

2 Answers


You are taking too long to process all the records received from the last poll().

The processing of all records from each poll must complete within max.poll.interval.ms (ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), which defaults to 5 minutes.

Figure out how long it takes to process each record and either increase the max.poll.interval.ms or reduce max.poll.records.
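
For example, a minimal sketch against the consumerFactory() bean from the question (the 600000 and 100 values are illustrative only; derive them from your measured per-record processing time):

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // ... existing properties from the question ...

    // Allow up to 10 minutes between polls (default is 5 minutes)
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    // Fetch fewer records per poll so each batch finishes well within the interval
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

    return new DefaultKafkaConsumerFactory<>(props);
}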

Gary Russell

You can try the parameters below:

session.timeout.ms (default: 10 seconds): the consumer sends periodic heartbeats to the group coordinator on the broker to signal that its session is alive. If the broker receives no heartbeat within session.timeout.ms, it removes that consumer from the group and triggers a rebalance.

Note: if you increase session.timeout.ms, check whether the broker's group.max.session.timeout.ms setting also needs to be adjusted.

max.poll.interval.ms (default: 5 minutes): the maximum delay between invocations of poll() when using consumer group management, i.e. the maximum time the consumer may be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances.

max.poll.records (default: 500): the maximum number of records returned in a single call to poll(). You can reduce this to process fewer records at a time.

If you are still facing the issue after tuning the above properties, try manually assigning partitions ("assign") instead of "subscribe" in your consumer; manually assigned consumers do not take part in group rebalancing.
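
A sketch of manual assignment using Spring Kafka's @TopicPartition, assuming the topic has partitions 0 and 1 (the partition numbers are illustrative, not taken from the question):

// Manually assigned partitions: this listener does not join group rebalancing
@KafkaListener(containerFactory = "kafkaListenerContainerFactory",
        topicPartitions = @TopicPartition(topic = "${kafka.topic}", partitions = { "0", "1" }))
public void listen(@Payload final String message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) final String topic, Acknowledgment ack) {
    messageReceived(topic, message); // process the message as before
    ack.acknowledge();
}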

A few considerations before setting the values (a configuration sketch follows the list):

  1. group.max.session.timeout.ms > session.timeout.ms > group.min.session.timeout.ms
  2. request.timeout.ms > session.timeout.ms
  3. heartbeat.interval.ms ≈ session.timeout.ms / 3 (approx)
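
A sketch of the resulting client-side settings, added to the consumerFactory() properties from the question (all values illustrative; check them against your broker's group.min.session.timeout.ms and group.max.session.timeout.ms):

// Illustrative timeout tuning; adjust to your environment
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // within the broker's group min/max bounds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // ~ session.timeout.ms / 3
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);    // must be greater than session.timeout.ms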
Nitin