4

I want to call an async function for each item in an observable. As answered here, the solution is to use SelectMany. However, if the async method throws, the subscription will terminate. I have the following solution, which seems to work:

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .Catch(Observable.Empty<string>()));

Is there a more idiomatic solution?

Community
  • 1
  • 1
Victor Grigoriu
  • 417
  • 5
  • 18

2 Answers2

2

There is a standard way to be able to observe the exceptions that occur in your RunAsync call, and that's using .Materialize().

The .Materialize() method turns an IObservable<T> sequence into a IObservable<Notification<T>> sequence where you can reason against the OnNext, OnError, and OnCompleted calls.

I wrote this query:

var obs = Observable.Range(0, 10);

obs
    .SelectMany(x =>
        Observable
            .FromAsync(() => RunAsync())
            .Materialize())
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
    .Subscribe(x => x.Dump());

With this supporting code:

private int counter = 0;
private Random rnd = new Random();

private System.Threading.Tasks.Task<string> RunAsync()
{
    return System.Threading.Tasks.Task.Factory.StartNew(() =>
    {
        System.Threading.Interlocked.Increment(ref counter);
        if (rnd.NextDouble() < 0.3)
        {
            throw new Exception(counter.ToString());
        }
        return counter.ToString();
    });
}

When I run it I get this kind of output:

2
4
5
1!
6
7
3!
10
8!
9

Each of the lines ending in ! are calls to RunAsync that resulted in an exception.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
0

You can also use OnErrorResumeNext.

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .OnErrorResumeNext(Observable.Empty<string>()));
treze
  • 3,159
  • 20
  • 21
  • Right, but Catch gives me the opportunity to log the exception. – Victor Grigoriu Apr 03 '15 at 10:22
  • That's true, but could also be achieved with a Do(). Anyway, catch is probably the better choice, because OnErrorResumeNext executes the empty sequence also in case of successful termination. – treze Apr 03 '15 at 10:47