0

I've got a frontend where I trigger a change of my Kafka topic to an another one. When I do this the Java Springboot backend should also change listening to that new topic for consuming incoming messages. The problem is that this has to be happening during runtime. Therefore @KafkaListener isn't an option because it needs the topic name at least at startup.
I'm passing the new topic as an UUID string to the method shown below. This was one of many attempts and it won't recognize any messages in the new uuid topic (even if there are messages). The new topics and the messages are produced by an another service (this part is working fine). I got this example from another question that didn't really helped me: Spring Kafka - Subscribe new topics during runtime And I also read: How to create separate Kafka listener for each topic dynamically in springboot?
Nevertheless during application startup and first call of the changeListener method I get this logging line in the console:

INFO 9636 --- [main] o.a.k.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-group-1, groupId=group] Subscribed to topic(s): 09574388-e8e1-4cef-8e67-881f69850f8f

The goal is to call the method of the MessageListener with // do other stuff with message every time there's a message in the new topic in Kafka.
Is there a possibility to change topics during runtime and if there is how?
If you need more information feel free to ask.

  public void changeListener(String uuid) {
    ContainerProperties containerProps = new ContainerProperties(uuid);
    containerProps.setMessageListener(
        (MessageListener<UUID, String>) message -> {
          LOG.info("received: " + message);
          // do other stuff with message
        }
    );
    KafkaMessageListenerContainer<UUID, String> container =
        new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(consumerProps()), containerProps);
    container.start();
  }

  private Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8069");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
  }
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245

2 Answers2

2

To my knowledge, going through the source code, you can't change the topic at runtime. So you'll want to stop the current container and recreate a new one.

I'd advise against using the registry in this case and manage the container yourself, because it seems like you can't remove containers from the registry and will end up with a memory leak.

You can autowire yourself KafkaListenerContainerFactory. This factory requires a an endpoint. I must admit, that it seemed a bit painful to me to setup the endpoint, if you just want to change the topic and have a callback called, because all available implementations use meta programming with bean and method references.

The following snipped should get you started, although it might need some more tweaking.

@SpringBootApplication
@EnableKafka
public class KafkaDemoApplication {
    private KafkaListenerContainerFactory<?> factory;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }


    @Autowired
    public void setFactory(KafkaListenerContainerFactory<?> factory) {
        this.factory = factory;
    }

    @EventListener(classes = {ApplicationStartedEvent.class})
    public void onStarted() throws InterruptedException, NoSuchMethodException {
        var listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_3"));
        registry.stop();
        listenerContainer.start();
        Thread.sleep(2000);
        listenerContainer.stop();

        listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_4"));
        listenerContainer.start();
        Thread.sleep(2000);
        listenerContainer.stop();
    }

    private KafkaListenerEndpoint getEndpoint(String topic) throws NoSuchMethodException {
        var listenerEndpoint = new MethodKafkaListenerEndpoint<String, String>();
        listenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        listenerEndpoint.setBean(this);
        listenerEndpoint.setMethod(getClass().getMethod("onMessage", String.class, String.class));
        listenerEndpoint.setTopics(topic);

        return listenerEndpoint;
    }

    public void onMessage(String key, String value) {
        System.out.println(key + ":" + value)
    }
}

As a side note, you can implement KafkaListenerConfigurer, if you want access to the registry, because it's not autowireable. But again, don't use it, if you want to kill your containers, because you can't remove the references, as far as I know.

Goatfryed
  • 118
  • 6
  • Got it running now with this kind of attempt. Creating a new ListenerEndpoint and using it for each new topic afterwards creating a new container within the facrory just works fine. If anyone needs an example of my code let me now and i could post details. – masterchi3f Jan 22 '21 at 09:24
0

Creating a container manually like that is a bad idea because it needs initializing by spring; you won't get full functionality.

If you are using spring boot; use its ConcurrentMessageListenerContainerFactory to create a container.

If you are not using boot, add your own ConcurrentMessageListenerContainerFactory @Bean.

It is also not a good idea to use the same group.id for multiple listener containers because a rebalance on one will cause an unnecessary rebalance on the others.

In order to read existing records in a new topic, you must set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG="earliest:" (it defaults to latest).

Gary Russell
  • 166,535
  • 14
  • 146
  • 179