0

How can I handle the exception thrown while unsubscribing from a message handler?

var rawSource = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
    handler => ((sender, e) => handler(e)),
    a => this._topicSubscribers.ForEach(s => s.MessageHandler += a),
    a => this._topicSubscribers.ForEach(s => s.MessageHandler -= a));
return rawSource;

With this code I sometimes get an exception thrown from MessageHandler: IllegalStateException: {"Consumer is closed"}

vkg

2 Answers

3

Events typically don't throw when handlers are added or removed, so this is possibly incorrect behavior at the source. If you can fix it at the source, then do so.

Otherwise, you'll either have to catch and swallow the error:

a => this._topicSubscribers.ForEach(s => 
{
  try
  {
    s.MessageHandler += a;
  }
  catch
  {
  }
})

which perhaps isn't ideal, or just don't use the FromEvent method:

return Observable.Create<EventPattern<EMSMessageEventArgs>>(observer =>
{
  // Forward each raised event to the observer as an EventPattern.
  EMSMessageHandler handler = (sender, e) => 
    observer.OnNext(new EventPattern<EMSMessageEventArgs>(sender, e));

  try
  {
    _topicSubscribers.ForEach(s => s.MessageHandler += handler);
  }
  catch (Exception ex)
  {
    try
    {
      _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
    }
    catch { }

    observer.OnError(ex);
  }

  return Disposable.Create(() =>
  {
    try
    {
      _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
    }
    catch { }
  });
});

Note that Rx requires serialized notifications (§4.2 in the Rx Design Guidelines) so you must ensure that all of the _topicSubscribers raise events sequentially, never concurrently. If you cannot, then you must synchronize all calls to observer.OnNext yourself, probably by acquiring a lock.

Update: To be clear, serialization is required regardless of whether you use FromEvent or Create, so even if you choose to simply swallow exceptions like my first example, you'll still need to make sure that the source never raises events concurrently; if you cannot, then you're forced to use my Create example with a lock anyway. FromEvent doesn't do this for you.
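As a rough illustration of "synchronize all calls to observer.OnNext yourself", here is a minimal sketch of a wrapper that takes a lock around every notification. LockingObserver and CountingObserver are hypothetical names made up for this example; they are not part of Rx or the EMS API, and the sketch only relies on the IObserver<T> interface from the base class library. (Rx also ships an Observer.Synchronize helper that, as far as I know, serves the same purpose.)

```csharp
using System;

// Hypothetical helper (not part of Rx): forwards every notification to the
// inner observer while holding a lock, so concurrent callers are serialized.
public sealed class LockingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new object();
    private bool _stopped; // set by OnError/OnCompleted; later calls are ignored

    public LockingObserver(IObserver<T> inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        _inner = inner;
    }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (!_stopped) _inner.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        lock (_gate)
        {
            if (_stopped) return;
            _stopped = true;
            _inner.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            if (_stopped) return;
            _stopped = true;
            _inner.OnCompleted();
        }
    }
}

// Illustrative observer that is NOT thread-safe on its own.
public sealed class CountingObserver : IObserver<int>
{
    public int Count; // plain increment, only safe when calls are serialized

    public void OnNext(int value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Inside the Create example, the idea would be to wrap the observer once (for instance `var safe = new LockingObserver<EventPattern<EMSMessageEventArgs>>(observer);`) and have the event handler call `safe.OnNext(...)` instead of `observer.OnNext(...)`.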

Dave Sexton
3

Using FromEvent like this is asking for trouble, for all the reasons Dave cited about serialization being required in Rx.

However, assuming events aren't raised concurrently within each event source (and I believe this is the case with an EMS MessageConsumer), I would just do your aggregation after FromEvent instead of within it, and let Rx do the heavy lifting:

var sources = new List<IObservable<EMSMessageEventArgs>>();

foreach(var topicSubscriber in this._topicSubscribers.ToList())
{
    // One observable per subscriber; Synchronize() serializes its notifications.
    var source = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
        handler => ((sender, e) => handler(e)),
        h => topicSubscriber.MessageHandler += h,
        h => topicSubscriber.MessageHandler -= h)
        .Synchronize();

    // Collect the source so that it is included in the merge below.
    sources.Add(source);
}

rawSource = sources.Merge();

This way, Merge takes care of correctly aggregating and serializing the individual sources. However, there could still be concurrency within an individual source's events. I don't think FromEvent itself is stressed by events being raised concurrently within an individual source, but Merge may not be so tolerant, in which case the use of Synchronize() above ensures serialization at the individual event source level as well as across the event sources.

James World
  • Even in just a single event registration it's the caller's responsibility to raise events serially. `FromEvent` doesn't serialize notifications itself. – Dave Sexton Nov 17 '14 at 15:07
  • No it doesn't, but it doesn't break either... at least I battered it and it didn't buckle (quite a few operators are actually tolerant of this). Hence I suspect the use of `Synchronize` does address it. – James World Nov 17 '14 at 15:08
  • @DaveSexton Btw - did you know it's not required for .NET events to be raised serially (outside of Rx)? I looked into this extensively here: http://stackoverflow.com/questions/24572366/should-iobservable-be-preferred-over-events-when-exposing-notifications-in-a-lib/24576741#24576741 – James World Nov 17 '14 at 15:11
  • Yes, `Synchronize` is fine - that's what it's for. Didn't notice that you mentioned it. – Dave Sexton Nov 17 '14 at 15:12
  • Yes, I've used concurrent events before. .NET doesn't care :) You just have to copy the delegate first (the standard "raise" pattern.) – Dave Sexton Nov 17 '14 at 15:13
  • +1, though `Merge` doesn't seem to address the question about errors. I think `Create` is the best solution here because no matter how you cut it, `FromEvent` just doesn't handle += and -= exceptions. – Dave Sexton Nov 17 '14 at 15:20
  • Frankly, if `+=` or `-=` does throw an exception, the bloody world should come tumbling down around your ears! I don't want that error handled! :) – James World Nov 17 '14 at 15:23
  • ... although obviously OP may not be in control here. I'm remembering how much I dislike the JMS standard, and especially Tibco's implementation of it for EMS's .NET API. – James World Nov 17 '14 at 15:25
  • I was kind of making a leap that the error was due to weirdness in the unsubscription behaviour with OP's setup of `FromEvent` - but actually it is probably just EMS being rubbish, or someone is disposing the `MessageConsumer` before unsubscribing the Rx (more likely) – James World Nov 17 '14 at 15:29
  • Agreed, but I think the error should be passed to the observer. The observer can ignore it by default, which will take down the app. But it's also possible that the observer has state which indicates that the event's object was already disposed, in which case it knows to just swallow the exception. – Dave Sexton Nov 17 '14 at 15:43
  • Or even the type of exception might just be `ObjectDisposedException` in some cases :-) – Dave Sexton Nov 17 '14 at 15:44
  • Specifically, I'm thinking of networking scenarios where race conditions are just unavoidable. Sometimes you don't want the app crashing because of an `ObjectDisposedException`. Instead, you just transition the UI state to "disconnected" or "can't connect". – Dave Sexton Nov 17 '14 at 15:48
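
The idea in the last few comments (pass the error to the observer and let it decide) could look roughly like the sketch below. ConnectionObserver and its State property are hypothetical names invented for this illustration; they do not come from Rx or the EMS API, and the string states stand in for whatever a real UI would use. It assumes the source reports a failed `+=` or `-=` through OnError, as the Create example does for subscription failures.

```csharp
using System;
using System.Runtime.ExceptionServices;

// Hypothetical observer: treats ObjectDisposedException as an expected
// "disconnected" condition and lets every other error propagate.
public sealed class ConnectionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onMessage;

    // Illustrative state: "connected", "disconnected", "faulted" or "completed".
    public string State { get; private set; }

    public ConnectionObserver(Action<T> onMessage)
    {
        if (onMessage == null) throw new ArgumentNullException("onMessage");
        _onMessage = onMessage;
        State = "connected";
    }

    public void OnNext(T value)
    {
        _onMessage(value);
    }

    public void OnError(Exception error)
    {
        if (error is ObjectDisposedException)
        {
            // Expected race: the event source was disposed underneath us.
            State = "disconnected";
            return;
        }

        // Anything else is unexpected: record it and rethrow,
        // preserving the original stack trace.
        State = "faulted";
        ExceptionDispatchInfo.Capture(error).Throw();
    }

    public void OnCompleted()
    {
        State = "completed";
    }
}
```

Because it implements IObserver<T>, an instance can be passed straight to Subscribe on the observable returned by either answer.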