4

The following code snippet has my connection and subscription logic for an IBM MQ queue. Whenever there is a connection failure, I use the IConnection.ExceptionListener delegate to establish a new connection to my queue and resubscribe for messages. The problem is that I can see multiple queue handles. How can I make sure I close the previous connection handle and establish a new connection whenever there is a connection break due to network issues or MQ server restarts?

private CancellationToken _cancellationToken;
private IConnection _connection;
private IConnectionFactory _connectionfactory;
private IMessageConsumer _consumer;
private IDestination _destination;
private MessageFormat _msgFormat;
private IMessageProducer _producer;
private ISession _session;

private void CreateWebsphereQueueConnection () {
    _connectionfactory = SetConnectionFactory ();

    //Connection
    _connection = _connectionfactory.CreateConnection (null, null);
    _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

    //Session
    _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

    //Destination
    _destination = _session.CreateQueue ("queue://My.Queue.Name");
    _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
    _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

    //Consumer
    _consumer = _session.CreateConsumer (_destination);
}

private IConnectionFactory SetConnectionFactory () {
    XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
    IConnectionFactory cf = factoryFactory.CreateConnectionFactory ();

    // Set the properties
    cf.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    cf.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    cf.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
    cf.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    cf.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    cf.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    cf.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);
    return cf;
}

public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {

        _connection.ExceptionListener = delegate (Exception connectionException) {
            //Using either of these two statements terminates my code. The debugger never reaches the CreateWebsphereQueueConnection() line at all
            //_connection.Stop()
            //_connection.Close()
            CreateWebsphereQueueConnection ();
            Subscribe (onMessageReceived);
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            onMessageReceived (msg);
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        //Log exception details
    }
}
PushCode

3 Answers

2

You have already set the reconnection options on the connection factory. The XMS library will reconnect automatically when the connection to the queue manager breaks, except when the queue manager is shut down without the -r or -s option. So your application does not need to reconnect explicitly. Having an exception listener helps you understand what is going on with the reconnection process.

Shashi
  • @JoshMc will the XMS library take care of reconnecting even when the queue manager is stopped without the option to **Instruct reconnectable clients to reconnect**? – PushCode Jan 29 '18 at 13:36
  • 1
    No, XMS or any other MQ clients will not reconnect without the option to Instruct reconnectable clients to reconnect. That option is equivalent of -r option of endmqm command – Shashi Jan 29 '18 at 13:57
  • I think I am seeing issues with this kind of restart in our production, and I am trying to solve this in my connection exception listener. Is there any way to check the state of the connection and session before I create a new connection in my connection exception handler? I can't find such a property on the IConnection object, and when I call IConnection.Stop() or IConnection.Close(), the code completely breaks out of execution. – PushCode Jan 29 '18 at 14:28
  • There is no harm in having the MQ Admins default to using the `-r` flag in any start/stop scripts they use, ask them to make this the default. – JoshMc Jan 30 '18 at 14:48
1

IBM.XMS.dll will take care of MQ failover or restarts done with the -r switch. But if there was a restart without asking the connected clients to reconnect, the XMS library will not attempt to reconnect, and client applications will have to handle this situation manually, as pointed out by @Shashi and @JoshMc.

I had to handle this situation and changing my Connection ExceptionListener as follows helped me:

private CancellationToken _cancellationToken;
private IConnection _connection;
private IConnectionFactory _connectionfactory;
private IMessageConsumer _consumer;
private IDestination _destination;
private MessageFormat _msgFormat;
private IMessageProducer _producer;
private ISession _session;
private bool _reConnectOnConnectionBreak = false;
private bool _connected = false;
private void CreateWebsphereQueueConnection () {
    // Keep the factory; calling SetConnectionFactory() without using its result leaves _connectionfactory unset
    _connectionfactory = SetConnectionFactory ();

    while (!_connected || _reConnectOnConnectionBreak) {
        try {
            //Connection
            _connection = _connectionfactory.CreateConnection (null, null);
            _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

            //Session
            _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

            //Destination
            _destination = _session.CreateQueue ("queue://My.Queue.Name");
            _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
            _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

            //Consumer
            _consumer = _session.CreateConsumer (_destination);
            _connected = true;
            // Clear the flag on success; otherwise the loop condition stays true and another connection is created
            _reConnectOnConnectionBreak = false;
        } catch (Exception ex) {
            //Log exception details; the loop then retries
            _connected = false;
        }

    }
}

private IConnectionFactory SetConnectionFactory () {
    XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
    IConnectionFactory cf = factoryFactory.CreateConnectionFactory ();

    // Set the properties
    cf.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    cf.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    cf.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
    cf.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    cf.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    cf.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    cf.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);
    return cf;
}

public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {

        _connection.ExceptionListener = delegate (Exception connectionException) {
            // Use "as" so an unexpected exception type does not throw inside the listener
            XMSException xmsError = connectionException as XMSException;
            IBM.WMQ.MQException mqError = xmsError?.LinkedException as IBM.WMQ.MQException;
            if (mqError == null) {
                return;
            }
            int reasonCode = mqError.ReasonCode;
            if (reasonCode == MQC.MQRC_Q_MGR_QUIESCING || reasonCode == MQC.MQRC_CONNECTION_BROKEN) {
                _reConnectOnConnectionBreak = true;
                _connection.Close ();

                CreateWebsphereQueueConnection ();
                Subscribe (onMessageReceived);
                _reConnectOnConnectionBreak = false;
            }
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            // Converting msg to T is not shown in this snippet
            onMessageReceived (msg);
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        //Log exception details
    }
}

There is no better way to check the state of the IConnection in IBM MQ version 8, so I had to use the reason codes. In IBM MQ version 9, we can use the REST API exposed by the server to check the connection state.
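The "multiple queue handles" symptom from the question can also come from overlapping reconnect attempts, or from opening a new handle before the old one is closed. The following is a minimal sketch of one way to guard against both, not the code used above. IHandle, DelegateHandle and Reconnector are hypothetical names introduced for illustration and are not part of IBM.XMS; in a real service the open delegate would create the XMS connection, session and consumer.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for an XMS connection; not part of IBM.XMS.
public interface IHandle
{
    void Close();
}

// Hypothetical helper that turns a delegate into a handle (handy for demos and tests).
public sealed class DelegateHandle : IHandle
{
    private readonly Action _close;
    public DelegateHandle(Action close) { _close = close; }
    public void Close() { _close(); }
}

public sealed class Reconnector
{
    private readonly Func<IHandle> _open; // opens a fresh handle; may throw
    private IHandle _current;
    private int _busy;                    // 0 = idle, 1 = reconnect in progress

    public Reconnector(Func<IHandle> open)
    {
        _open = open;
    }

    public IHandle Current { get { return _current; } }

    // Returns true if this call performed the reconnect,
    // false if another reconnect was already running.
    public bool Reconnect()
    {
        if (Interlocked.CompareExchange(ref _busy, 1, 0) != 0)
        {
            return false;
        }
        try
        {
            IHandle old = _current;
            _current = null;
            if (old != null)
            {
                // The old handle is probably broken already, so a failing Close is ignored.
                try { old.Close(); }
                catch (Exception) { }
            }
            _current = _open();
            return true;
        }
        finally
        {
            Interlocked.Exchange(ref _busy, 0);
        }
    }
}
```

With this shape, the exception listener would only call Reconnect(), and a second callback that arrives while a reconnect is still running returns false instead of opening another handle.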

PushCode
  • If your MQ team has standard stop scripts for MQ you could also ask them to put the `-r` in the standard stop scripts so that it is always used. – JoshMc Feb 05 '18 at 17:40
  • 1
    I think they do, and I informed them again. This code now takes care of connection breaks even if someone misses the -r switch – PushCode Feb 05 '18 at 17:54
  • You should accept your own answer if this is what resolved your issue. – JoshMc Feb 05 '18 at 18:57
  • Why is your code not complete? Where are the classes, where is your exception callback method, where are the virtual methods for the overrides you've put? I don't understand your code – Ozkan May 24 '18 at 14:22
  • Have done something fundamentally similar to handle this re-connectivity situation. "There is no better way to check the state of the connection IConnection in IBM MQ version 8" - still in 2022 with MQ Version 9.2.4 in .NET - Works perfectly. – hB0 Mar 16 '22 at 12:38
0

For my service I combined CreateWebsphereQueueConnection() and Subscribe<T>() into a single Connect() method.

With:

connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, XMSC.WMQ_CLIENT_RECONNECT);                           
connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, 3600);                          
connectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT_UNMANAGED);

var queueConnection = connectionFactory.CreateConnection();
queueConnection.ExceptionListener = OnException;

Then I handle an exception like this:

private void OnException(Exception exception)
{
    Policy.Handle<Exception>()
        .WaitAndRetryForever(retryAttempt => TimeSpan.FromSeconds(5), (ex, timespan) =>
        {
            _logger.Warning($"Unable to connect: {ex.Message}.");
        })
        .Execute(CreateWebsphereQueueConnection);
}

It's important to retry, since you don't know exactly how long it'll take before you can reconnect.
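The same retry-until-connected idea can be sketched without Polly. This is a minimal illustration assuming a fixed delay between attempts; Retry.Forever is a hypothetical helper, not a Polly or IBM.XMS API, and the action passed in would be the connect method.

```csharp
using System;
using System.Threading;

public static class Retry
{
    // Runs action until it succeeds, sleeping for delay after each failure.
    // Returns the number of failed attempts. onFailure may be null.
    public static int Forever(Action action, TimeSpan delay, Action<Exception> onFailure)
    {
        int failures = 0;
        while (true)
        {
            try
            {
                action();
                return failures;
            }
            catch (Exception ex)
            {
                failures++;
                if (onFailure != null)
                {
                    onFailure(ex);
                }
                Thread.Sleep(delay);
            }
        }
    }
}
```

A call such as Retry.Forever(Connect, TimeSpan.FromSeconds(5), ex => Log(ex)) would block the calling thread until the connection succeeds, so it is worth deciding which thread is allowed to wait. Connect and Log are placeholder names here.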

Nic
  • 2
    By setting `XMSC.WMQ_CLIENT_RECONNECT_OPTIONS` to `XMSC.WMQ_CLIENT_RECONNECT` and `XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT` to `3600` you are telling the .NET app to attempt to reconnect for 3600 seconds (1 hour). This will happen if the client loses the connection because the queue manager crashes or there is a communication error of some sort, or if the queue manager is ended with the `-r` or `-s` flag. The exception listener is meant to let the app know the reconnect logic is happening. Based on what you have presented, won't your initial session reconnect while you also start a new session? – JoshMc Jan 29 '18 at 05:29
  • 1
    In reviewing the doc it suggested something like this `queueConnection.ExceptionListener = new ExceptionListener(OnException);` similar to what @PushCode has presented. It goes on to state "You can remove the delegate by resetting the ExceptionListener to null: `connection.ExceptionListener = null;`". Is it possible the way you have it that `= OnException` is treated as `null` and so is really not setting the exception listener and instead just MQ's reconnect logic is keeping you connected? – JoshMc Jan 29 '18 at 05:51
  • Interesting points - Initially I had the exception handler just log the error, and was relying on MQ's reconnect logic to reconnect. However, I noticed it often wasn't reconnecting. Now that I'm forcing a reconnect in my code it has been working fine. It looks like you know a lot about IBM MQ, so feel free to post your own if my answer isn't good enough. :) – Nic Jan 29 '18 at 06:59
  • Were the times it was not reconnecting when MQ had been down for more than 1 hour? When it does reconnect in less than 1 hour, do you see two different connections (one from MQ reconnecting the original session and one from your explicit connection)? – JoshMc Jan 29 '18 at 08:34