4

I have created session in hornetQ with one consumer, then I have added 4 messages in queue using producer. After this I have created new consumer.

Will this consumer knows about older messages?

If no, is it possible to configure it in XML?

I have create new consumer which was not able to get previous messages. I just wanted to confirm whether this behavior is correct or not? I didn't find any help in the documentation.

Below is the code snippet :

TextMessage receivedMessage = (TextMessage)consumer.receive(); 
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
//consumer.close();
MessageConsumer newConsumer = session.createConsumer(orderQueue);
receivedMessage = (TextMessage)newConsumer.receive();
receivedMessage.acknowledge();  
System.out.println("Got order: " + receivedMessage.getText());


If I uncomment the consumer.close() line it works fine.
My hornetq-jms.xml

<connection-factory name="NettyConnectionFactory">
      <xa>true</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/XAConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/ConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyThroughputConnectionFactory">
      <xa>true</xa>
      <connectors>
         <connector-ref connector-name="netty-throughput"/>
      </connectors>
      <entries>
         <entry name="/XAThroughputConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyThroughputConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty-throughput"/>
      </connectors>
      <entries>
         <entry name="/ThroughputConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

Code snippet of connection factory

TransportConfiguration transportConfiguration = new
                TransportConfiguration(NettyConnectorFactory.class.getName());
        HornetQConnectionFactory cf =
                HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
        Queue orderQueue = HornetQJMSClient.createQueue("MutationPipelineQueue");

Code for getTransportConfiguration() :

private synchronized static TransportConfiguration getTransportConfiguration() {
        HashMap<String, TransportConfiguration> transportConfigurationMap = new HashMap<String, TransportConfiguration>();
        TransportConfiguration tc = transportConfigurationMap.get("machinename:5455");
        if(tc == null){
            Map<String, Object> connectionParams = new HashMap<String, Object>();
            connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,"machinename");
            connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,Integer.valueOf("5455"));
            tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
            transportConfigurationMap.put("machinename:5455", tc);
        }
        return tc;
Sachin
  • 3,424
  • 3
  • 21
  • 42

2 Answers2

7

Yes it will know about your old messages. But on this case you had your old consumer still open, so that consumer will be caching messages on its buffer unless you close it, or you change consumer-window-size = 0.

Most Message Systems will cache ahead on the consumer so next time you call receive on the consumer the message will be ready to be received.

However, if your consumer is slow and you don't have that many messages the message will be on the client's buffer until you close that consumer.

For fast consumers in production the best is always to cache ahead as that will improve your throughput that would be limited by your network latency without caching.

On the HornetQ case you could cope with slow consumers by setting consumer-window-size=0.

http://docs.jboss.org/hornetq/2.3.0.beta3/docs/user-manual/html/flow-control.html#flow-control.consumer.window

In the case you are instantiating the connection factory through JNDI lookup:

   <connection-factory name="ConnectionFactory">
      <connectors>
         <connector-ref connector-name="netty-connector"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>
      </entries>

      <!-- We set the consumer window size to 0, which means messages are not buffered at all
      on the client side -->
      <consumer-window-size>0</consumer-window-size>

   </connection-factory>

Or on the case of you directly instantiating your connection factory, you must set consumerWindowSize at the instance:

TransportConfiguration transportConfiguration = new
                TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
cf.setConsumerWindowSize(0) // <<<<<< here

This is a running example from the HornetQ distribution at examples/jms/no-consumer-buffering. It's exactly the same as your code snippet and it works every time:

 // Step 5. Create a JMS Session
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 // Step 6. Create a JMS Message Producer
 MessageProducer producer = session.createProducer(queue);

 // Step 7. Create a JMS MessageConsumer

 MessageConsumer consumer1 = session.createConsumer(queue);

 // Step 8. Start the connection

 connection.start();

 // Step 9. Send 10 messages to the queue

 final int numMessages = 10;

 for (int i = 0; i < numMessages; i++)
 {
    TextMessage message = session.createTextMessage("This is text message: " + i);

    producer.send(message);
 }


 System.out.println("Sent messages");

 // Step 10. Create another consumer on the same queue

 MessageConsumer consumer2 = session.createConsumer(queue);

 // Step 11. Consume three messages from consumer2

 for (int i = 0; i < 3; i++)
 {
    TextMessage message = (TextMessage)consumer2.receive(2000);

    System.out.println("Consumed message from consumer2: " + message.getText());
 }

And as you can see on this example old messages are being received.

Anything different than that is a misconfiguration from your system. Perhaps you didn't set the right connection factory?

BTW: On ActiveMQ you can manage the prefetch limit to manage the same behaviour:

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

This question is an exact duplication of JMS queue with multiple consumers

As for retroactive messages that's another feature on ActiveMQ that only applies to Topics, a subscription being created with old messages on it.

Community
  • 1
  • 1
Clebert Suconic
  • 5,353
  • 2
  • 22
  • 35
3

The functionality you're looking for is provided through Durable subscriptions. This is part of standard JMS spec. I'm sure you will find it if you look through HornetQ documentation for your version. Also, here is a good example of a Java client using JMS Durable subscription in HornetQ.

mazaneicha
  • 8,794
  • 4
  • 33
  • 52
  • 1
    Thanks for your answer. But, if I didn't close my old consumer the new consumer do not receive the old message :( – Sachin Mar 04 '13 at 13:59
  • Look to my answer... the messages were being cached at the consumer's buffer. I also provided you a link to the doc. – Clebert Suconic Mar 04 '13 at 15:17
  • After looking through documentation, I don't think HornetQ implements retroactive consumers. Unlike ActiveMQ or IBM MQSeries... – mazaneicha Mar 04 '13 at 16:50
  • mazaneicha: that's wrong... the user is just letting caching on the consumer buffer. ActiveMQ has cache-ahead, and I believe MQSeries too. There's nothing about retroactive consumers here.. that's a queue. – Clebert Suconic Mar 04 '13 at 21:09
  • Not sure I understand your comment.. The question was: "...I have added 4 messages in queue using producer. After this I have created new consumer. Will this consumer knows about older messages?" This is called retroactive consumption. I can not see a way to achieve it using HornetQ. Please post a code that does it. – mazaneicha Mar 04 '13 at 23:07
  • Did you actually read my message? The message still available, you still have a consumer open and the system thinks that you will still be able to receive messages... Simple set ConsumerWindowSize=0, dealing with slow consumers and you will be fine! On ActiveMQ this is called pre-fetch: http://activemq.apache.org/what-is-the-prefetch-limit-for.html On HornetQ this is called consumer-window Size... There are no differences – Clebert Suconic Mar 05 '13 at 04:03
  • Notice that only one consumer can receive your message. The message can exist in only one client buffer. Disable client buffering if you don't need that feature as I explained. Hope that helps – Clebert Suconic Mar 05 '13 at 04:08
  • @Clebert : I have already set ConsumerWindowSize to 0. Still new consumer is not picking the old messages. – Sachin Mar 05 '13 at 07:24
  • @Clebert Thanks for the clarification. I might have misunderstood the original question, but it seemed to indicate that sachin pasalkar wanted NEW consumer to receive an OLD message that is NO LONGER available. Thats why I was referring to [this ApacheMQ feature](http://activemq.apache.org/retroactive-consumer.html) – mazaneicha Mar 05 '13 at 13:26
  • The code snippet is not receiving messages because of the cache ahead as I said. The fix for the code is setting consumer-window-size=0 – Clebert Suconic Mar 05 '13 at 14:04
  • lease, read my answer in detail.. you seem to be ignoring what I said... this is about consumer-window-size.. nothing else. OLD Messages are available on that case.. just close the consumer and let the client's buffer to be released. a message can only be delivered once until you ack or unack it (by a close or rollback on TX). – Clebert Suconic Mar 05 '13 at 19:56
  • @Clebert: As I told earlier in comment that I have already set consumer-window-size=0 in xml file. So that the messages will not be buffered by consumer. Still I am facing the above problem if I do not close the consumer :-( – Sachin Mar 06 '13 at 07:43
  • @mazaneicha: The message is not buffered by old consumer as I have already set consumer-window-size=0. So, the message messages sent before creation of new consumer is still present in queue. – Sachin Mar 06 '13 at 07:44
  • You probably didn't do at the right connection factory. The example you provided is a clear example of client buffering. If that was the case you should talk on the HOrnetQ user's forum, provide a running example with your issue and we would raise a bug. – Clebert Suconic Mar 06 '13 at 13:11
  • I'm the project Lead for hornetq.. I know what I'm saying... you didn't properly configure your system with consumer-window-size. I would need more information to understand what you did wrong, but for the sake of this question, my answer is beyond the necessary for you understanding it. – Clebert Suconic Mar 06 '13 at 13:21
  • I have edited my answer... I can't make it more complete than that – Clebert Suconic Mar 06 '13 at 13:28
  • @Clebert: I am newbie in HornetQ, so I just want to know why it is not working in my case. Could you please check my updated question which contain hornetq-jms.xml. Thanks for keep replying :) – Sachin Mar 06 '13 at 14:41
  • that's correct.. but I'm not sure what connection factory are you using? Are you using that inside MDBs? Add the lookup to your code snippet. – Clebert Suconic Mar 06 '13 at 14:43
  • @Clebert: I have updated my question. Please correct me if I am wrong. – Sachin Mar 06 '13 at 14:51
  • since you are directly instantiating the connection factory you must call cf.setConsumerWindowSize(0) The XMLs are only affecting the jndi lookups – Clebert Suconic Mar 06 '13 at 15:43
  • I edited my answer again with the directly instantiation (your example) – Clebert Suconic Mar 06 '13 at 15:44