I am trying to process some tasks asynchronously using Rx, e.g.

var list = Enumerable.Range(0, 100)
    .ToObservable()
    .SelectMany(x => Observable.Start(() => {
        Console.WriteLine("Processing {0} ...", x);

        // Parenthesized so the delay is 0, 100 or 200 ms; 100 * x % 3 evaluates as (100 * x) % 3.
        Thread.Sleep(100 * (x % 3));

        if (x > 90) {
            Console.WriteLine("Processing exception {0} > 90", x);
            throw new Exception("Value too large");
        }
        Console.WriteLine("Processing {0} completed.", x);
        return x;
    }))
    .Subscribe(
        x => { Console.WriteLine("Next [{0}]", x); },
        e => {
            Console.WriteLine("Exception:");
            Console.WriteLine(e.Message);
        },
        () => { Console.WriteLine("Complete"); }
    );

The problem I have with this code is that the exception is not passed to the subscriber. So, after a lot of trying I gave up and decided to ask this simple question:

How do you handle the exceptions raised from within asynchronous methods within a SelectMany statement?

Just to make it clear, the final implementation is a synchronous function call that may or may not throw an exception. The goal is to pass the exception on to the subscriber so that it can be processed further (in this specific case, a message will be shown to the user).

Edit

I moved my findings down to an answer, so that I can mark this question as answered. Personally, I do not agree with self answering ... but sometimes there is no other way, so sorry for it.

AxelEckenberger
  • Does this help answer your question? http://stackoverflow.com/questions/7210051/catching-exceptions-which-may-be-thrown-from-a-subscription-onnext-action – user981225 Mar 05 '12 at 22:46
  • Not exactly, as this suppresses the exception. However, the wrapping idea might be useful if nothing better comes in. I am not sure whether the wrapping will work in my scenario, as I am dealing with multiple asynchronous and parallel calls ... But I will investigate. Thanks. – AxelEckenberger Mar 05 '12 at 23:16
  • @user981225, thanks, that proved valuable, but the answer is quite simple; see the edit. – AxelEckenberger Mar 06 '12 at 11:28
  • Where can you get the implementation source? – Matthew Finlay Mar 06 '12 at 21:48
  • Use a reflection tool like [Reflector](http://www.reflector.net/) (commercial) or [JustDecompile](http://www.telerik.com/products/decompiler.aspx) (free, closed source), or even [more alternatives](http://stackoverflow.com/q/2425973/266919). – AxelEckenberger Mar 07 '12 at 06:13

2 Answers

Use Materialize to convert your OnError / OnCompleted messages into notifications.

For example,

observable.SelectMany(x => Observable.Start(fn).Materialize())

will give you the error / completion wrapped in a notification that you can handle at your actual subscription point further down the line, rather than having the error terminate the sequence inside the SelectMany.

This is useful for most Async call operations because the method either fails or completes.
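As a rough illustration of the Materialize approach, here is a minimal sketch. It assumes the System.Reactive package (Rx 2.0 or later, for the blocking Wait operator); Process is a hypothetical stand-in for the real work and is not part of the original question.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

class MaterializeSketch
{
    // Hypothetical worker: fails for inputs above 3.
    static int Process(int x)
    {
        if (x > 3) throw new InvalidOperationException("Value too large: " + x);
        return x;
    }

    static void Main()
    {
        var notifications = Enumerable.Range(0, 6)
            .ToObservable()
            // Each inner sequence reports its outcome as a Notification<int>,
            // so a failing call no longer terminates the merged sequence.
            .SelectMany(x => Observable.Start(() => Process(x)).Materialize())
            // Successful calls also yield an OnCompleted notification; drop those.
            .Where(n => n.Kind != NotificationKind.OnCompleted)
            .ToList()
            .Wait(); // blocks until every inner call has finished

        foreach (var n in notifications)
        {
            if (n.Kind == NotificationKind.OnError)
                Console.WriteLine("Failed: {0}", n.Exception.Message);
            else
                Console.WriteLine("Next [{0}]", n.Value);
        }
    }
}
```

Because the calls run in parallel, the order of the printed lines is not fixed. The design choice here is that every call gets to report its result or its failure, instead of the first error ending the whole subscription.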

Asti
  • +1 Materialize could be interesting if you have a cascade of operations and want to push the notification through to a common error handler. Nice option, however as stated ... sadly the problem sat in front of the computer, not being able to use the tools right ... :-) – AxelEckenberger Mar 08 '12 at 12:39
The answer

Actually, the code is working correctly. However, the debugger breaks at the exceptions because the async operations are still executing in the background, or at least those that had already started when the first exception occurred. That threw me! If you run the code without the debugger, the first exception reaches the subscriber's error handler and the exceptions from the operations still running are swallowed. So I guess the problem was really in front of the computer :-)

A few clarifications on Observable.Start are still in order, because I assumed (correctly, as it turns out) that the implementation already has some error handling built in; see Background.

Background

Observable.Start is a convenience method that uses the Observable.ToAsync method to turn a function/action into an async operation. If you look at the implementation of the method, you'll see that it already handles and forwards exceptions.

public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) {
    if (function == null) {
        throw new ArgumentNullException("function");
    }
    if (scheduler == null) {
        throw new ArgumentNullException("scheduler");
    }
    return () => {
        AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>();
        scheduler.Schedule(() => {
            TResult result;
            try {
                result = function();
            } catch (Exception exception) {
                // A failure in the wrapped function is forwarded to observers as OnError.
                asyncSubject.OnError(exception);
                return;
            }
            asyncSubject.OnNext(result);
            asyncSubject.OnCompleted();
        });
        return asyncSubject.AsObservable<TResult>();
    };
}
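
The forwarding described above can be checked with a small sketch. It assumes the System.Reactive package; the class name and the ManualResetEventSlim used to keep the process alive are illustrative choices, not part of the original code.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class StartErrorSketch
{
    static void Main()
    {
        using (var done = new ManualResetEventSlim())
        {
            // The function throws on a background thread; Observable.Start
            // catches the exception and forwards it through OnError.
            Observable.Start<int>(() => { throw new InvalidOperationException("Value too large"); })
                .Subscribe(
                    x => Console.WriteLine("Next [{0}]", x),
                    e =>
                    {
                        Console.WriteLine("Exception: {0}", e.Message);
                        done.Set();
                    },
                    () => done.Set());

            // Wait for the asynchronous result before the process exits.
            done.Wait();
        }
    }
}
```

Run without a debugger attached, this prints "Exception: Value too large" from the error handler. With a debugger configured to break on thrown exceptions, execution may stop at the throw first, which is the behavior that caused the confusion in the question.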
AxelEckenberger