We have the Kafka Consumer (concurrency of 5) with Manual ack. With the below implementation, sometimes getting the exception Commit cannot be completed since the group has already rebalanced ...
In the Exception scenario, the message is not acknowledged and it is getting consumed once again.
Any suggestions on the configuration changes with out impacting much on the performance of consumer???
Consumer Factory
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
/*
* Reading of the variables from yml file
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// SASL and JAAS properties
if(null!=kafkaTrustStoreFileLoc && !kafkaTrustStoreFileLoc.isEmpty() && isNotNullSslParams()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
props.put(SaslConfigs.SASL_MECHANISM, kafkaSaslMechanism);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaTrustStoreFileLoc);
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaSslIdentifyAlg);
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(jaasTemplate, kafkaUsername, kafkaPassword);
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
}
return new DefaultKafkaConsumerFactory<>(props);
}
protected boolean isNotNullSslParams() {
return null!=kafkaSecurityProtocol
&& null!= kafkaSaslMechanism
&& null!= kafkaSslIdentifyAlg
&& null!= kafkaUsername
&& null!= kafkaPassword;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(5);
return factory;
}
}
Consumer
@KafkaListener(topics = {"${kafka.topic}" }, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload final String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) final String topic, Acknowledgment ack) {
try {
log.debug("Received '{}'-message {} from Kafka", topic, message);
messageReceived(topic, message); //processing message
ack.acknowledge(); //ack the message
} catch (Exception e) {
log.error("Kafka Listener Exception : {} -> {}", e.getMessage(), e);
}
}