I was following this question: Creating KafkaListener without Annotation & without Spring Boot

I have an API; each time it is called, a new container is created with a new message listener. The listener fetches messages for certain offsets and is then paused, after which I no longer need it. When the API is hit many times, many such objects accumulate, and I want to destroy the ones that are no longer useful.

My idea is to destroy a listener once it has been idle for some time.

How can I do this?

Thanks in advance

CODE

public class Listener extends AbstractConsumerSeekAware implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
        // pause if certain offset received
    }

}

This is the container

public KafkaMessageListenerContainer<String, String> getContainer(String topic, int partition, long offset) {
    // Assign one partition explicitly, starting at the given offset
    ContainerProperties containerProperties = new ContainerProperties(new TopicPartitionOffset(topic, partition, offset));
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
    KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    // A random group id per container
    kafkaMessageListenerContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
    kafkaMessageListenerContainer.setAutoStartup(false);
    kafkaMessageListenerContainer.getContainerProperties().setMessageListener(new Listener());
    return kafkaMessageListenerContainer;
}

private Map<String, Object> consumerProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

Trayambak Kumar

1 Answer

Creating containers like that will disable many features (e.g. event publishing).

It would be better to use Boot's auto-configured ConcurrentKafkaListenerContainerFactory to create the containers.

/**
 * Create and configure a container without a listener; used to create containers that
 * are not used for KafkaListener annotations. Containers created using this method
 * are not added to the listener endpoint registry.
 * @param topicPartitions the topicPartitions to assign.
 * @return the container.
 * @since 2.3
 */
C createContainer(TopicPartitionOffset... topicPartitions);

When you no longer need the container, simply call container.stop().

Gary Russell
  • Hi Gary, how should I replace "KafkaMessageListenerContainer" with "ConcurrentKafkaListenerContainerFactory"? Can you give me some sample code? – Trayambak Kumar Apr 29 '21 at 17:38
  • See my answer to [this question](https://stackoverflow.com/questions/61950229). – Gary Russell Apr 29 '21 at 19:14
  • I have changed my code to whatever you mentioned in the link. It's working fine. I am not sure about @EventListener. How will I know it is working? – Trayambak Kumar Apr 30 '21 at 05:43
  • You don't need that if you have some other criteria for stopping the container; that question was about how to dynamically create a container and stop it when there are no more records to read. For it to work, you need to configure the `idleEventInterval` as shown in the answer. – Gary Russell Apr 30 '21 at 12:43
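
The "stop it once it has been idle for a while" idea from the question can be sketched without any framework. This is only an illustration of the bookkeeping and not Spring Kafka's own mechanism (that is the idle event plus `idleEventInterval` mentioned above). `Stoppable` and `IdleContainerReaper` are hypothetical names; `Stoppable` stands in for whatever exposes the container's `stop()`. Timestamps are passed in explicitly so the behaviour is deterministic.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a listener container; only stop() matters here. */
interface Stoppable {
    void stop();
}

/** Hypothetical helper: remembers the last activity per container and stops idle ones. */
class IdleContainerReaper {

    private static final class Tracked {
        final Stoppable container;
        volatile long lastActivityNanos;

        Tracked(Stoppable container, long nowNanos) {
            this.container = container;
            this.lastActivityNanos = nowNanos;
        }
    }

    private final Map<String, Tracked> tracked = new ConcurrentHashMap<>();
    private final long maxIdleNanos;

    IdleContainerReaper(Duration maxIdle) {
        this.maxIdleNanos = maxIdle.toNanos();
    }

    /** Start tracking a container under the given id. */
    void register(String id, Stoppable container, long nowNanos) {
        tracked.put(id, new Tracked(container, nowNanos));
    }

    /** Record activity, e.g. called from the listener whenever a record arrives. */
    void touch(String id, long nowNanos) {
        Tracked t = tracked.get(id);
        if (t != null) {
            t.lastActivityNanos = nowNanos;
        }
    }

    /** Stop and forget every container idle for at least maxIdle; returns how many were stopped. */
    int reap(long nowNanos) {
        int stopped = 0;
        for (Map.Entry<String, Tracked> entry : tracked.entrySet()) {
            Tracked t = entry.getValue();
            boolean idle = nowNanos - t.lastActivityNanos >= maxIdleNanos;
            // remove(key, value) ensures only one caller stops a given container
            if (idle && tracked.remove(entry.getKey(), t)) {
                t.container.stop();
                stopped++;
            }
        }
        return stopped;
    }

    /** Number of containers still being tracked. */
    int size() {
        return tracked.size();
    }
}
```

In an application, one would presumably register each container with `container::stop`, call `touch` from `onMessage`, and invoke `reap(System.nanoTime())` periodically, for example from a `ScheduledExecutorService`. Once a stopped container is no longer referenced anywhere, it becomes eligible for garbage collection.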