I found most of what I was looking for here ActiveMQ - Do I need to re-subscribe to a queue after the Listener event fires?, but I am unable to figure out how to keep the listener running other than using a while(true) loop, which I think there must be a better way to keep the listener active while at the same time having the ability to graciously dispose of all process if I need to stop the application. User Tim Bish affirmatively answers reckface's statement "Does this mean the Listener event will fire for each message without a while loop?", but for the life of me I can't figure out how to implement it without a while(true) loop.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System.Runtime.Serialization.Json;
using System.IO;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Web;
namespace ActiveMQConnectionTest
{
class Program : IDisposable
{
private static IConnection connection;
private static ISession session;
private static SqlConnection sqlConn;
private static ActiveMQMessage msg;
private static MessageConsumer consumer;
private static DateTime timeStamp;
private static AutoResetEvent semaphore = new AutoResetEvent(false);
private static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
static string un = ConfigurationManager.AppSettings["AMQUserName"];
static string pwd = ConfigurationManager.AppSettings["AMQPassword"];
static string url = ConfigurationManager.AppSettings["url"];
static string queue = ConfigurationManager.AppSettings["queue"];
private static string oldMsgId;
Program()
{
AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"].ToString());
System.Uri uri = new Uri(url);
IConnectionFactory factory = new ConnectionFactory(uri);
try
{
connection = factory.CreateConnection(un, pwd);
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
ActiveMQDestination dest = session.GetDestination(queue) as ActiveMQDestination;
consumer = session.CreateConsumer(dest) as MessageConsumer;
}
catch (NMSConnectionException ex)
{
Console.Write(ex.Message);
connection.Dispose();
}
try
{
connection.Start();
Console.WriteLine("Connection Started...");
Console.WriteLine("Session Created....");
}
catch (ConnectionFailedException ex)
{
connection.Close();
Console.Write(ex.Message);
}
}
~Program()
{
Dispose(false);
}
protected void Dispose(Boolean itIsSafeToAlsoFreeManagedObjects)
{
if (itIsSafeToAlsoFreeManagedObjects)
{
if (connection != null)
{
connection.Dispose();
}
if (session != null)
{
session.Dispose();
}
if (consumer != null)
{
consumer.Dispose();
}
}
}
public void Dispose()
{
Dispose(true);
}
static void ShutDown()
{
session.Close();
if (connection.IsStarted)
{
connection.Stop();
connection.Close();
connection.Dispose();
}
}
protected static void consumer_Listener(IMessage messasge)
{
messasge.Acknowledge();
msg = (ActiveMQMessage)messasge;
if (msg.MessageId.ToString() != oldMsgId)
{
oldMsgId = msg.MessageId.ToString();
msg.Acknowledge();
if (msg == null)
{
Console.WriteLine("No message received!");
}
else
{
Console.WriteLine("Received message with ID: " + msg.NMSMessageId);
Console.WriteLine("Received message with conetent: " + msg.ToString());
try
{
string s = ASCIIEncoding.ASCII.GetString(msg.Content);
timeStamp = DateTime.Now;
DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(I280Message));
var ms = new MemoryStream(msg.Content);
I280Message rows = (I280Message)deserializer.ReadObject(ms);
int MessageId = InsertPerson(rows.Person);
semaphore.Set();
}
catch (NMSException ex)
{
ShutDown();
Console.WriteLine(ex.Message);
}
}
}
else {
Console.WriteLine("Same old message....");
}
}
private static int InsertPerson(Person person)
{
using (SqlConnection sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"]))
{
using (SqlCommand sqlCmd = new SqlCommand("I280MessagePerson_tbl_isp", sqlConn))
{
sqlCmd.CommandType = CommandType.StoredProcedure;
sqlCmd.Parameters.AddWithValue("@BirthDate", person.BirthDate);
sqlCmd.Parameters.AddWithValue("@Gender", person.Gender);
sqlCmd.Parameters.AddWithValue("@VisaPermitType", person.VisaPermitType, null);
sqlCmd.Parameters.AddWithValue("@CitizenshipStatus", person.CitizenshipStatus, null);
sqlCmd.Parameters.AddWithValue("@ConfidentialFlag", person.ConfidentialFlag);
sqlCmd.Parameters.AddWithValue("@DeceasedFlag", person.DeceasedFlag, null);
sqlCmd.Parameters.AddWithValue("@TimeStamp", timeStamp);
SqlParameter paramPersonId = new SqlParameter("@MessageId", SqlDbType.Int);
paramPersonId.Direction = ParameterDirection.Output;
sqlCmd.Parameters.Add(paramPersonId);
sqlConn.Open();
try
{
sqlCmd.ExecuteNonQuery();
return (int)(sqlCmd.Parameters["@MessageId"].Value);
}
catch (SqlException ex)
{
Console.WriteLine(ex.Message);
if (sqlConn.State == ConnectionState.Open) sqlConn.Close();
return -1;
}
}
}
}
static void Main(string[] args)
{
using (Program pr = new Program())
{
consumer.Listener += new MessageListener(consumer_Listener);
}
//while (true)
//{
// consumer.Listener += new MessageListener(consumer_Listener);
// semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
//}
//catch (NMSException ex)
//{
// ShutDown();
// Console.WriteLine(ex.Message);
//}
// Console.ReadLine();
}
}
public static class SqlParameterCollectionExtensions
{
public static SqlParameter AddWithValue(this SqlParameterCollection target, string parameterName, object value, object nullValue)
{
if (value == null || (string)value == "")
{
return target.AddWithValue(parameterName, nullValue ?? DBNull.Value);
}
return target.AddWithValue(parameterName, value);
}
}
}