
I have an ActiveMQ consumer written in Java that calls consumer.receive() in a while(true) loop.

I need to implement a timeout for each message processed (e.g. if processing a message takes longer than 15 seconds, I have to move on and receive the next one).

The session is created with the client acknowledge mode (Session.CLIENT_ACKNOWLEDGE).

Please look at the consumeMessage method, where I have implemented the consumer loop.

Desired outcome:

After 15 seconds the first message needs to be discarded (i.e. it should not invoke acknowledge()). The next message needs to be processed instead.

//package consumer;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class ActivmqConsumer implements ExceptionListener {

    ActiveMQConnectionFactory connectionFactory = null;
    Connection connection = null;
    Session session = null;

    public ActivmqConsumer() throws Exception{
        String USERNAME = "admin";      
        String PASSWORD = "admin";
        this.connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "tcp://192.168.56.101:61616?jms.prefetchPolicy.all=1");
        // Create a Connection
        this.connection = connectionFactory.createConnection();
        connection.start();
        connection.setExceptionListener(this);
        // Create a Session
        this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    }

    public void consumeMessage(String destinationName, EventProcesser eventprocess){
        Destination destination = null;
        MessageConsumer consumer = null;
        try{
        // Create the destination (Topic or Queue)
        destination = session.createQueue(destinationName);

        // Create a MessageConsumer from the Session to the Topic or Queue
        consumer = session.createConsumer(destination);

        // Poll for a message; receive(timeout) takes milliseconds and returns null on timeout
        while(true){
            Message message = consumer.receive(2);
            if(message==null){
                continue;
            }
            else if(message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
                eventprocess.processEvent(text);
                message.acknowledge();
            } else{
                System.out.println("Received: " + message);
            }
         }
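        }catch(Exception ex){
            ex.printStackTrace();
        }finally{
            try{
                // consumer is still null if createQueue/createConsumer failed
                if(consumer != null){
                    consumer.close();
                }
            }catch(Exception ex){
                ex.printStackTrace();
            }
        }
    }

    // Required by ExceptionListener; called when the connection reports a failure
    @Override
    public void onException(JMSException ex){
        ex.printStackTrace();
    }
}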
Justin Bertram
Kannan K
  • Did my answer address your question? If so, please mark it as correct to help other users who have this same question in the future. If not, please elaborate as to why. Thanks! – Justin Bertram Jun 21 '19 at 15:38

1 Answer


There is no "max message processing time" or equivalent feature in ActiveMQ, so you'll need to monitor the processing yourself. Take a look at this question/answer for ideas on how to do that. An alternative would be to use a JTA transaction manager and consume the message in a transaction with a timeout of 15 seconds. Using an MDB in a Java EE container would be a simple way to get the transaction timeout functionality.
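
As a rough illustration of "monitor the processing yourself", here is a minimal sketch that runs the processing on a worker thread and waits a bounded time for it. It uses only java.util.concurrent; the class name TimedProcessor and the method runWithTimeout are hypothetical names made up for this example and are not part of ActiveMQ or JMS.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only; not an ActiveMQ or JMS API.
final class TimedProcessor {

    private TimedProcessor() {
    }

    /**
     * Runs the task on a worker thread and waits at most the given timeout.
     * Returns true if the task completed in time, false if it timed out,
     * threw an exception, or the calling thread was interrupted.
     */
    static boolean runWithTimeout(Runnable task, long timeout, TimeUnit unit) {
        // One executor per call keeps the sketch simple; a long-running
        // consumer would more likely share a single executor.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(task);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the worker; the task only stops if it honors interruption.
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            // The task itself threw an exception.
            return false;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the loop from the question, the call to eventprocess.processEvent(text) could be wrapped as TimedProcessor.runWithTimeout(() -> eventprocess.processEvent(text), 15, TimeUnit.SECONDS), with message.acknowledge() called only when it returns true. One caveat: with Session.CLIENT_ACKNOWLEDGE, acknowledge() acknowledges every message the session has consumed so far, so acknowledging a later message would also acknowledge the one that timed out. If the timed-out message must stay unacknowledged, ActiveMQ's ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE mode may be a better fit, or session.recover() can be called to have unacknowledged messages redelivered.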

Justin Bertram