0

Our Spring 5 application is configured to use ActiveMQ and to persist the messages. We decided to go with KahaDB file storage.

To test message persistence, I commented out the MessageListener bean and sent a message to a queue (via JmsTemplate) and verified the message gets written to kahadb data log files. After un-commenting the MessageListener bean and restarting the broker, the message does not get delivered to the listener. I am unable to figure out why the message is not delivered upon broker restart, any help will be greatly appreciated.

Below is the code that adds KahaDb persistence to ActiveMQ configuration:

   @Bean(initMethod = "start", destroyMethod = "stop")
   public BrokerService brokerServiceConfig() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.addConnector("vm://localhost");
        brokerService.setBrokerName("order-broker");
        PersistenceAdapter kahaDbAdapter = new KahaDBPersistenceAdapter();
        File kahaDir = new File("/home/test");
        kahaDbAdapter.setDirectory(kahaDir);
        brokerService.setPersistenceAdapter(kahaDbAdapter);
        brokerService.setPersistent(true);
   }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setExplicitQosEnabled(true);
        template.setDeliveryMode(DeliveryMode.PERSISTENT);
        return template;
    }
    
    @Bean
    @DependsOn({"brokerService"})
    private static ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL("vm://my-broker?create=false");
        
        List<String> trustedPackageList = new ArrayList<>(activeMQConnectionFactory.getTrustedPackages());
        trustedPackageList.add("com.mypackages");
        activeMQConnectionFactory.setTrustedPackages(trustedPackageList);
        CachingConnectionFactory connFactory = new CachingConnectionFactory();
        activeMQConnectionFactory.setCopyMessageOnSend(false);
        activeMQConnectionFactory.setUseAsyncSend(true);
        connFactory.setTargetConnectionFactory(activeMQConnectionFactory);
        return connFactory;
    }

    @Bean
    public DefaultMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory()); 
        dmlc.setSessionTransacted(false);
        dmlc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);   
        dmlc.setDestinationName(TEST_QUEUE);
        dmlc.setMaxConcurrentConsumers(25); 
        dmlc.setMessageListener(msgListener()); 
        return container;
    }       
    
    @Bean
    public TestMsgListener msgListener() {
        return new TestMsgListener(); // This is a Message Driven POJO.
    }

    // MessageProducer code -
    @Autowired
    private JmsTemplate jTemplate;
    
    @Autowired
    private ActiveMQQueue testQueue;

    public void sendMessage() {
        try {
            MySerializedObject obj = <code to create new object>;
            jTemplate.convertAndSend(this.testQueue, obj);
        }catch(Throwable e) {
        }
    }

    // MessageListener code -
    public class TestMsgListener implements MessageListener {

    @Autowired
    public MessageConverter converter;

    public final void onMessage(Message message) {
        try {
             MySerializedObject obj = 
             (MySerializedObject)converter.fromMessage(message));

        } catch (Throwable e) {
          // log error.
        }
    }
 
  • Thanks for the help, have put the entire code above. Please let me know if you want me to provide more details. Per Spring docs, it seems like DMLC should not be use CachedConnectionFactory (did not understand why though) – NJavalearner Dec 28 '20 at 06:16
  • Our application was upgraded to JMS 2.x and ActiveMq 5.x supports JMS 1.1, Could this be the reason why message is not consumed from Kahadb? – NJavalearner Dec 28 '20 at 23:46
  • How are you verifying that your sent message is persisted ? – Tim Bish Dec 29 '20 at 04:48
  • By checking data logs in kahadb data directory, I understand this corrupts the db log files but I just deleted the entire directory after seeing the message in db1-.log file, this is fine as I'm working on dev server. – NJavalearner Dec 30 '20 at 23:15
  • I found the issue, since I commented out listener from DMLC to test message persistence/read from Kahadb, it looks like messages were either getting expired or acknowleged so never got delivered after server restart. After un-commenting the listener from DMLC bean definition and setting a Thread.sleep to 15 secs on listener, I stopped the server immediately after sending message to the queue. After re-starting the server/tomcat I saw the message got delivered from KahaDb to the consumer. So when I commented the listener, did the message expire or was it acknowledged (hard to read data files) – NJavalearner Dec 30 '20 at 23:26

0 Answers0