0

Lately, I've been trying to get the Reply-To pattern to work in Apache NMS /ActiveMQ and have been having problems sending messages to temporary queues using only the name of the temporary queue.

The project is dispatcher service which retrieves requests from the bus and sends them to another process/runtime (based on complex routing criteria) to process the request. This separate processor then uses the reply-to queue name and correlation ID to craft the response and sending it to the original requester on the same broker but a different connection.

The problem is that it appears you can only send to a temporary queue (or topic) if you have the IDestination object reference from the message's NMSReplyTo header. If that reference is lost, there is no way to send messages to a temporary queue (or topic) by simply using its name.

Illustrating this problem is this simple "Pong" service which listens on a message queue and issues a response to the requester using the contents of the NMS Reply-To header. It mimics dispatching the request to another process by simply calling the ProcessMessage(string,string) method.

    using System;
    using Apache.NMS;

    namespace PongService
    {
        /// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
        class PongService
        {
            static ISession session = null;
            static IMessageProducer producer = null;

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("Connecting to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                IConnection connection = factory.CreateConnection();
                session = connection.CreateSession();

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Using destination: " + destination);

                producer = session.CreateProducer(null);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                connection.Start();

                consumer.Listener += new MessageListener(OnMessage);

                Console.WriteLine("Press any key to terminate Pong service . . .");

                // loop until a key is pressed
                while (!Console.KeyAvailable)
                {
                    try { System.Threading.Thread.Sleep(50); }
                    catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
                } // loop

                Console.Write("Closing connection...");
                consumer.Close();
                producer.Close();
                session.Close();
                connection.Close();
                Console.WriteLine("done.");
            }


            /// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
            /// <param name="receivedMsg">The message received on the request queue.</param>
            protected static void OnMessage(IMessage receivedMsg)
            {
                // mimic the operation of passing this request to an external processor which can connect 
                // to the broker but will not have references to the session objects including destinations
                Console.WriteLine("Sending request to an external processor");
                ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
            }


            /// <summary>Models a worker in another process/runtime.</summary>
            /// <param name="queuename">Where to send the results of processing</param>
            /// <param name="crid">Correlation identifier of the request.</param>
            protected static void ProcessMessage(string queuename, string crid)
            {
                ITextMessage response = session.CreateTextMessage("Pong!");
                response.NMSCorrelationID = crid;

                IDestination destination = session.GetQueue(queuename);

                Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'");
                try
                {
                    producer.Send(destination, response);
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine("Could not send response: " + ex.Message);
                }

            }

        }

    }

Now for the client. It simply creates a temporary queue, starts listening to it and then sends a request on the queue on which our "Pong" service is listening. The request message contains the IDestination of the temporary queue.

    using System;
    using System.Threading;
    using Apache.NMS;
    using Apache.NMS.Util;

    namespace PongClient
    {
        class PongClient
        {
            protected static AutoResetEvent semaphore = new AutoResetEvent(false);
            protected static ITextMessage message = null;
            protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("About to connect to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);

                IConnection connection = factory.CreateConnection();
                ISession session = connection.CreateSession();

                IDestination temporaryDestination = session.CreateTemporaryQueue();
                Console.WriteLine("Private destination: " + temporaryDestination);

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Service destination: " + destination);


                IMessageConsumer consumer = session.CreateConsumer(destination);
                consumer.Listener += new MessageListener(OnMessage);

                IMessageProducer producer = session.CreateProducer(destination);

                connection.Start();

                // Send a request message
                ITextMessage request = session.CreateTextMessage("Ping");
                request.NMSCorrelationID = Guid.NewGuid().ToString();
                request.NMSReplyTo = temporaryDestination;
                producer.Send(request);

                // Wait for the message
                semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
                if (message == null)
                {
                    Console.WriteLine("Timed-Out!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            }



            protected static void OnMessage(IMessage receivedMsg)
            {
                message = receivedMsg as ITextMessage;
                semaphore.Set();
            }
        }
    }

The Pong process seems to operate correctly, only it winds-up making a completely new, separate queue from the one specified in the Reply-To header.

Here are the versions of the technologies involved:

  • Apache.NMS.ActiveMQ v1.5.1
  • Apache.NMS API v1.5.0
  • ActiveMQ 5.5.0
  • C# .NET 3.5

This question is related to this post which describes a similar problem. Hopefully these examples will help clarify the issue in that request as well.

Any help or insight to the solution would be greatly appreciated.

Community
  • 1
  • 1
SCote
  • 664
  • 1
  • 8
  • 19

3 Answers3

1

You're not actually setting the reply-to header in the request message from the PongClient.

Try this:

ITextMessage request = session.CreateTextMessage("Ping");
request.NMSCorrelationID = Guid.NewGuid().ToString();
request.NMSReplyTo = temporaryDestination;
producer.Send(request);
Jakub Korab
  • 4,974
  • 2
  • 24
  • 34
  • Thanks Jake, My bad for not posting the current version of the code. I had already spotted that and tried it without success. In fact, that omission was spotted while posting question. Apparently the older version was cached. - Did it work for you? – SCote Sep 09 '11 at 10:49
0

I would recommend using a topic as a reply destination, and have your consumer filter based on the NMSCorrelationID. This is the implementation I have moved to after much frustration with temp queues. It actually has many advantages.

  1. It cuts down on intensive resource usage on the server (no need to construct/deconstruct temp queues).
  2. It allows you to use another consumer to monitor the response sent back (you will never be able to "peek" inside a temp queue).
  3. And it is much more reliable because the topic can be passed via a logical name instead of a specific token ID (which you are losing across connections).
Jim Gomes
  • 774
  • 8
  • 15
0

You need to use the IDestination you are passed.

Calling

IDestination destination = session.GetQueue(queuename); 

is a little evil. Under the covers it calls CreateTemporaryQueue() replaces the existing temporary queue with a new one of the same name without informing you.

James World
  • 29,019
  • 9
  • 86
  • 120
  • I'm using IBM.XMS 8.0.0.4 and there is no method called GetQueue in the ISession interface – Erez Sep 14 '16 at 15:49