6

I am trying to implement JMS in my project. I am using active mq as the provider, and using persistent queues. Following is the code to retrieve elements from active mq

            conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(queue);
            ObjectMessage obj = (ObjectMessage) consumer.receiveNoWait();

This code returns data sometimes and sometimes it returns null, even when I can see number of pending messages as non -zero in active mq admin console. I read a bunch of articles, and few ppl mentioned that JMS api does not mandates that you will get the element everytime, and you will have to code accordingly. Since in my scenario, i am depending on the queue, as soon as queue returns null, i kill the process, so i modified the code in the following way

Instead of calling receiveNoWait, I started using receive after checking whether element is present in queue, via queue browser. Following is the modified code

public static <T> T retrieveObjectFromQueue(Queue queue, Class<T> clazz) {
synchronized (queue) {
    if(!queueHasMoreElements(queue))
        return null;
    Connection conn = null;
    Session session = null;
    try {
        conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
        conn.start();
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queue);
        ObjectMessage obj = (ObjectMessage) consumer.receive();
        return clazz.cast(obj.getObject());
    } catch(Exception e) {
        throw new RuntimeException(e);
    }finally {
        closeSessionAndConnection(session, conn);
    }
}


public static boolean queueHasMoreElements(Queue queue) {
Connection conn = null;
Session session = null;
try {
    conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
    conn.start();
    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration enumeration = browser.getEnumeration();
    return enumeration.hasMoreElements();
} catch(Exception e) {
    throw new RuntimeException(e);
}finally {
    closeSessionAndConnection(session, conn);
}

Now my code gets stuck after processing around 20-30 elements, and again, I can see pending elements in the admin console. When i tried using debug mode, i realised, that after retrieving 20-30 elements, my code gets stuck at consumer.receive(), which is expected in case queue is empty, but when i check my admin console, it shows a lot of elements in the queue.

I am using jdbc(mysql) as persistent store for activemq. The configuration xml that I am using is as given in activemq configuration examples (activemq/examples/conf/activemq-jdbc-performance.xml)

I am using tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 as activemq url.

Please let me know what i am doing wrong. I am using java8 and apache activeMq 5.13.1

Thanks

vishva
  • 366
  • 5
  • 17

2 Answers2

1

The JMS specification also does not mandate that a QueueBrowser will return all messages in a Queue, you might only get a snapshot of what is there when it starts depending on a lot of factors.

You are trying to impose semantics on JMS that it does not guarantee. You can try setting the prefetch to zero which will cause the client to poll the broker for a message and wait until the broker tells there is or isn't one. You might still get nothing if the message hasn't yet hit the Queue when you poll though, that's just something you need to handle.

You can also use the timed receive method and impose a timeout that you are willing to wait before returning and terminating your application.

Tim Bish
  • 17,475
  • 4
  • 32
  • 42
  • 1
    Hi Tim, I edited my question to give u a better view. I am getting stuck at calling receive() function, receive I believe is guaranteed to return an element, if there is one. In my case, i am using queue browser only for checking queue being non-empty, and then calling receive. Somehow receive does not returns even when queue is not empty – vishva Feb 17 '16 at 13:58
  • There are many reason that the browser might not return a message, one being that the browser takes a snapshot so new messages won't show up. There is also a maxBrowsePageSize that affects what it might return. – Tim Bish Feb 18 '16 at 15:42
  • I will remove the queue browser part, but as i mentioned before, my code is getting stuck at the **receive** function of **MessageConsumer**. I believe, even if i remove queue browser, problem won't be solved. Since receive is not returning elements, even when the queue has more elements – vishva Feb 19 '16 at 01:24
0

We saw a similar issue (where receiveNoWait returned null even when there were messages on the broker). I think this is due to the pre-fetch buffer in activemq [1][2], and this can be fixed by setting the prefetch limit to 0.

[1] https://activemq.apache.org/what-is-the-prefetch-limit-for
[2] Does JMS receiveNoWait() guarantee message delivery when messages are available?

robd
  • 9,646
  • 5
  • 40
  • 59