3

I have created a bean for the topic array and at runtime i added some topics to this topic array but the consumer did not update the topic and still consuming from the first topics inside the topic array. i would like that the consumer added these new topic and start consuming from them

@Autowired
private String[] topicArray;

@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
    public void listen(...) {
        ...
    }
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Ahmed-Matteli
  • 33
  • 1
  • 3

1 Answers1

2

No; the property is evaluated once, during initialization.

You cannot add topics to an existing listener container at runtime.

You can, however, make your listener bean a prototype bean and create a new container each time you want to listen to new topics.

Here's an example:

@SpringBootApplication
public class So68744775Application {

    public static void main(String[] args) {
        SpringApplication.run(So68744775Application.class, args);
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    Foo foo(String id, String[] topics) {
        return new Foo(id, topics);
    }

    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean(Foo.class, "one", new String[] { "topic1", "topic2" });
            context.getBean(Foo.class, "two", new String[] { "topic3" });
        };
    }

}

class Foo {

    private final String id;

    private final String[] topics;

    public Foo(String id, String[] topics) {
        this.id = id;
        this.topics = topics;
    }

    public String getId() {
        return this.id;
    }

    public String[] getTopics() {
        return this.topics;
    }

    @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topics}")
    public void listen(String in) {
        System.out.println(in);
    }

}

Note that it is better, however, to omit the groupId so each container is in its own group (the id property). This avoids an unnecessary rebalance when the new container is added.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • omitting the groupId on container would mean that each message is consumed by each listener created, i.e. they don't collaborate as a group to consume the message, is that correct? – dh1 Jul 11 '23 at 05:45
  • 1
    This question was about adding topics. The comment only applies if the topic is different. If you just want to add another consumer to increase the concurrency on the same topic then the group id must be the same. – Gary Russell Jul 11 '23 at 11:18