Calling DefaultMessageListenerContainer.shutdown() or DefaultMessageListenerContainer.destroy() does not remove the consumer from the queue.
Here is a similar post: "SpringJMS - How to Disconnect a MessageListenerContainer"
(I am not sure how to apply its solution to my case.)
Below is my code:
public class MainProgram {
private static final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MessageConsumerFacade.class);
public static final DefaultMessageListenerContainer container = context.getBean(DefaultMessageListenerContainer.class);
public static void main(String[] args) throws InterruptedException {
boolean startListener = isStartListener(); // to start and stop listener at
will
if(startListener){
if (!container.isRunning()) {
container.start();
}
}else{
if (container.isRunning()) {
container.stop();
}
}
}
}
/**
 * Spring bean source for the JMS consumer: builds the ActiveMQ connection factory and the
 * listener container that consumes from the configured queue.
 *
 * <p>{@code url}, {@code userName}, {@code password} and {@code queueName} are referenced
 * here but declared outside this excerpt.
 */
public class MessageConsumerFacade {

    /**
     * Creates an ActiveMQ connection factory with broker credentials and a redelivery policy
     * of at most two redeliveries, using a delay value of 30000 for both the initial and the
     * subsequent redeliveries (presumably milliseconds - verify against the ActiveMQ docs).
     *
     * @return the configured connection factory
     */
    private ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(url);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);

        // The policy object is fetched from the factory and mutated in place.
        RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(30000);
        policy.setRedeliveryDelay(30000);
        policy.setMaximumRedeliveries(2);
        connectionFactory.setNonBlockingRedelivery(true);
        return connectionFactory;
    }

    /**
     * Defines the listener container bean: transacted sessions, no caching of JMS resources,
     * a {@link MessageJmsListener} as the listener and a {@link MessageErrorHandler} for
     * errors. Auto-startup is disabled, so the caller must start it explicitly.
     *
     * @return the configured (not yet started) listener container
     */
    @Bean
    public MessageListenerContainer listenerContainer() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setDestinationName(queueName);
        container.setMessageListener(new MessageJmsListener());
        container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
        container.setErrorHandler(new MessageErrorHandler());
        container.setSessionTransacted(true);
        container.setAutoStartup(false);
        // No shutdown() here: the container has not been initialised or started at this
        // point, so there is nothing to tear down inside the factory method.
        return container;
    }
}
/**
 * JMS listener that handles {@link TextMessage} instances and ignores every other
 * message type.
 */
public class MessageJmsListener implements MessageListener {

    /**
     * Processes a text message; any exception raised while processing is rethrown wrapped
     * in a {@link RuntimeException}.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        // Only text messages are handled; everything else is skipped without action.
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            //process the message and create record in Data Base
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }
}
public class MessageErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
//log error
}
}```