1

Is it possible to use the Using operator in Rx.Net with a resource that implements IAsyncDisposable rather than IDisposable? If not, is there some sort of workaround that I could use?

jack
  • 147
  • 1
  • 7

1 Answers1

3

Here is a Using method that works with IAsyncDisposable objects:

/// <summary>
/// Constructs an observable sequence that depends on a resource object,
/// whose lifetime is tied to the resulting observable sequence's lifetime.
/// </summary>
public static IObservable<TResult> Using<TResult, TResource>(
    Func<TResource> resourceFactory,
    Func<TResource, IObservable<TResult>> observableFactory)
    where TResource : IAsyncDisposable
{
    return Observable.Defer(() =>
    {
        TResource resource = resourceFactory();
        IObservable<TResult> observable;
        try { observable = observableFactory(resource); }
        catch (Exception ex) { observable = Observable.Throw<TResult>(ex); }

        Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
        IObservable<TResult> disposer = Observable
            .FromAsync(() => lazyDisposeTask.Value)
            .Select(_ => default(TResult))
            .IgnoreElements();

        return observable
            .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
            .Concat(disposer)
            .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult());
    });
}

This method has identical signature with the Rx Observable.Using method (apart from the where clause), and it can be used in the same way.

This implementation takes care of all completion cases:

  1. Successful completion: The IAsyncDisposable resource is disposed asynchronously by the Concat operator.
  2. Completion with error: The IAsyncDisposable resource is disposed asynchronously by the Catch operator.
  3. The sequence is unsubscribed before its completion: The IAsyncDisposable resource is disposed synchronously by the Finally operator. Disposing asynchronously the resource is not possible in this case, for reasons explained here.

Variant with asynchronous factory methods:

public static IObservable<TResult> Using<TResult, TResource>(
    Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
    Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
    where TResource : IAsyncDisposable
{
    return Observable.Create<TResult>(async (observer, cancellationToken) =>
    {
        TResource resource = await resourceFactoryAsync(cancellationToken);
        IObservable<TResult> observable;
        try { observable = await observableFactoryAsync(resource, cancellationToken); }
        catch { await resource.DisposeAsync(); throw; }

        Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
        IObservable<TResult> disposer = Observable
            .FromAsync(() => lazyDisposeTask.Value)
            .Select(_ => default(TResult))
            .IgnoreElements();

        return observable
            .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
            .Concat(disposer)
            .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult())
            .Subscribe(observer);
    });
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Thank you so much - this looks great. I know my original question didn't specify this, but is it easy to extend your code so that the the resource factory returns `Task`? – jack Nov 01 '21 at 18:41
  • 1
    @jackdry sure, I updated the answer. – Theodor Zoulias Nov 01 '21 at 18:52
  • 1
    Thanks a lot. Out of interest, is there a good reason for using `Lazy` rather than `Func` for "lazyDisposeTask"? – jack Nov 01 '21 at 18:59
  • 1
    @jackdry yes. The `Lazy` ensures that the `DisposeAsync` will be called only once. Most real-world disposables tolerate being disposed multiple times, but better be safe than sorry. :-) – Theodor Zoulias Nov 01 '21 at 19:25
  • 1
    @jackdry FYI I fixed a bug in the implementation of both `Using` methods. – Theodor Zoulias Nov 01 '21 at 20:15
  • Thank you. Just one more question (if okay!). Can you use `lazyDisposeTask.Value.Wait()` instead of`lazyDisposeTask.Value.GetAwaiter().GetResult()`? – jack Nov 01 '21 at 20:30
  • 2
    @jackdry - https://stackoverflow.com/questions/17284517/is-task-result-the-same-as-getawaiter-getresult – Enigmativity Nov 01 '21 at 20:51
  • @jackdry it should be noted that the built-in `Observable.Using` method can cause an unhandled exception in case the `Dispose` invocation fails, and the same is true for the `Using` methods above. A relevant GitHub issue can be found [here](https://github.com/dotnet/reactive/issues/1632 "Observable.Using does not propagate exceptions from resource .Dispose method to stream"). Currently I don't know of a perfect solution to this problem. – Theodor Zoulias Nov 02 '21 at 05:29