I was following this question: Creating KafkaListener without Annotation & without Spring Boot
I have an API, and every call to it creates a new container with a new message listener. The listener fetches messages for certain offsets and is then paused. After that, I no longer need it. When the API is hit many times, a lot of these objects pile up, and I want to destroy the ones that are no longer useful.
My idea is to destroy a listener once it has been idle for some time.
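To make the idea concrete, here is a rough plain-Java sketch of what I have in mind. It is not spring-kafka code: `IdleListenerRegistry` and `Stoppable` are hypothetical names I made up, and `Stoppable.stop()` is only a stand-in for stopping the real container. Timestamps are passed in explicitly so the logic is easy to follow.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of spring-kafka): remembers when each listener
// last saw a record and stops the ones that have been quiet for too long.
final class IdleListenerRegistry {

    // Stand-in for whatever has to be shut down; for a real container this
    // would presumably delegate to the container's stop() method.
    interface Stoppable {
        void stop();
    }

    private static final class Tracked {
        final Stoppable target;
        volatile long lastActivityMillis;

        Tracked(Stoppable target, long nowMillis) {
            this.target = target;
            this.lastActivityMillis = nowMillis;
        }
    }

    private final Map<String, Tracked> tracked = new ConcurrentHashMap<>();
    private final long maxIdleMillis;

    IdleListenerRegistry(Duration maxIdle) {
        this.maxIdleMillis = maxIdle.toMillis();
    }

    // Called when the API creates a new container.
    void register(String id, Stoppable target, long nowMillis) {
        tracked.put(id, new Tracked(target, nowMillis));
    }

    // Called whenever the listener receives a record, so busy listeners survive.
    void touch(String id, long nowMillis) {
        Tracked t = tracked.get(id);
        if (t != null) {
            t.lastActivityMillis = nowMillis;
        }
    }

    // Stops and forgets every listener idle for longer than maxIdle.
    // Returns the number of listeners that were removed.
    int sweep(long nowMillis) {
        int removed = 0;
        for (Map.Entry<String, Tracked> e : tracked.entrySet()) {
            Tracked t = e.getValue();
            boolean idle = nowMillis - t.lastActivityMillis > maxIdleMillis;
            if (idle && tracked.remove(e.getKey(), t)) {
                t.target.stop();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return tracked.size();
    }
}
```

I imagine `sweep(System.currentTimeMillis())` being called periodically, for example from a `ScheduledExecutorService`, with `touch(...)` called from `onMessage`. I don't know whether hand-rolling this is the right approach for these containers or whether there is a better built-in way.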
How can I do this?
Thanks in advance
This is the listener
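import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.ConsumerAwareMessageListener;

public class Listener extends AbstractConsumerSeekAware implements ConsumerAwareMessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
        // pause if certain offset received
    }
}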
This is the container
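import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.TopicPartitionOffset;

public KafkaMessageListenerContainer<String, String> getContainer(String topic, int partition, long offset) {
    // Assign one explicit topic/partition, starting at the given offset
    ContainerProperties containerProperties = new ContainerProperties(new TopicPartitionOffset(topic, partition, offset));
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
    KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    // Random group id, so every container gets its own consumer group
    kafkaMessageListenerContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
    kafkaMessageListenerContainer.setAutoStartup(false);
    kafkaMessageListenerContainer.getContainerProperties().setMessageListener(new Listener());
    return kafkaMessageListenerContainer;
}

private Map<String, Object> consumerProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}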