1

Overview

I am attempting to write an IAsyncEnumerable<T> wrapper around an IObserver<T> interface. At first I used a BufferBlock<T> as the backing data store, but I found out through performance testing and research that it is actually a pretty slow type, so I decided to give the System.Threading.Channels.Channel type a go. I had a similar problem with my BufferBlock implementation as this one but this time I'm not sure how to resolve it.

Problem

My GetAsyncEnumerator() loop gets blocked by the await _channel.Reader.WaitToRead(token) call if my IObserver<T>.OnNext() method hasn't written to the _channel yet. What is the correct way to wait for a value to be available to yield in this context without blocking program execution?

Implementation

public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>,
    IObserver<T>, IDisposable
{
    private readonly IDisposable _unsubscriber;
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    private bool _producerComplete;

    public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public async void OnNext(T value)
    {
        Log.Logger.Verbose("Adding value to Channel.");
        await _channel.Writer.WriteAsync(value);
    }

    public void OnError(Exception error)
    {
        _channel.Writer.Complete(error);
    }

    public void OnCompleted()
    {
        _producerComplete = true;
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator(
        [EnumeratorCancellation] CancellationToken token = new CancellationToken())
    {
        Log.Logger.Verbose("Starting async iteration...");
        while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
        {
            Log.Logger.Verbose("Reading...");
            while (_channel.Reader.TryRead(out var item))
            {
                Log.Logger.Verbose("Yielding item.");
                yield return item;
            }
            Log.Logger.Verbose("Awaiting more items.");
        }
        Log.Logger.Verbose("Iteration Complete.");
        _channel.Writer.Complete();
    }

    public void Dispose()
    {
        _channel.Writer.Complete();
        _unsubscriber?.Dispose();
    }
}

Screenshot

Additional Context

It shouldn't matter, but at runtime the IObservable<T> instance passed into the constructor is a CimAsyncResult returned from async calls made to the Microsoft.Management.Infrastructure apis. Those make use of the Observer design pattern which I'm trying to wrap with the fancy new async enumeration pattern.

Edit

Updated with logging to the debugger output and made my OnNext() method async/await as one commenter suggested. You can see it never enters the while() loop.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
MplsAmigo
  • 985
  • 6
  • 21
  • Subtle comment: You never `await` or examine the response from `_channel.Writer.WriteAsync(value);` – Andy Aug 03 '20 at 18:16
  • I updated the OnNext() method as you suggested but that didn't see to have an effect. – MplsAmigo Aug 03 '20 at 18:31
  • I should mention, I don't believe that OnNext() will ever be executed asynchronously in any case because the IObervable that calls my OnNext method is also not async and is a 3rd party library type. That being said I don't think that should effect the behavior of my enumerator. – MplsAmigo Aug 03 '20 at 18:45
  • i didn't think it would help.. just a subtle reminder that you would fire off a call to `WriteAsync()`, and if it threw an exception, you'd never know. – Andy Aug 03 '20 at 18:47
  • Fair enough, thanks. – MplsAmigo Aug 03 '20 at 18:47
  • It seems that your program is exiting early. Are you sure that you don't have any `async void` method up the stack? Btw there is a `ToAsyncEnumerable` extension method for `IObservable`s in the [`System.Interactive.Async`](https://www.nuget.org/packages/System.Interactive.Async/) library. I don't know about its performance characteristics, but at least it works correctly. Your own implementation looks incorrect (with more than one consumers, each consumer will get a random subset of the items). – Theodor Zoulias Aug 03 '20 at 18:49
  • I was not aware of that library/extension method. Ill see if that does what I need. – MplsAmigo Aug 03 '20 at 18:56
  • You may find this question interesting: [Factory for IAsyncEnumerable or IAsyncEnumerator](https://stackoverflow.com/questions/61540896/factory-for-iasyncenumerable-or-iasyncenumerator). It is not related to `IObservable`s, but contains `IAsyncEnumerable` implementations that are based on channels. – Theodor Zoulias Aug 03 '20 at 19:10
  • 2
    Okay. So I figured out why my program was blocking execution. Further up the call stack I was calling the async method syncronously via the `GetAwaiter().GetResult()` methods. I figured out that even using the `ToAsyncEnumerable()` extension method from Reactive, I was getting the same result which was that the program was getting blocked. I did this because in once case I wanted to get the data from within a constructor. I changed that implementation to execute the call using Task.Run() and now the iterators run flawlessly with both implementations. – MplsAmigo Aug 03 '20 at 19:20
  • 1
    I looked at the `ToAsyncEnumerable` implementation from Reactive which is built out substantially more than my implementation. It used the `ConcurrentQueue<>` as its datasource. I don't know how the performance profile compares to a `Channel` but I thought that was interesting to note. – MplsAmigo Aug 03 '20 at 19:23
  • Additional testing reveals that either implementation doesn't allow my view to load data asynchronously. This seems to be caused by the await keywords used within the iterator, waiting for results to be added to the the _channel. I've come up with a new implementation that allows results to stream to my view unhindered. – MplsAmigo Aug 03 '20 at 20:01
  • FYI the `System.Interactive.Async` library is a by-product of the `System.Reactive` project, which AFAIK is not implemented with performance as a priority. It has feature-richness and correctness as priorities instead. If performance is paramount for you, the `Channels` are hard to beat. Also keep in mind Marc Gravell's [PooledAwait](https://mgravell.github.io/PooledAwait/) library, for reducing allocations in async methods. – Theodor Zoulias Aug 03 '20 at 20:13

1 Answers1

2

Further up the call stack I was calling the async method syncronously via the GetAwaiter().GetResult() methods.

Yup, that's a problem.

I did this because in once case I wanted to get the data from within a constructor. I changed that implementation to execute the call using Task.Run() and now the iterators run flawlessly with both implementations.

There are better solutions than blocking on asynchronous code. Using Task.Run is one way to avoid the deadlock, but you still end up with a sub-par user experience (I'm assuming yours is a UI application, since there is a SynchronizationContext).

If the asynchronous enumerator is used to load data for display, then a more proper solution is to (synchronously) initialize the UI to a "Loading..." state, and then update that state as the data is loaded asynchronously. If the asynchronous enumerator is used for something else, you may find some appropriate alternative patterns in my async constructors blog post.

Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
  • 3
    Is there an achievement for ‘Stephen Cleary answered your question’? :D Thanks for the link to your blog post, I’ll definitely read it. Like I mentioned in one of my comments, my initial implementation of the ‘GetAsyncEnumerator’ didn’t have an await keyword inside the while loop. Instead it awaited a ‘SemaphoreSlim.WaitAsync()’ configured to only allow access to the method one at a time. When I changed the implementation to what is in my question, executing it on the main thread caused it to halt and not allow my producer to insert any items. – MplsAmigo Aug 04 '20 at 02:21