I have a frontend that triggers a switch from one Kafka topic to another. When this happens, the Java Spring Boot backend should also start listening to the new topic to consume incoming messages. This has to happen at runtime, so @KafkaListener isn't an option because it needs the topic name at startup at the latest.
I pass the new topic name as a UUID string to the method shown below. This is one of many attempts, and it doesn't pick up any messages in the new UUID topic (even though there are messages in it). The new topics and the messages are produced by another service (this part works fine). I got this example from another question that didn't really help me: Spring Kafka - Subscribe new topics during runtime. I also read: How to create separate Kafka listener for each topic dynamically in springboot?
Nevertheless, during application startup and on the first call of the changeListener method, I get this log line in the console:
INFO 9636 --- [main] o.a.k.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-group-1, groupId=group] Subscribed to topic(s): 09574388-e8e1-4cef-8e67-881f69850f8f
The goal is for the MessageListener callback (the part marked // do other stuff with message) to be invoked every time a message arrives in the new Kafka topic.
Is it possible to change topics at runtime, and if so, how?
If you need more information, feel free to ask.
public void changeListener(String uuid) {
    ContainerProperties containerProps = new ContainerProperties(uuid);
    containerProps.setMessageListener(
        (MessageListener<UUID, String>) message -> {
            LOG.info("received: " + message);
            // do other stuff with message
        }
    );
    KafkaMessageListenerContainer<UUID, String> container =
        new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(consumerProps()), containerProps);
    container.start();
}

private Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8069");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
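
The switching behaviour I'm after can be sketched independently of Kafka: keep a reference to whatever is currently listening, stop it, and start a new listener for the new topic. This is only a minimal sketch of that lifecycle, not a fix for the problem above. TopicListener, TopicSwitcher and TopicSwitchDemo are hypothetical names that are not part of Spring Kafka, and the "listener" here just records start/stop events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical handle for something that consumes one topic until stopped. */
interface TopicListener {
    void stop();
}

/** Hypothetical helper: keeps at most one listener active and swaps it on topic change. */
class TopicSwitcher {
    private final Function<String, TopicListener> starter;
    private TopicListener current;
    private String currentTopic;

    TopicSwitcher(Function<String, TopicListener> starter) {
        this.starter = starter;
    }

    /** Stops the listener for the old topic (if any), then starts one for the new topic. */
    synchronized void switchTo(String topic) {
        if (topic.equals(currentTopic)) {
            return; // already listening to this topic
        }
        if (current != null) {
            current.stop();
        }
        current = starter.apply(topic);
        currentTopic = topic;
    }

    synchronized String currentTopic() {
        return currentTopic;
    }
}

class TopicSwitchDemo {
    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        // Stand-in starter: records events instead of creating a real consumer.
        TopicSwitcher switcher = new TopicSwitcher(topic -> {
            events.add("start " + topic);
            return () -> events.add("stop " + topic);
        });
        switcher.switchTo("topic-a");
        switcher.switchTo("topic-b");
        System.out.println(events); // prints [start topic-a, stop topic-a, start topic-b]
    }
}
```

In the real application the starter function would presumably build and start a KafkaMessageListenerContainer as in changeListener above and return container::stop, so that the previous container does not keep running after a switch.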