I am trying to implement Kafka non-blocking retries on a consumer that subscribes to multiple topics via topicPattern. I get this error when I try to create a RetryTopicConfiguration:
No topics were provided for RetryTopicConfiguration for method dynamicConsumer in class KafkaConsumerClass
Here dynamicConsumer is the @KafkaListener method that uses topicPattern.
Sample config
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
Retry config
RetryTopicConfigurationBuilder
        .newInstance()
        .listenerFactory(kafkaListenerContainerFactoryRetry)
        .exponentialBackoff(1000, 2, 5000)
        .maxAttempts(3)
        .includeTopics(Arrays.asList("postfixtest"))
        .create(template);
Kafka consumer
@KafkaListener(topicPattern = ".*postfixtest", groupId = "edsdefd", containerFactory = "kafkaListenerContainerFactory4")
public void dynamicConsumer(ConsumerRecord<String, String> message) throws Exception {
    System.out.println("got a message from topic --> " + message.topic() + " - message is -> " + message.value() + " partition" + message.partition());
}
Other assumptions: as far as I understand, the retry bean will not consume from a topic unless a consumer for the parent (main) topic is registered in the application context.
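To make the difference between the two settings concrete, here is a minimal plain-Java sketch (no Spring or Kafka dependencies) comparing the listener's regex with the literal list passed to includeTopics. It assumes the topicPattern is applied as a full-match Java regex against each topic name; the topic names `orders-postfixtest` and `billing-postfixtest` and the class `TopicMatchSketch` are hypothetical and only for illustration. It does not reproduce how Spring Kafka resolves topics internally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TopicMatchSketch {
    // Same regex as the listener's topicPattern attribute.
    static final Pattern LISTENER_PATTERN = Pattern.compile(".*postfixtest");

    // Same literal list as passed to includeTopics(...) in the retry config.
    static final List<String> INCLUDED_TOPICS = Arrays.asList("postfixtest");

    // True if the whole topic name matches the regex (assumed full-match semantics).
    static boolean matchedByPattern(String topic) {
        return LISTENER_PATTERN.matcher(topic).matches();
    }

    // True only if the topic name is listed literally in the retry config.
    static boolean listedInRetryConfig(String topic) {
        return INCLUDED_TOPICS.contains(topic);
    }

    public static void main(String[] args) {
        // Hypothetical topic names, chosen only to show the mismatch.
        List<String> topics = Arrays.asList("postfixtest", "orders-postfixtest", "billing-postfixtest");
        for (String topic : topics) {
            System.out.println(topic
                    + " pattern=" + matchedByPattern(topic)
                    + " listed=" + listedInRetryConfig(topic));
        }
    }
}
```

In this sketch the regex accepts any topic name ending in postfixtest, while the includeTopics list contains only the exact name postfixtest, so the two settings do not describe the same set of topics.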