I am trying to process some tasks asynchronously using Rx, e.g.
var list = Enumerable.Range(0, 100)
.ToObservable()
.SelectMany(x => Observable.Start(() => {
Console.WriteLine("Processing {0} ...", x);
Thread.Sleep(100 * x % 3);
if (x > 90) {
Console.WriteLine("Procesing exception {0} > 90", x);
throw new Exception("Value too large");
}
Console.WriteLine("Processing {0} completed.", x);
return x;
}))
.Subscribe(
x => { Console.WriteLine("Next [{0}]", x); },
e => {
Console.WriteLine("Exception:");
Console.WriteLine(e.Message);
},
() => { Console.WriteLine("Complete"); }
);
The problem I have with this code is that the exception is not passed to the subscriber. So, after a lot of trying I gave up and decided to ask this simple question:
How do you handle the exceptions raised from within asynchronous methods within a SelectMany
statement?
Just to make it clear, the final implementation is a synchroneous function call that may or may not throw an exception. The goal is to pass it on to the subscriber so that it can be further processed (in the specific case a message will be shown to the user).
Edit
I moved my findings down to an answer, so that I can mark this question as answered. Personally, I do not agree with self answering ... but sometimes there is no other way, so sorry for it.