I have set up a Kafka consumer with the properties below:
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers",server);
consumerProperties.put("group.id",groupId);
consumerProperties.put("security.protocol", "SASL_PLAINTEXT");
consumerProperties.put("sasl.mechanism", "PLAIN");
consumerProperties.put("enable.auto.commit", "false");
consumerProperties.put("acks", "all");
consumerProperties.put("request.timeout.ms", 12000);
consumerProperties.put("max.block.ms",500);
consumerProperties.put("session.timeout.ms", 11000);
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer from the properties above
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
I have try/catch blocks, but no exception is thrown.
If the server details are incorrect (for example, the server name or the port is wrong), the call to poll waits indefinitely:
try{
LOGGER.info("Subscribing to topic.");
consumer.subscribe(Arrays.asList(topic));
LOGGER.info("Subscribed to topic successfully.");
LOGGER.info("Start of polling records for consumer. ");
records = consumer.poll(100);
// The code gets stuck on the line above indefinitely and never returns.
LOGGER.info("Returning records to microservice.");
}
catch(InterruptException interruptException) {
LOGGER.error("interrupt exception "+interruptException);
}
catch(TimeoutException timeoutException) {
LOGGER.error("Time out exception "+timeoutException);
}
catch (KafkaException kafkaException) {
LOGGER.error("Kafka Exception occurred while consuming records by consumer. Message: "+kafkaException.getMessage());
}
catch(Exception exception){
LOGGER.error("Exception occurred while creating consumer object "+exception);
}
Please suggest what changes I need to make to interrupt the infinite wait on poll when the server details are incorrect.
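One possible direction, sketched under assumptions rather than offered as a definitive fix: `KafkaConsumer.wakeup()` is the consumer method that is safe to call from another thread, and it makes a blocked `poll` throw `WakeupException`. A watchdog can therefore call it after a deadline. The helper below is hypothetical (`PollWatchdog` and `callWithWatchdog` are not part of kafka-clients) and uses only the JDK, so the blocking call and the abort action are passed in as parameters.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper (not part of kafka-clients): bounds a blocking call with a watchdog. */
public final class PollWatchdog {
    private PollWatchdog() {}

    /**
     * Runs blockingCall on the calling thread. If it has not returned after
     * timeoutMs, abortAction is invoked once from a daemon watchdog thread.
     * Assumed Kafka usage: blockingCall = () -> consumer.poll(100),
     * abortAction = consumer::wakeup.
     */
    public static <T> T callWithWatchdog(Supplier<T> blockingCall,
                                         Runnable abortAction,
                                         long timeoutMs) {
        ScheduledExecutorService watchdog =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "poll-watchdog");
                    t.setDaemon(true); // never keeps the JVM alive
                    return t;
                });
        ScheduledFuture<?> pending =
                watchdog.schedule(abortAction, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            return blockingCall.get();
        } finally {
            // The call returned or threw: make sure the abort action cannot fire later.
            pending.cancel(false);
            watchdog.shutdownNow();
        }
    }
}
```

With the consumer from the question, this could be used as `records = PollWatchdog.callWithWatchdog(() -> consumer.poll(100), consumer::wakeup, 5000);`, with an additional `catch (WakeupException e)` (from `org.apache.kafka.common.errors`) to handle the unreachable-broker case. The 5000 ms deadline is an arbitrary illustrative value. If upgrading is an option, kafka-clients 2.0 and later also provide `poll(Duration)`, whose timeout bounds the whole call, so it should return after the timeout even when the broker cannot be reached.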