0

I am trying to raise an Alert when the Brokers are unavailable. I am using @KafkaListner on the consumer side to listen to the subscribed topic. when brokers are unavailable I can see WARN message on console window [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-policy-Extopic-1, groupId=group-Name] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. The broker may not be available.

I want to catch the exception within code so that i can raise Alert. This is my source code for ref:

Service class for Consumer:

@Service
public class Consumer{
@KafkaListener(topics = "topic",groupId = "groupName")
public void consumer(String message){
        logger.info(message);
    }

@EventListener()
public void eventHandler(NonResponsiveConsumerEvent event) 
   {
     System.out.println("CAUGHT the event "+ event);
   }
}

KafkaConfiguration class:

@EnableKafka
@Configuration
public class KafkaConfig {

@Autowired
Consumer consumer;
@Bean
public ConsumerFactory<Integer, String> createConsumerFactory() {
 Map<String, Object> props = new HashMap<>();
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserilizer);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserilizer);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(createConsumerFactory());        factory.getContainerProperties().setIdleEventInterval(5L)`       factory.getContainerProperties().setNoPollThreshold(1L);        factory.getContainerProperties().setMonitorInterval(1);
    return factory;
}

@Bean
public Consumer consumer() {
    return new Consumer();
} }

Thank you!!

Deshan
  • 11
  • 5
  • Maybe a duplicate of https://stackoverflow.com/questions/64869177/spring-boot-kafka-health-indicator – Katy Sep 02 '22 at 12:06
  • This is a suggeston: you can expose kafak matrixes using promethues and grafan and then setup alert from there. Then you dont have to set alert in service vice. – Yahampath Sep 02 '22 at 20:11
  • @Yahampath thank u, will try to expose kafka metrics and raise an alert. – Deshan Sep 04 '22 at 06:15

0 Answers0