
If we use the JMS request/reply mechanism with a temporary queue, will that code be scalable?

As of now, we don't know if we will be supporting 100 requests per second or 1000s of requests per second.

The code below is what I am thinking of implementing. It makes use of JMS in a synchronous fashion. The key part is where the consumer gets created to point to a temporary queue that was created for this session. I just can't figure out whether using such temporary queues is a scalable design.

  destination = session.createQueue("queue:///Q1");
  producer = session.createProducer(destination);
  tempDestination = session.createTemporaryQueue();
  consumer = session.createConsumer(tempDestination);

  long uniqueNumber = System.currentTimeMillis() % 1000;
  TextMessage message = session
      .createTextMessage("SimpleRequestor: Your lucky number today is " + uniqueNumber);

  // Set the JMSReplyTo
  message.setJMSReplyTo(tempDestination);

  // Start the connection
  connection.start();

  // And, send the request
  producer.send(message);
  System.out.println("Sent message:\n" + message);

  // Now, receive the reply
  Message receivedMessage = consumer.receive(15000); // wait up to 15 seconds
  System.out.println("\nReceived message:\n" + receivedMessage);
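
For context, the other half of this pattern is a responder that reads the `JMSReplyTo` destination off each request and sends the reply there. A minimal sketch, assuming a `Connection` to the same broker; the `buildReply` helper and the queue name are illustrative, not from the original post:

```java
import javax.jms.*;

public class SimpleResponder {

    // Pure helper (illustrative): compute the reply text from the request text.
    static String buildReply(String requestText) {
        return "Echo: " + requestText;
    }

    public static void listen(Connection connection) throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination requestQueue = session.createQueue("queue:///Q1");
        MessageConsumer consumer = session.createConsumer(requestQueue);
        // Anonymous producer: the actual destination is supplied per send().
        MessageProducer producer = session.createProducer(null);

        consumer.setMessageListener(request -> {
            try {
                TextMessage reply = session.createTextMessage(
                        buildReply(((TextMessage) request).getText()));
                // Echo the correlation ID (or fall back to the message ID)
                // so selector-based requestors can also match the reply.
                String correlationId = request.getJMSCorrelationID() != null
                        ? request.getJMSCorrelationID()
                        : request.getJMSMessageID();
                reply.setJMSCorrelationID(correlationId);
                // Send the reply wherever the requestor asked for it --
                // the temporary queue, in the pattern above.
                producer.send(request.getJMSReplyTo(), reply);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        connection.start();
    }
}
```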

Update:

I came across another pattern; see this blog. The idea is to use 'regular' queues for both send and receive. However, for synchronous calls, in order to get the desired response (i.e. the one matching the request), you create a consumer that listens to the receive queue using a 'selector'.

Steps:

    // 1. Create Send and Receive Queue.
    // 2. Create a msg with a specific ID
    final String correlationId = UUID.randomUUID().toString();
    final TextMessage textMessage = session.createTextMessage( msg );
    textMessage.setJMSCorrelationID( correlationId );

    // 3. Start a consumer that receives using a 'Selector'.
    consumer = session.createConsumer( replyQueue, "JMSCorrelationID = '" + correlationId + "'" );

So the difference in this pattern is that we don't create a new temp queue for each new request. Instead, all responses come to a single queue, but each request-thread uses a 'selector' to make sure it receives only the response that it cares about.

I think the downside here is that you have to use a 'selector'. I don't know yet whether that is more or less preferable than the pattern mentioned earlier. Thoughts?
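
Mechanically, the selector pattern is just demultiplexing by correlation ID: each request-thread registers interest in one ID and blocks until exactly that reply arrives. Stripped of JMS, the idea can be sketched in plain Java (all names here are illustrative; a real implementation would, of course, carry the send/receive over the broker):

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class CorrelationDemux {
    // One "reply slot" per in-flight request, keyed by correlation ID --
    // the in-memory analogue of a selector on JMSCorrelationID.
    private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    // Requestor side: register interest, "send", then block for the matching reply.
    public String request(String body, long timeoutMs) throws InterruptedException {
        String correlationId = UUID.randomUUID().toString();
        BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);
        pending.put(correlationId, slot);
        try {
            send(body, correlationId);                          // stand-in for producer.send(...)
            return slot.poll(timeoutMs, TimeUnit.MILLISECONDS); // null on timeout, like receive(15000)
        } finally {
            pending.remove(correlationId);                      // like closing the selector consumer
        }
    }

    // Responder side: route each reply to exactly the thread that asked for it.
    public void onReply(String correlationId, String body) {
        BlockingQueue<String> slot = pending.get(correlationId);
        if (slot != null) {
            slot.offer(body);
        } // else: late reply for a timed-out request; drop it
    }

    // Trivial "transport" for demonstration: reply immediately on the same thread.
    protected void send(String body, String correlationId) {
        onReply(correlationId, "reply-to:" + body);
    }
}
```

The broker does the equivalent routing when it evaluates `JMSCorrelationID = '…'` against the shared reply queue; the cost model T.Rob describes below is about how that lookup behaves as the queue gets deep.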

rk2010
  • This mechanism was proposed in an earlier Question I had asked: http://stackoverflow.com/q/10778485/233306 I created this new Question just to know if this is scalable design – rk2010 May 28 '12 at 14:58
  • Hello, could you please share which method you eventually decided to use? Does any of the 2 methods tempq/selector has **signficant** performance/scallability advantages over the other? Thanks. – Borka Dec 14 '13 at 11:29

7 Answers

6

Regarding the update in your post - selectors are very efficient when performed on message headers, as you are doing with the correlation ID. Spring Integration also does this internally to implement its JMS outbound gateway.

Biju Kunjummen
  • JMSOutboundGateway is a great resource. Thanks for the links. – rk2010 May 29 '12 at 11:45
  • Just to be clear this code will be ending up using a Message Header, right? `jmsRequest.setJMSCorrelationID(correlationId); messageSelector = "JMSCorrelationID = '" + correlationId + "'";` – rk2010 May 29 '12 at 11:48
  • Any thoughts on when using this "Temporary Queue" pattern is a good idea? I wonder why Websphere MQ's sample code had provided that pattern. – rk2010 May 29 '12 at 11:52
  • Actually I have seen a combination being used @rk2010 - basically if your calling **clients are few**, then you can create **one temp response queue per client, and then select by correlation id**. This partitions the response queues and will be faster than one response queue for all clients. However if there are a lot of clients and you cannot distinguish one from the other, one response queue is better. The approach of 1 temp queue per request, though, is not advisable. – Biju Kunjummen May 30 '12 at 16:47
4

Interestingly, the scalability of this may actually be the opposite of what the other responses have described.

WebSphere MQ saves and reuses dynamic queue objects where possible. So, although use of a dynamic queue is not free, it does scale well because as queues are freed up, all that WMQ needs to do is pass the handle to the next thread that requests a new queue instance. In a busy QMgr, the number of dynamic queues will remain relatively static while the handles get passed from thread to thread. Strictly speaking it isn't quite as fast as reusing a single queue, but it isn't bad.

On the other hand, even though indexing on CORRELID is fast, performance is inversely proportional to the number of messages in the index. It also makes a difference if the queue depth begins to build. When the app does a GET with WAIT on an empty queue there is no delay. But on a deep queue, the QMgr has to search the index of existing messages to determine that the reply message isn't among them. In your example, that's the difference between searching an empty index versus a large index 1,000s of times per second.

The result is that 1000 dynamic queues with one message each may actually be faster than a single queue with 1000 threads getting by CORRELID, depending on the characteristics of the app and of the load. I would recommend testing this at scale before committing to a particular design.

T.Rob
2

Using a selector on the correlation ID on a shared queue will scale very well with multiple consumers.

1000 requests/s will, however, be a lot. You may want to divide the load between different instances if performance turns out to be a problem.

You might want to elaborate on the request vs. client numbers. If the number of clients is < 10 and will stay rather static, and the request numbers are very high, the most resilient and fast solution might be to have a static reply queue for each client.
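
This per-client variant combines both approaches: a fixed reply queue per known client, with a correlation-ID selector inside it. A sketch of just the naming and selector plumbing (the queue-name convention here is made up for illustration):

```java
public class PerClientReply {

    // Hypothetical convention: one static reply queue per known client.
    static String replyQueueNameFor(String clientId) {
        return "queue:///REPLY." + clientId;
    }

    // Selector that narrows that client's queue to a single request's reply.
    static String selectorFor(String correlationId) {
        return "JMSCorrelationID = '" + correlationId + "'";
    }

    // Intended JMS use (not executed here):
    //   Queue replyQueue = session.createQueue(replyQueueNameFor("billing"));
    //   MessageConsumer c = session.createConsumer(replyQueue, selectorFor(correlationId));
}
```

The partitioning keeps each reply queue shallow, so the per-request selector scan stays cheap.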

Petter Nordlander
  • Thanks. Yeah, we probably will end up having multiple instances of the app running. So each node might not be doing 1000s per second. – rk2010 May 29 '12 at 11:46
1

Creating temporary queues isn't free. After all, it allocates resources on the broker(s). Having said that, if you have an unknown (beforehand), potentially unbounded number of clients (multiple JVMs, multiple concurrent threads per JVM, etc.) you may not have a choice. Pre-allocating client queues and assigning them to clients would get out of hand fast.

Certainly what you've sketched is the simplest possible solution. And if you can get real numbers for transaction volume and it scales enough, fine.

Before I'd look at avoiding temporary queues, I'd look more at limiting the number of clients and making the clients long-lived. That is to say, create a client pool on the client side, and have the clients in the pool create the temporary queue, session, connection, etc. on startup, reuse them on subsequent requests, and tear them down on shutdown. Then the tuning problem becomes one of max/min size of the pool, what the idle time is to prune the pool, and what the behavior is (fail vs. block) when the pool is maxed out. Unless you're creating an arbitrarily large number of transient JVMs (in which case you've got bigger scaling issues just from JVM startup overhead), that ought to scale as well as anything. After all, at that point the resources you are allocating reflect the actual usage of the system. There really is no opportunity to use less than that.

The thing to avoid is creating and destroying a gratuitously large number of queues, sessions, connections, etc. Design the server side to allow streaming from the get-go. Then pool if/when you need to. Like as not, for anything non-trivial, you will need to.
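
The pooling described here is independent of JMS itself: cap the number of live requestors, hand one out per call, and block or fail when the cap is hit. A generic sketch using `java.util.concurrent` (the pooled type would be whatever wraps your connection/session/temporary queue; idle-time pruning is omitted):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class SimplePool<T> {
    private final BlockingQueue<T> idle;

    // Eagerly create maxSize pooled objects
    // (e.g. connection + session + temp queue wrappers).
    public SimplePool(int maxSize, Supplier<T> factory) {
        this.idle = new ArrayBlockingQueue<>(maxSize);
        for (int i = 0; i < maxSize; i++) {
            idle.add(factory.get());
        }
    }

    // Block up to timeoutMs for a free instance;
    // null means the pool is exhausted (caller may fail fast instead).
    public T borrow(long timeoutMs) throws InterruptedException {
        return idle.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Return the instance for reuse by the next request.
    public void release(T instance) {
        idle.offer(instance);
    }
}
```

With this shape, broker resources track actual concurrency rather than request count, which is the point of the argument above.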

asudell
  • I think the temporary Queue has to be 'tied' to the session that creates it: `session.createTemporaryQueue();` And since Sessions can't be pooled, I don't know if pooling of Sessions can be used. – rk2010 May 28 '12 at 16:20
  • Please do check my Update in my original post above. I have added a new pattern I found. – rk2010 May 28 '12 at 16:21
0

Using a temporary queue means creating a new replyTo producer for every request. Compared with using a cached producer for a static replyTo queue, calling createProducer each time is more costly and will impact performance in a highly concurrent invocation environment.

davy_wei
0

I've been facing the same problem and decided to pool connections myself inside a stateless bean. One client connection has one tempQueue and lives inside a JMSMessageExchanger object (which contains the connectionFactory, Queue and tempQueue), which is bound to one bean instance. I've tested it in JSE/EE environments. But I'm not really sure about the Glassfish JMS pool behaviour. Will it actually close JMS connections obtained "by hand" after the bean method ends? Am I doing something terribly wrong?

Also, I've turned off transactions in the client bean (TransactionAttributeType.NOT_SUPPORTED) to send request messages immediately to the request queue.

package net.sf.selibs.utils.amq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import lombok.Getter;
import lombok.Setter;
import net.sf.selibs.utils.misc.UHelper;

public class JMSMessageExchanger {

    @Setter
    @Getter
    protected long timeout = 60 * 1000;

    public JMSMessageExchanger(ConnectionFactory cf) {
        this.cf = cf;
    }

    public JMSMessageExchanger(ConnectionFactory cf, Queue queue) {
        this.cf = cf;
        this.queue = queue;
    }
    //work
    protected ConnectionFactory cf;
    protected Queue queue;
    protected TemporaryQueue tempQueue;
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected MessageConsumer consumer;
    //status
    protected boolean started = false;
    protected int mid = 0;

    public Message makeRequest(RequestProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
            }
            if (this.tempQueue == null) {
                // lazily create this instance's reply queue and its consumer
                this.tempQueue = this.session.createTemporaryQueue();
                this.consumer = this.session.createConsumer(tempQueue);
            }
            //send request
            Message requestM = producer.produce(this.session);
            mid++;
            requestM.setJMSCorrelationID(String.valueOf(mid));
            requestM.setJMSReplyTo(this.tempQueue);
            this.producer.send(this.queue, requestM);
            //get response
            while (true) {
                Message responseM = this.consumer.receive(this.timeout);
                if (responseM == null) {
                    return null;
                }
                int midResp = Integer.parseInt(responseM.getJMSCorrelationID());
                if (mid == midResp) {
                    return responseM;
                } else {
                    // stale reply from an earlier timed-out request; discard and keep waiting
                }
            }

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    public void makeResponse(ResponseProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
            }
            Message response = producer.produce(this.session);
            response.setJMSCorrelationID(producer.getRequest().getJMSCorrelationID());
            this.producer.send(producer.getRequest().getJMSReplyTo(), response);

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    protected void init() throws Exception {
        this.connection = cf.createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(null);
        this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        this.connection.start();
        this.started = true;
    }

    public void close() {
        UHelper.close(producer);
        UHelper.close(consumer);
        UHelper.close(session);
        UHelper.close(connection);
        this.started = false;
    }

}

The same class is used in the client (stateless bean) and the server (@MessageDriven). RequestProducer and ResponseProducer are interfaces:

package net.sf.selibs.utils.amq;

import javax.jms.Message;
import javax.jms.Session;

public interface RequestProducer {
    Message produce(Session session) throws Exception;
}

package net.sf.selibs.utils.amq;

import javax.jms.Message;

public interface ResponseProducer extends RequestProducer {
    void setRequest(Message request);
    Message getRequest();
}

Also I've read the ActiveMQ article about implementing request-response over AMQ: http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

arbocdi
-1

Maybe I'm too late, but I spent some hours this week getting synchronous request/reply working within JMS. What about extending the QueueRequestor with a timeout? I did, and at least testing on a single machine (running broker, requestor and replier) showed that this solution outperforms the ones discussed. On the other hand it depends on using a QueueConnection, and that means you may be forced to open multiple connections.