-1

I found most of what I was looking for here ActiveMQ - Do I need to re-subscribe to a queue after the Listener event fires?, but I am unable to figure out how to keep the listener running other than using a while(true) loop, which I think there must be a better way to keep the listener active while at the same time having the ability to graciously dispose of all process if I need to stop the application. User Tim Bish affirmatively answers reckface's statement "Does this mean the Listener event will fire for each message without a while loop?", but for the life of me I can't figure out how to implement it without a while(true) loop.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System.Runtime.Serialization.Json;
using System.IO;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Web;



namespace ActiveMQConnectionTest
{
    class Program : IDisposable
    {
        private static IConnection connection;
        private static ISession session;
        private static SqlConnection sqlConn;
        private static ActiveMQMessage msg;
        private static MessageConsumer consumer;
        private static DateTime timeStamp;
        private static AutoResetEvent semaphore = new AutoResetEvent(false);
        private static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
        static string un = ConfigurationManager.AppSettings["AMQUserName"];
        static string pwd = ConfigurationManager.AppSettings["AMQPassword"];
        static string url = ConfigurationManager.AppSettings["url"];
        static string queue = ConfigurationManager.AppSettings["queue"];
        private static string oldMsgId;



        Program() 
        {
            AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
        sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"].ToString());
        System.Uri uri = new Uri(url);
        IConnectionFactory factory = new ConnectionFactory(uri);



        try
        {
            connection = factory.CreateConnection(un, pwd);
            connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
            session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
            ActiveMQDestination dest = session.GetDestination(queue) as ActiveMQDestination;
            consumer = session.CreateConsumer(dest) as MessageConsumer;
        }
        catch (NMSConnectionException ex)
        {
            Console.Write(ex.Message);
            connection.Dispose();
        }

        try
        {

            connection.Start();
            Console.WriteLine("Connection Started...");
            Console.WriteLine("Session Created....");

        }
        catch (ConnectionFailedException ex)
        {
            connection.Close();
            Console.Write(ex.Message);
        }

    }

    ~Program()
    {
        Dispose(false);
    }

    protected void Dispose(Boolean itIsSafeToAlsoFreeManagedObjects)
    {

        if (itIsSafeToAlsoFreeManagedObjects)
        {
            if (connection != null)
            {
                connection.Dispose();
            }
            if (session != null)
            {
                session.Dispose();
            }
            if (consumer != null)
            {
                consumer.Dispose();
            }
        }
    }

    public void Dispose()
    {
        Dispose(true); 
    }       

    static void ShutDown()
    {

        session.Close();
        if (connection.IsStarted)
        {
            connection.Stop();
            connection.Close();
            connection.Dispose();
        }
    }

    protected static void consumer_Listener(IMessage messasge)
    {
        messasge.Acknowledge();
        msg = (ActiveMQMessage)messasge;

       if (msg.MessageId.ToString() != oldMsgId)
       {
        oldMsgId = msg.MessageId.ToString();
        msg.Acknowledge();
        if (msg == null)
        {
            Console.WriteLine("No message received!");
        }
        else
        {
            Console.WriteLine("Received message with ID: " + msg.NMSMessageId);
            Console.WriteLine("Received message with conetent: " + msg.ToString());

            try
            {
                string s = ASCIIEncoding.ASCII.GetString(msg.Content);
                timeStamp = DateTime.Now;

                DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(I280Message));
                var ms = new MemoryStream(msg.Content);
                I280Message rows = (I280Message)deserializer.ReadObject(ms);
                int MessageId = InsertPerson(rows.Person);

                semaphore.Set();
            }
            catch (NMSException ex)
            {
                ShutDown();
                Console.WriteLine(ex.Message);
            }
        }
          }
        else {
            Console.WriteLine("Same old message....");
        }


    }



    private static int InsertPerson(Person person)
    {
        using (SqlConnection sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"]))
        {
            using (SqlCommand sqlCmd = new SqlCommand("I280MessagePerson_tbl_isp", sqlConn))
            {

                sqlCmd.CommandType = CommandType.StoredProcedure;
                sqlCmd.Parameters.AddWithValue("@BirthDate", person.BirthDate);
                sqlCmd.Parameters.AddWithValue("@Gender", person.Gender);
                sqlCmd.Parameters.AddWithValue("@VisaPermitType", person.VisaPermitType, null);
                sqlCmd.Parameters.AddWithValue("@CitizenshipStatus", person.CitizenshipStatus, null);
                sqlCmd.Parameters.AddWithValue("@ConfidentialFlag", person.ConfidentialFlag);
                sqlCmd.Parameters.AddWithValue("@DeceasedFlag", person.DeceasedFlag, null);
                sqlCmd.Parameters.AddWithValue("@TimeStamp", timeStamp);
                SqlParameter paramPersonId = new SqlParameter("@MessageId", SqlDbType.Int);
                paramPersonId.Direction = ParameterDirection.Output;
                sqlCmd.Parameters.Add(paramPersonId);




                sqlConn.Open();
                try
                {

                    sqlCmd.ExecuteNonQuery();
                    return (int)(sqlCmd.Parameters["@MessageId"].Value);
                }
                catch (SqlException ex)
                {
                    Console.WriteLine(ex.Message);
                    if (sqlConn.State == ConnectionState.Open) sqlConn.Close();
                    return -1;
                }

            }
        }


    }

    static void Main(string[] args)
    {           

            using (Program pr = new Program())
            {
                consumer.Listener += new MessageListener(consumer_Listener);
            }



            //while (true)
            //{

            //    consumer.Listener += new MessageListener(consumer_Listener);
            //    semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
            //}
        //catch (NMSException ex)
        //{
        //    ShutDown();
        //    Console.WriteLine(ex.Message);
        //}

       // Console.ReadLine();



    }


}

public static class SqlParameterCollectionExtensions
{
    public static SqlParameter AddWithValue(this SqlParameterCollection target, string parameterName, object value, object nullValue)
    {
        if (value == null || (string)value == "")
        {
            return target.AddWithValue(parameterName, nullValue ?? DBNull.Value);
        }
        return target.AddWithValue(parameterName, value);
    }
}

}

  • Are you asking how to keep the console application running so that it will maintain a reference to the Listener object? – Robert Harvey Oct 17 '18 at 20:50
  • Thanks for your reply Robert. No, I am asking how can I permanently keep the listener active waiting for messages to come in, retrieve and then then process them until I decide to stop the application which should happen in a way that would close and dispose of the session and connection objects. – user3623943 Oct 17 '18 at 21:36
  • Hold a reference variable that points to your listener object. Call `Dispose()` on it when you're done using it. The `using` statement in C# is just shorthand for a `try`/`catch` block with a `Dispose()` call in the `finally` block: see https://stackoverflow.com/a/212210 – Robert Harvey Oct 17 '18 at 21:41
  • Unfortunately that is only going to retrieve one message and then end the application. Even if I add a while loop with that goes through all the messages in the queue, it will eventually consume all the messages and the application will end. I want to keep the listener alive waiting for new messages to arrive at anytime in the future.At the same time, I want the ability to release all resources before the process is shutdown for some reason. I have added the full code to my original question. – user3623943 Oct 18 '18 at 19:32
  • 1
    That's a lot of code. Are you sure this is the *smallest* possible code sample that reproduces the problem? – Robert Harvey Oct 18 '18 at 21:15
  • I have edited the code to only keep the minimum to illustrate the problem. I kept some code commented under the main method to show the while(true) loop I had to resort to in order to keep the listener alive. Thanks a lot of your help and patience. – user3623943 Oct 19 '18 at 01:20
  • Does the listener delegate really expire when you receive a message, or is that just because you wrapped it in a `using` statement? Did you actually try my suggestion, or did you just conclude without trying it that it wasn't going to work? – Robert Harvey Oct 19 '18 at 01:40
  • There were initially 62 records in the queue waiting to be retrieved. After I made changes per your your suggestion, I ran it and it retrieved one record and it shutdown. I ran it a second time and it retrieved a second record and shutdown again. – user3623943 Oct 19 '18 at 03:04
  • The `using` statement will cause your Program object to be disposed as soon as control flow leaves the `using` block. It's a miracle that it's doing anything useful at all. – Robert Harvey Oct 19 '18 at 03:06

2 Answers2

0

So basically your problem is that you're allowing your program to exit. When that happens, any memory that is allocated during the time your program ran is reclaimed by the operating system, including your Program object and your consumer listener delegate.

Your friend at the previous question you asked pointed out that, if you store the ActiveMQ listener as a member variable and keep it in scope, you ought to be able to receive as many messages as you want, for as long as you want, without adding a new listener each time a message is received.

So now all that's left to do is keep your program from exiting. There are a number of ways to do that:

  1. You can change your program to a Winforms application, as described in other Stack Overflow posts. This will cause a Message Loop to be created.

  2. You can read a character from the console. This is a blocking call; the program will wait until the user presses a key. In the meantime, your ActiveMQ should still be able to receive events.

  3. You can use while (iStillWantToReceiveMessages) { }

Robert Harvey
  • 178,213
  • 47
  • 333
  • 501
0

This is not an answer to the question. But I am writing this so that others who have faced issues similar to mine can refer the answer when the face an issue.

But I was facing an issue where the connection to ActiveMQ (using C#) was active, no exceptions, no errors but still the client was not receiving any messages published by the server.

After studying a while in Apache Site, I could figure out that this was happening because of timeout. I got it fixed by using the below line of code :-

brokerUri += "?transport.useInactivityMonitor=false&transport.useKeepAlive=true";

where brokerUri is my activeMq uri.

undefined
  • 142
  • 10