I am trying to raise an alert when the Kafka brokers are unavailable. I am using @KafkaListener on the consumer side to listen to the subscribed topic. When the brokers are unavailable, I can see this WARN message in the console window:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-policy-Extopic-1, groupId=group-Name] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. The broker may not be available.
I want to catch the exception in code so that I can raise an alert. This is my source code for reference:
Service class for Consumer:
@Service
public class Consumer {

    @KafkaListener(topics = "topic", groupId = "groupName")
    public void consumer(String message) {
        logger.info(message);
    }

    @EventListener
    public void eventHandler(NonResponsiveConsumerEvent event) {
        System.out.println("CAUGHT the event " + event);
    }
}
KafkaConfiguration class:
@EnableKafka
@Configuration
public class KafkaConfig {

    @Autowired
    Consumer consumer;

    @Bean
    public ConsumerFactory<Integer, String> createConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserilizer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserilizer);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.getContainerProperties().setIdleEventInterval(5L); // milliseconds
        factory.getContainerProperties().setNoPollThreshold(1L); // multiplier of pollTimeout
        factory.getContainerProperties().setMonitorInterval(1); // seconds
        return factory;
    }

    @Bean
    public Consumer consumer() {
        return new Consumer();
    }
}
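If no exception ever reaches the listener, one fallback I could run on a schedule is a plain TCP probe of the bootstrap address. The following is only a minimal sketch under that assumption: `BrokerProbe` and `isReachable` are hypothetical names, not part of Spring Kafka or kafka-clients, and a successful connect shows only that something accepts connections on that port, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (assumption): not a Spring Kafka or kafka-clients API.
final class BrokerProbe {

    private BrokerProbe() {
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // Any connection failure (refused, timed out, unknown host, bad port) yields false.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from BOOTSTRAP_SERVERS_CONFIG in the configuration above.
        boolean up = isReachable("127.0.0.1", 9092, 1000);
        System.out.println(up ? "broker port reachable" : "ALERT: broker port not reachable");
    }
}
```

The alert would then be raised wherever `isReachable` returns false, for example from a scheduled task.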
Thank you!!