
I'm using the @KafkaListener annotation to consume topics in my application. My issue is that if I create a new topic in Kafka while my consumer is already running, the consumer does not seem to pick up the new topic, even if it matches the topicPattern I'm using. Is there a way to "refresh" the subscribed topics periodically, so that new topics are picked up and rebalanced across my running consumers?

I'm using Spring Kafka 1.2.2 with Kafka 0.10.2.0.

Regards

3 Answers


You can't dynamically add topics at runtime; you have to stop/start the container to start listening to new topics.

You can @Autowire the KafkaListenerEndpointRegistry and stop/start listeners by id.

You can also stop/start all listeners by calling stop()/start() on the registry itself.

Gary Russell
  • Awesome! It works for topics! Does the same logic apply to new partitions on a topic or Spring Kafka catches those in runtime? – Bruno René Santos Jul 21 '17 at 09:16
  • Yes; it will work; all topic assignment is done by the broker; the container just subscribes and gets whatever the broker assigns to it. – Gary Russell Jul 21 '17 at 11:20

Actually it is possible.

It worked for me with Kafka 1.1.1.

Under the hood, Spring uses consumer.subscribe(topicPattern), so whether the consumer sees messages from a new topic depends entirely on the Kafka client library.

There is a consumer config property called metadata.max.age.ms, which defaults to 5 minutes. It controls how often the client asks the broker for metadata updates, which means a new topic may not be seen by the consumer for up to 5 minutes. You can decrease this value (e.g. to 20 seconds) and should then see the KafkaListener pick up messages from new topics more quickly.
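
As a rough illustration of the setting described above, here is a minimal sketch of a consumer config map with a shortened metadata refresh interval. The class name, method name, broker address and group id are placeholders invented for this example. The plain string keys are used only to keep the sketch free of dependencies; in a real application you would normally use the constants from ConsumerConfig and pass the map to the consumer factory you already have (for example a DefaultKafkaConsumerFactory).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of Spring Kafka or the Kafka client.
class MetadataRefreshConfig {

    // Builds a consumer config map in which metadata.max.age.ms is lowered
    // from its 5 minute default, so that topics matching a subscribed
    // pattern are discovered sooner.
    static Map<String, Object> consumerProps(String bootstrapServers,
                                             String groupId,
                                             long metadataMaxAgeMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Upper bound on how long the client keeps cached cluster metadata
        // before forcing a refresh from the broker.
        props.put("metadata.max.age.ms", metadataMaxAgeMs);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id; 20 seconds as suggested above.
        Map<String, Object> props =
                consumerProps("localhost:9092", "example-group", 20_000L);
        System.out.println("metadata.max.age.ms = " + props.get("metadata.max.age.ms"));
    }
}
```

A lower value makes new topics visible sooner at the cost of more frequent metadata requests to the broker, so something in the range of tens of seconds is probably a reasonable starting point rather than a very small value.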

  • Using Kafka 1.1.0 also worked. I applied it in my Kafka config like this: props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 5000); which sets the refresh time to 5 seconds. Thanks! – carl saptarshi Dec 13 '18 at 13:55

The following way works well for me.

// createContainer(...) is the helper from the referenced docs (not shown here).
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
// Set the listener before creating the container, as the referenced docs do.
containerProps.setMessageListener(new MessageListener<Integer, String>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> message) {
        logger.info("received: " + message);
    }

});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();

ref: http://docs.spring.io/spring-kafka/docs/1.0.0.RC1/reference/htmlsingle/

In practice, I use a ConcurrentMessageListenerContainer instead of the single-threaded KafkaMessageListenerContainer.

DanielJyc