4

My understanding is that async void, should be avoided and that async () => is just async void in disguise when used with Action.

Hence, using the Rx.NET Finally operator asynchronously with async () => should be avoided since Finally accepts Action as parameter:

IObservable<T>.Finally(async () =>
{
    await SomeCleanUpCodeAsync();
};

However, if this is bad practise, what is then best practice to use in the case where I for instance need to asynchronously close a network connection on OnCompleted or if my observable end with OnError?

1iveowl
  • 1,622
  • 1
  • 18
  • 31
  • 2
    The async lambdas you've shown here are `async void` methods, but not all async lambdas are going to be `void` methods. If you provide an async lambda in a context where a `Task` returning method is expected, that's what you'll get. – Servy Jul 20 '17 at 14:32

4 Answers4

3

My understanding is that async void, should be avoided and that async () => is just async void in disguise.

This is partially wrong. async () => can either match Func<Task> (good) or Action (bad). The main reason for good/bad is that an exception that occurs in a async void call crashes the process, whereas a async Task exception is catchable.

So we just need to write an AsyncFinally operator that takes in a Func<Task> instead of an Action like Observable.Finally:

public static class X
{
    public static IObservable<T> AsyncFinally<T>(this IObservable<T> source, Func<Task> action)
    {
        return source
            .Materialize()
            .SelectMany(async n =>
            {
                switch (n.Kind)
                {
                    case NotificationKind.OnCompleted:
                    case NotificationKind.OnError:
                        await action();
                        return n;
                    case NotificationKind.OnNext:
                        return n;
                    default:
                        throw new NotImplementedException();
                }
            })
            .Dematerialize()
        ;
    }
}

And here's a demonstration of usage:

try
{
    Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Take(10)
        .AsyncFinally(async () =>
        {
            await Task.Delay(1000);
            throw new NotImplementedException();
        })
        .Subscribe(i => Console.WriteLine(i));
}
catch(Exception e)
{
    Console.WriteLine("Exception caught, no problem");
}

If you swap out AsyncFinally for Finally, you'll crash the process.

Shlomo
  • 14,102
  • 3
  • 28
  • 43
  • I really like this approach. I was not familiar with Materialize/Dematerialize. Nice way of using them. But why use SelectMany and not just Select? – 1iveowl Jul 21 '17 at 21:32
  • 2
    Select won't work there. The async selector function with select would turn it from `IObservable>` into `IObservable>`. The SelectMany version flattens out the Task part of that. – Shlomo Jul 21 '17 at 22:45
  • 2
    If I Dispose of the subscription OnCompleted or OnError will not occure and hence the SomeCleanUpCodeAsync will not run. I thought it would. Am I doing something wrong or is there a work-around? – 1iveowl Jul 24 '17 at 08:44
  • 1
    No, that is intended functionality. Workaround would be explicitly calling the clean up when you dispose the subscription. Alternatively, you could use Observable.Using. – Shlomo Jul 24 '17 at 11:13
  • 1
    Ok, but how then do I run the clean up as async? `Disposable.Create` is like Finally in the the sense that it expects an `Action` rather than a Func. Is there a way to create a Disposable.AsyncCreate that will call DisposeAsync, or something like this. Maybe I should create a seperate question for this? – 1iveowl Jul 24 '17 at 13:11
  • I would create a separate question. – Shlomo Jul 24 '17 at 13:26
  • You are right: https://stackoverflow.com/questions/45286998/async-disposable-create – 1iveowl Jul 24 '17 at 17:52
  • Noted that this solution doesn't work on `subscription.Dispose()`. the action in `AsynFinally` will be missed if you dispose the observable before completed. – shtse8 Sep 08 '21 at 11:49
2

It is in Rx as it is elsewhere; avoid async void like the plague. In addition to the problems listed in the article, using asynchronous code in the synchronous operators "breaks" Rx.

I'd consider using OnErrorResumeNext() for cleaning up resources asynchronously. OnErrorResumeNext() let's you specify an observable which will run after the first, regardless the reason it ended:

var myObservable = ...

myObservable
    .Subscribe( /* Business as usual */ );

Observable.OnErrorResumeNext(
        myObservable.Select(_ => Unit.Default),
        Observable.FromAsync(() => SomeCleanUpCodeAsync()))
    .Subscribe();

myObservable would preferably be a ConnectableObservable (e.g. Publish()) to prevent multiple subscriptions.

Jon G Stødle
  • 3,844
  • 1
  • 16
  • 22
  • This takes care of the OnError scenario. However, I also need to clean up after OnCompleted. However, interesting that you are using Observable.FromAsync(). Is this the way to work around `async void`? – 1iveowl Jul 20 '17 at 09:20
  • 2
    Contrary to what the name might imply `OnErrorResumeNext()` actually concats the second observable to the first one regardless whether the first observable errors. `OnErrorResumeNext()` behaves much like `Concat()`, but moves to the next observable on both *OnError* **and** *OnCompleted* – Jon G Stødle Jul 20 '17 at 09:47
  • Thank you for that clarification. Will look into it. – 1iveowl Jul 20 '17 at 11:22
  • 1
    `Finally` is actually the better semantic choice in this scenario. – Asti Jul 20 '17 at 11:37
  • Hi @Asti! The `Finally` invokes an `Action`, while the `OnErrorResumeNext` takes an observable as continuation. So the later is probably preferable as a substitute of `async using`. – Theodor Zoulias Nov 20 '20 at 20:59
2

The method signature for Finally is

public static IObservable<TSource> Finally<TSource>(
    this IObservable<TSource> source,
    Action finallyAction
)

which expects an action, not a Task.

As an addendum, if you want to run something asynchronously, instead of async void, use Task.Factory methods inside the method so the intention is explicit.

Asti
  • 12,447
  • 29
  • 38
2

Quoting from the Intro to Rx:

The Finally extension method accepts an Action as a parameter. This Action will be invoked if the sequence terminates normally or erroneously, or if the subscription is disposed of.

(emphasis added)

This behavior cannot be replicated by a Finally operator that accepts a Func<Task> parameter, because of how the IObservable<T> interface is defined. Unsubscribing from an observable sequence is achieved by calling the Dispose method of the IDisposable subscription. This method is synchronous. And the whole Rx library is built on top of this interface. So even if you create an extension method DisposeAsync for IDisposables, the built-in Rx operators (for example Select, SelectMany, Where, Take etc) will be unaware of its existence, and will not invoke it when they unsubscribe from their source sequence. A subscription chain of operators will be automatically unlinked by calling the synchronous Dispose method of the previous link as always.

Btw there has been an attempt to implement an asynchronous version of Rx (AsyncRx), that is built on top of the completely new interfaces that are shown below. This library has not been released yet.

public interface IAsyncObserver<in T>
{
    ValueTask OnNextAsync(T value);
    ValueTask OnErrorAsync(Exception error);
    ValueTask OnCompletedAsync();
}

public interface IAsyncObservable<out T>
{
    ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer);
}

public interface IAsyncDisposable
{
    public ValueTask DisposeAsync();
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104