0

I'm new to Polly and I'm trying to apply the Retry policy, so that I can have it manually handling the retry connection in case of IBMMQ connection issue.

Please, consider the following code:

 public class ReconnectException : Exception
{

}

public class QueueMonitor : IObservable<Message>, IDisposable
{
    private readonly MQQueue mqQueue;
    private readonly MQQueueManager queueManager;
    private readonly string queueName;
    private IDisposable timer;
    private readonly object lockObj = new object();
    private bool isChecking;
    private readonly TimeSpan checkingFrequency;
    private readonly List<IObserver<Message>> observers;
    private TimeSpan reconnectInterval;

    private readonly IScheduler scheduler;

    private readonly int maxReconnectCount;

    private static readonly ILog Logger = LogProvider.For<AonQueueManager>();


    private readonly Policy pollyPolicy;

    public QueueMonitor(IConfiguration configuration, string queueName, IScheduler scheduler = null)
    {
        this.queueManager = QueueFactory.GetIstance(configuration);
        this.queueName = queueName;
        this.scheduler = scheduler ?? Scheduler.Default;
        checkingFrequency = configuration.GetValue("checkingFrequency", new TimeSpan(0, 0, 5));
        reconnectInterval = configuration.GetValue("reconnectInterval", new TimeSpan(0, 0, 5));
        maxReconnectCount = configuration.GetValue("maxReconnectCount", 3);
        observers = new List<IObserver<Message>>();

        pollyPolicy = Policy.Handle<ReconnectException>().WaitAndRetry(maxReconnectCount, _ => TimeSpan.FromSeconds(2));

        mqQueue = queueManager.AccessQueue(queueName,
            MQC.MQOO_INPUT_AS_Q_DEF // open queue for input
            + MQC.MQOO_FAIL_IF_QUIESCING); // but not if MQM stopping

    }

    public void Start()
    {
        var x = pollyPolicy.ExecuteAndCapture(CreateTimer);
    }

    private void CreateTimer()
    {

        Logger.DebugFormat("Repeating timer started, checking frequency: {checkingFrequency}", checkingFrequency);
        timer = Observable.Interval(checkingFrequency, scheduler).Subscribe(_ =>
 {
   lock (lockObj)
   {
     if (isChecking) return;

     Logger.Log(LogLevel.Debug, () => "Listening on queues for new messages");
     isChecking = true;

     var mqMsg = new MQMessage();
     var mqGetMsgOpts = new MQGetMessageOptions { WaitInterval = checkingFrequency.Milliseconds };

     // 15 second limit for waiting
     mqGetMsgOpts.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING |
                  MQC.MQCNO_RECONNECT_Q_MGR | MQC.MQOO_INPUT_AS_Q_DEF;
     try
     {
         mqQueue.Get(mqMsg, mqGetMsgOpts);
         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
         {
             var text = mqMsg.ReadString(mqMsg.MessageLength);

             Logger.Debug($"Message received : [{text}]");

             Message message = new Message { Content = text };
             foreach (var observer in observers)
                 observer.OnNext(message);
         }
         else
         {
             Logger.Warn("Non-text message");
         }
     }
     catch (MQException ex)
     {
         if (ex.Message == MQC.MQRC_NO_MSG_AVAILABLE.ToString())
         {
             Logger.Trace("No messages available");
             //nothing to do, emtpy queue
         }
         else if (ex.Message == MQC.MQRC_CONNECTION_BROKEN.ToString())
         {
             Logger.ErrorException("MQ Exception, trying to recconect", ex);

             throw new ReconnectException();
         }
     }
     finally
     {
         isChecking = false;
     }
 }
});
    }


    public IDisposable Subscribe(IObserver<Message> observer)
    {
        if (!observers.Contains(observer))
            observers.Add(observer);

        return new Unsubscriber(observers, observer);
    }

    public void Dispose()
    {
        ((IDisposable)mqQueue)?.Dispose();
        ((IDisposable)queueManager)?.Dispose();

        timer?.Dispose();
    }
}

public class Unsubscriber : IDisposable
{
    private readonly List<IObserver<Message>> observers;
    private readonly IObserver<Message> observer;

    public Unsubscriber(List<IObserver<Message>> observers, IObserver<Message> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    public void Dispose()
    {
        if (observer != null) observers.Remove(observer);
    }
}

The problem I've is that when an exception is thrown inside the lamda ( throw new ReconnectException();), Polly doesn't catch it (and I understand why, since it's on another thread) and the application quits since it's on a different thread.

This code is a part of a library,so I don't know that if in every project the Global exceptions are correctly handed.

How do I get it "catched" by the Polly's code?

Thanks in advance

advapi
  • 3,661
  • 4
  • 38
  • 73
  • If you are wanting to use Polly to catch exceptions thrown outwards from the delegate scheduled by the Rx timer, then Polly cannot do that with the approach shown afaik - Rx schedules that work on a completely separate thread (as you say) at a later time. Governing Rx's handling of erroring observables is probably best done with Rx's in-built error handling constructs, see eg https://stackoverflow.com/questions/20189166/rx-back-off-and-retry . It might be possible to write a custom Rx scheduler to encapsulate Polly and bring Polly into it, but I would try Rx's in built facilities first. – mountain traveller Sep 08 '19 at 09:10

1 Answers1

1

The code posted in the question applies the policy only to the act of creating the timer (the execution of CreateTimer()), not to the code executed by the timer (the lambda inside the .(Subscribe(_ => { }) call).

This is the same as the behaviour if the call to CreateTimer() was surrounded by a try { } catch { }. The catch would only cover the act of executing the CreateTimer() method, the creation of the timer.


For the Polly policy to govern exceptions thrown within the lambda, it needs to be applied within the lambda, to the relevant block/group of statements which are expected to throw the exception.

For example, you might code:

pollyPolicy.ExecuteAndCapture(() => mqQueue.Get(mqMsg, mqGetMsgOpts));

(with a policy configured to govern the particular MQException/s you want to handle).

Or you can apply the policy to a wider group of statements - just as with a try { } clause.

pollyPolicy.ExecuteAndCapture(() => 
{
    // ...
    mqQueue.Get(mqMsg, mqGetMsgOpts));
    // ...
}
mountain traveller
  • 7,591
  • 33
  • 38