0

I have one kafka topic with some high number of partitions, say 100 (fixed), and I have a spring boot application which has kafka listeners(or say consumer) consuming from the topic. Now I know that using concurrency property you can adjust the number of consumer threads, something like this (controlling it via application.properties file):

    @KafkaListener(groupId = "${someGroupID}",
            topics = "${someTopic}",
            containerFactory = "someKafkaListenerContainerFactory",
            concurrency = "${concurrencyProperty}")
    {
            //some logic
    }

Using concurrency property on @KafkaListener I can fix the number of consumer threads but if I want to dynamically adjust that number during runtime based on the request load (say if all consumer threads are occupied, render 5 new consumer threads), is there anyway to do this?

Or let me know if there are any better practices to manage the consumer threads according to the load. Thanks.

NOTE: I have kept topic partition count as fixed to a high number since increasing that number could bring in consumer re-balancing delays which as far as I know can be very time consuming, so I am planning to keep the partition count as fixed.

dh1
  • 97
  • 1
  • 9

1 Answers1

1

You cannot change the concurrency dynamically while the container is running; you can, however, stop the container, change the concurrency and restart it.

To stop/start the container, use the KafkaListenerEndpointRegistry bean, and give the listener(s) an id property so you can get a reference to the specific container from the registry.

You should not stop a container on one of its listener threads.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Would this approach be better or maybe assigning the task to be executed to a dynamic ExecutorThreadPool? Restarting a container would have some overheads i think... – dh1 Jul 10 '23 at 14:35
  • 1
    No; it is not good practice to consume records asynchronously. Another alternative would be to add more containers dynamically. Search here; there are several ways to create containers dynamically. See this answer for some links https://stackoverflow.com/questions/71471941/spring-kafka-multiple-topic-for-one-class-dynamically/71483489#71483489 – Gary Russell Jul 10 '23 at 14:42
  • 1
    This might be the simplest approach - make the listener a prototype bean. https://stackoverflow.com/questions/68744775/can-i-add-topics-to-my-kafkalistener-at-runtime/68745230#68745230 – Gary Russell Jul 10 '23 at 14:45
  • My requirement was more like fire and forget, so thought async consumption would also be fine. But thanks for the links, surely will check those out! – dh1 Jul 10 '23 at 14:51