I am trying to implement JMS in my project. I am using active mq as the provider, and using persistent queues. Following is the code to retrieve elements from active mq
conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
ObjectMessage obj = (ObjectMessage) consumer.receiveNoWait();
This code returns data sometimes and sometimes it returns null, even when I can see number of pending messages as non -zero in active mq admin console. I read a bunch of articles, and few ppl mentioned that JMS api does not mandates that you will get the element everytime, and you will have to code accordingly. Since in my scenario, i am depending on the queue, as soon as queue returns null, i kill the process, so i modified the code in the following way
Instead of calling receiveNoWait, I started using receive after checking whether element is present in queue, via queue browser. Following is the modified code
public static <T> T retrieveObjectFromQueue(Queue queue, Class<T> clazz) {
synchronized (queue) {
if(!queueHasMoreElements(queue))
return null;
Connection conn = null;
Session session = null;
try {
conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
ObjectMessage obj = (ObjectMessage) consumer.receive();
return clazz.cast(obj.getObject());
} catch(Exception e) {
throw new RuntimeException(e);
}finally {
closeSessionAndConnection(session, conn);
}
}
public static boolean queueHasMoreElements(Queue queue) {
Connection conn = null;
Session session = null;
try {
conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
return enumeration.hasMoreElements();
} catch(Exception e) {
throw new RuntimeException(e);
}finally {
closeSessionAndConnection(session, conn);
}
Now my code gets stuck after processing around 20-30 elements, and again, I can see pending elements in the admin console. When i tried using debug mode, i realised, that after retrieving 20-30 elements, my code gets stuck at consumer.receive(), which is expected in case queue is empty, but when i check my admin console, it shows a lot of elements in the queue.
I am using jdbc(mysql) as persistent store for activemq. The configuration xml that I am using is as given in activemq configuration examples (activemq/examples/conf/activemq-jdbc-performance.xml)
I am using tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 as activemq url.
Please let me know what i am doing wrong. I am using java8 and apache activeMq 5.13.1
Thanks