
I am trying to implement Kafka non-blocking retry on a consumer that subscribes to multiple topics via topicPattern. I get the following error when I try to create a RetryTopicConfiguration:

No topics were provided for RetryTopicConfiguration for method dynamicConsumer in class KafkaConsumerClass

Here, dynamicConsumer is the @KafkaListener method that uses topicPattern.

Sample config

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());

Retry config

    RetryTopicConfigurationBuilder
            .newInstance()
            .listenerFactory(kafkaListenerContainerFactoryRetry)
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(3)
            .includeTopics(Arrays.asList("postfixtest"))
            .create(template);
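
As a rough illustration of what `exponentialBackoff(1000, 2, 5000)` combined with `maxAttempts(3)` implies, the sketch below computes the delay before each retry. It assumes that maxAttempts counts the initial delivery as well as the retries, so three attempts leave room for two delayed retries. The class `BackoffDelays` and its method `delays` are hypothetical names used only for this example; the code models the arithmetic and is not Spring Kafka's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models the delay arithmetic, not Spring Kafka internals.
final class BackoffDelays {

    // Returns the delay in milliseconds before each retry.
    // Assumption: maxAttempts includes the first (non-delayed) delivery.
    static List<Long> delays(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double current = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap each delay at the configured maximum.
            result.add(Math.min((long) current, maxMs));
            current *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the retry config in the question.
        System.out.println(delays(1000, 2.0, 5000, 3)); // prints [1000, 2000]
    }
}
```

With these values the 5000 ms cap is never reached; it would only apply from the fourth retry onward, that is, with maxAttempts of 5 or more.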

Kafka consumer

    @KafkaListener(topicPattern = ".*postfixtest", groupId = "edsdefd", containerFactory = "kafkaListenerContainerFactory4")
    public void dynamicConsumer(ConsumerRecord<String, String> message) throws Exception {
        System.out.println("got a message from  topic --> " + message.topic() + " - message is ->  " + message.value() + " partition" + message.partition());
    }

Other assumption: the retry bean will not consume from a topic unless the parent topic has a consumer in the application context.

  • Please show the code for how you are configuring `RetryTopicConfiguration` and how you are using it. – JCompetence Dec 06 '21 at 13:33
  • I am using only the retry config shown above; I have added my preferred topic name in the builder. The retry config will automatically create the retry and DLT topics for me. – siddharthabhi30 Dec 06 '21 at 18:29
  • You need to explain what you mean by "dynamic consumer"; the retry topic mechanism relies on underlying Spring Framework functionality. Provide much more information. – Gary Russell Dec 06 '21 at 19:06
  • Dynamic consumer is just a consumer consuming with a topicPattern instead of a single topic. I should have written multiple topic consumer by regex instead, my bad. – siddharthabhi30 Dec 07 '21 at 03:47
  • @GaryRussell One more question: how would you suggest creating a non-blocking retry bean at runtime, and do you think it would be scalable? Using topicPattern scales by adding consumers. I don't think the automatically created non-blocking retry topics are scalable, at least for topics that are created at runtime. – siddharthabhi30 Dec 07 '21 at 04:12

1 Answer


You cannot use non-blocking retry with topicPattern because the framework needs to know the topic name ahead of time so it can set up the infrastructure and retry topics.
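
To make the mismatch concrete, here is a minimal sketch using only `java.util.regex`. It assumes a pattern subscription is evaluated as a full-name match against whatever topics exist at the time, whereas a retry configuration lists literal topic names up front. The class `TopicPatternDemo`, the method `matching`, and the topic names are hypothetical and exist only for this illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative only: shows which topic names a regex would select.
final class TopicPatternDemo {

    // Keeps the topics whose full name matches the regex.
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topics that happen to exist right now.
        List<String> existing = List.of("clientA-postfixtest", "clientB-postfixtest", "postfixtest", "orders");
        // The set of matches changes whenever a new "...postfixtest" topic is created.
        System.out.println(matching(".*postfixtest", existing));
        // prints [clientA-postfixtest, clientB-postfixtest, postfixtest]
    }
}
```

The literal name `postfixtest` passed to includeTopics is only one of the names the pattern can select, and the full set is not known until the topics exist.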

You could use a prototype bean to create new listeners at runtime; it should work for non-blocking retry too.

See "Can i add topics to my @kafkalistener at runtime".

Gary Russell
  • This will work for most cases, but I need a scalable solution like topicPattern. By creating listeners manually, I would have to manually distribute the large number of topics created at runtime across the servers, which won't be possible. – siddharthabhi30 Dec 07 '21 at 15:08
  • Currently I create topics on a per-client basis at runtime, when a client is onboarded, and I need to scale the servers as load increases. I don't think manually creating listeners would work in my case; I would then have to implement the non-blocking retry pattern manually as well. – siddharthabhi30 Dec 07 '21 at 15:28