I am examining the effects of replacing some instances of the regular C# event pattern with IAsyncEnumerable. This would be accomplished by lazily instantiating/activating an IAsyncEnumerable and caching that reference for use by all callers/listeners. Some quick tests (see below) show that this works, but I haven't seen other examples online using IAsyncEnumerable in this fashion.

I realize this isn't exactly what IAsyncEnumerable was created for, and that most would advocate for ReactiveX (https://github.com/dotnet/reactive) in this case. However, I'd appreciate an analysis of why one would or would not want to do this as described (rather than just how to do this with Rx). I've provided a couple of examples below. My candidate for replacing the event pattern is one where there is more of an event stream (like deserialized messages being produced off a serial connection, UDP socket, etc.).

Example 1:

  using System;
  using System.Collections.Generic;
  using System.Threading.Tasks;

  class Program
  {
    public static async Task Main( string[] args )
    {
      // Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
      var asyncEnumerable = GetNumbersAsync( 10 );

      // Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
      await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ), Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );

      Console.WriteLine( "DONE!");
    }
    
    private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
    {
      await foreach ( var n in numbers )
        Console.WriteLine( $"{id}: Processing {n}" );
    }

    private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
    {
      // This would really be async read operations from a remote source
      for ( var i = 0; i < maxNumber; i++ )
      {
        await Task.Delay( 100 );
        yield return i;
      }
    }
  }

This produces the output I would want as a user of this pattern:

1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!

The previous example puts each consumer on a different thread, but depending on the context (perhaps a WPF app) there could be multiple consumers on the same thread (not possible with IEnumerable, but the door is opened with IAsyncEnumerable). The following is a console app, but one can imagine the producers and consumers being created on the UI thread of a WPF app.

Example 2:

  using System;
  using System.Collections.Generic;
  using System.Threading.Tasks;

  class Program
  {
    public static async Task Main( string[] args )
    {
      var producer = new Producer();
      var consumer1 = new Consumer( 1, producer );
      var consumer2 = new Consumer( 2, producer );
      var consumer3 = new Consumer( 3, producer );

      await Task.WhenAll( consumer1.ConsumeMessagesAsync(), consumer2.ConsumeMessagesAsync(), Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );

      Console.WriteLine( "DONE!");
    }

    // Singleton producer
    private interface IProducer
    {
      IAsyncEnumerable<int> GetMessagesAsync();
    }

    // Transient consumer
    private interface IConsumer
    {
      Task ConsumeMessagesAsync();
    }

    private class Producer : IProducer
    {
      private const int _maxFakeMessages = 10;
      private readonly object _mutex = new Object();

      private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
      
      public IAsyncEnumerable<int> GetMessagesAsync()
      {
        // TODO: use AsyncEx AsyncLock
        lock ( _mutex )
        {
          if ( _actualIncomingMessagesEnumerable == null)
            _actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
        }

        return _actualIncomingMessagesEnumerable;
      }

      private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
      {
        for ( var i = 0; i < _maxFakeMessages; i++ )
        {
          await Task.Delay( 100 );
          yield return i;
        }
      }
    }

    private class Consumer
    {
      private readonly int _id;
      private readonly IProducer _producer;
      public Consumer( int id,  IProducer producer )
      {
        _id = id;
        _producer = producer;
      }

      public async Task ConsumeMessagesAsync()
      {
        await foreach( var n in _producer.GetMessagesAsync() )
          Console.WriteLine( $"{_id}: Processing {n}" );
      }
    }
  }

Again, the output from this is what I would want as a user:

1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!

One benefit inherent to a pattern like this is that the consumer/caller can have its callback/item-of-type-T-handling code run within its own SynchronizationContext. Events from a SerialPort, Timer, or other source often arrive on a background thread, and the user, especially one on a UI thread, may need to perform their own synchronization. With this pattern, a consumer on the UI thread always has its code run on the UI thread, while a consumer in a console app has it run on the thread pool.

Am I missing something?

Liam
KyleP
  • This isn't a question that can be answered in the general case. If the enumerators are accessing shared state, then you need to understand how they'll affect each other and ensure proper synchronization. If there is no shared state and the specific sequence is designed to properly generate multiple independent enumerators, then it'll work fine. – Servy Oct 28 '20 at 14:38
  • @Servy I can refine the question with some help. I'm assuming the producer (creating the IAsyncEnumerable) is a unique singleton instance available in the application, and that there will be one (and only one) invocation of the method returning the IAsyncEnumerable. Imagine that method is a loop (running until app shutdown/object teardown, etc.) that handles messages coming from a connection to an outside service, and that this loop outlives any single disconnect/connect/error and is the only one needed for the life of the application. Thus, there are no other invocations to synchronize. – KyleP Oct 28 '20 at 15:24
  • It's the *specifics* that matter though. Without knowing the specific behavior you want those sequences to have, and *how you've actually implemented it* no one can tell you if you've achieved your desired semantics or not. Just knowing that you have a sequence backed by a connection to a service isn't enough for us to tell you if that will work or not. – Servy Oct 28 '20 at 15:43
  • As a side note, `GetMessagesAsync` is not exactly an asynchronous method, and the `Async` suffix is probably not suitable in this case. Personally I would name it `GetMessagesStream`. – Theodor Zoulias Oct 28 '20 at 16:14
  • @Servy I think I'm grasping now. I didn't know ```GetNumbersAsync()``` was actually getting called multiple times - once for each iteration. – KyleP Oct 28 '20 at 16:54
  • @KyleP It's only called once. It doesn't behave any differently (in this particular case) than if you called it multiple times, but you could write a method that *would* behave differently in those two cases. And of course the code defined in that method is performed multiple times, despite the method only being called once, which is a result of the transformations done by iterator blocks. – Servy Oct 28 '20 at 16:57
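A small sketch of the distinction Servy draws. It assumes .NET 5 or later so it can be written as a top-level program, uses a local-function copy of the question's GetNumbersAsync, and adds a hypothetical `bodyStarts` counter. The iterator method is called once, none of its body runs at that point, and each await foreach over the cached result runs the whole body again:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical counter: how many times the iterator body has started executing.
int bodyStarts = 0;

// Local-function version of the question's GetNumbersAsync.
async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
    bodyStarts++; // reached only when an enumerator's first MoveNextAsync call runs
    for (var i = 0; i < maxNumber; i++)
    {
        await Task.Delay(10);
        yield return i;
    }
}

// The method is called exactly once; this only creates the sequence object.
IAsyncEnumerable<int> numbers = GetNumbersAsync(3);
int startsAfterCall = bodyStarts;

// Each await foreach asks the same cached instance for a new enumerator,
// so the iterator body executes again from the top.
var firstPass = new List<int>();
await foreach (var n in numbers) firstPass.Add(n);

var secondPass = new List<int>();
await foreach (var n in numbers) secondPass.Add(n);

Console.WriteLine($"after call: {startsAfterCall}, after two loops: {bodyStarts}");
// after call: 0, after two loops: 2
```

Both passes see the full `0,1,2` sequence because each pass drives its own execution of the body. Nothing is shared between them except captured state such as the counter.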

2 Answers


Let's slightly change the implementation of the "event source" in your first example, the GetNumbersAsync method:

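// Requires using System.Threading; for Interlocked
private static int _current = 0; // shared by every execution of the iterator body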
private static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
    // This would really be async read operations from a remote source
    for (var i = 0; i < maxNumber; i++)
    {
        await Task.Delay(100);
        yield return Interlocked.Increment(ref _current);
    }
}

Here is the output after this change:

1: Processing 1
2: Processing 2
2: Processing 4
1: Processing 3
2: Processing 5
1: Processing 6
1: Processing 8
2: Processing 7
2: Processing 9
1: Processing 10
1: Processing 12
2: Processing 11
1: Processing 14
2: Processing 13
1: Processing 15
2: Processing 16
1: Processing 17
2: Processing 18
1: Processing 19
2: Processing 20

Each consumer is receiving different "events"!

Although the IAsyncEnumerable in your example is a single cached instance, every time you enumerate it with an await foreach statement a new IAsyncEnumerator is created, whose lifetime is bounded by that specific enumeration. IAsyncEnumerators are neither thread-safe nor reusable. If you cache one and share it between consumers, with each consumer calling its MoveNextAsync method without synchronization, you'll get undefined behavior.
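To make this concrete, here is a self-contained sketch of the modified experiment above. It is written as a top-level program (assuming .NET 5 or later), with local functions instead of static members; `current`, `ConsumeAsync` and `cached` are illustrative names. Two consumers share one cached IAsyncEnumerable, yet each await foreach gets its own enumerator. The iterator body therefore runs twice, and the consumers split the 20 values between them instead of both seeing the same 10:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int current = 0; // shared state touched by every execution of the iterator body

async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
    for (var i = 0; i < maxNumber; i++)
    {
        await Task.Delay(10);
        yield return Interlocked.Increment(ref current);
    }
}

async Task<List<int>> ConsumeAsync(IAsyncEnumerable<int> numbers)
{
    var received = new List<int>();
    await foreach (var n in numbers) received.Add(n); // new enumerator per call
    return received;
}

// One cached IAsyncEnumerable instance, two concurrent consumers (as in the question).
var cached = GetNumbersAsync(10);
var results = await Task.WhenAll(
    Task.Run(() => ConsumeAsync(cached)),
    Task.Run(() => ConsumeAsync(cached)));

Console.WriteLine($"consumer 1: {string.Join(",", results[0])}");
Console.WriteLine($"consumer 2: {string.Join(",", results[1])}");
Console.WriteLine($"values seen by both: {results[0].Intersect(results[1]).Count()}"); // values seen by both: 0
```

Which consumer receives which value varies from run to run. Only the totals are deterministic: 20 increments, 10 values per consumer, and no overlap.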

If you want a source of IAsyncEnumerables that can be safely subscribed/unsubscribed at any time, and propagate all messages to subscribers that may consume them at different paces, it's nowhere near as trivial as caching an IAsyncEnumerable created by a C# iterator (a method containing yield statements). You can find implementations of an AsyncEnumerableSource here.
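One way to get the broadcast behavior the question is after is to give every subscriber its own System.Threading.Channels channel and write each message to all of them. This is a minimal sketch under stated assumptions, not the linked AsyncEnumerableSource, and it assumes .NET 5 or later for top-level statements. `Subscribe`, `Publish` and `Complete` are hypothetical local functions. There is no unsubscribe and no backpressure (unbounded channels simply buffer for slow consumers), and each returned sequence should be enumerated only once:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical broadcaster: one unbounded Channel<int> per subscriber.
var subscribers = new List<Channel<int>>();
var gate = new object();

IAsyncEnumerable<int> Subscribe()
{
    var channel = Channel.CreateUnbounded<int>();
    lock (gate) subscribers.Add(channel);
    // Enumerate once: a second enumeration would compete for the same channel.
    return channel.Reader.ReadAllAsync();
}

void Publish(int message)
{
    lock (gate)
        foreach (var channel in subscribers)
            channel.Writer.TryWrite(message); // succeeds on an unbounded, uncompleted channel
}

void Complete()
{
    lock (gate)
        foreach (var channel in subscribers)
            channel.Writer.TryComplete(); // ends every subscriber's await foreach
}

async Task<List<int>> ConsumeAsync(IAsyncEnumerable<int> messages)
{
    var received = new List<int>();
    await foreach (var m in messages) received.Add(m);
    return received;
}

// Two early subscribers...
var consumer1 = ConsumeAsync(Subscribe());
var consumer2 = ConsumeAsync(Subscribe());
for (var i = 0; i < 5; i++) { await Task.Delay(10); Publish(i); }

// ...and one late joiner that only sees messages published after it subscribed.
var consumer3 = ConsumeAsync(Subscribe());
for (var i = 5; i < 10; i++) { await Task.Delay(10); Publish(i); }
Complete();

var results = await Task.WhenAll(consumer1, consumer2, consumer3);
for (var c = 0; c < results.Length; c++)
    Console.WriteLine($"consumer {c + 1}: {string.Join(",", results[c])}");
```

The source is "hot" in the sense discussed in the comments. Every subscriber receives every message, but a late subscriber misses whatever was published before it joined. Real implementations also have to handle unsubscription, bounded buffers and errors, which is where most of the complexity lies.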

halfer
Theodor Zoulias
  • I was assuming that the loop was only iterating once and that upon the "yield return" statement, every user iterating with an ```await foreach``` would receive the same item. The AsyncEnumerableSource is indeed the concept I'm describing. Am I understanding correctly that even though I only call the method executing the loop (containing yield statements) one time, it's actually executed multiple times, perhaps concurrently (depending on circumstances), by everyone who does an ```await foreach```? – KyleP Oct 28 '20 at 16:37
  • @KyleP yeap, exactly. Each `foreach` starts a new execution of the iterator. If you are familiar with the Reactive Extensions, you may find this behavior similar to the "cold" `IObservable`s created by the `Observable.Create` method. I am not sure if the terminology [hot and cold](http://introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html) applies to `IAsyncEnumerable`s though. I can't think of what a "hot `IAsyncEnumerable`" would look like. – Theodor Zoulias Oct 28 '20 at 16:59
  • @TheodorZoulias It would be one that performs the work when you call the method, rather than one that doesn't perform any work until after you get an enumerator and then ask that enumerator for an item. You'd need to use a non-iterator block implementation (at least in part) to accomplish that though. – Servy Oct 28 '20 at 17:21
  • @Servy hmm, maybe. But an `IAsyncEnumerable` has a time dimension. It doesn't just *"perform the work"*. Its work may last forever. For example would you say that an `IAsyncEnumerable` sequence that propagates mouse movements is a hot sequence? What if a consumer decides to enumerate this sequence using a timer, pulling one `Point` every minute? Retrieving mouse movements that occurred an hour ago doesn't sound very *hot* to me! – Theodor Zoulias Oct 28 '20 at 17:36
  • @TheodorZoulias By "perform work" I don't necessarily mean "causing side effects", I just mean "it's executing some code", even if that code is only observable via the results of the sequence. Using your example of a sequence that yields mouse movements, you could create a sequence that yields mouse movements every minute *starting from the time the IAsyncEnumerable is created*, which would be hot, or one that yields the movements every minute starting *when the first value is fetched*, which would be cold. – Servy Oct 28 '20 at 17:40
  • The distinction is purely whether it's doing something (in this case, inspecting the mouse position) when the enumerable is created, or when the first value is fetched. – Servy Oct 28 '20 at 17:40
  • @Servy my example with the mouse movements demonstrates that a consumer of an `IAsyncEnumerable` is allowed to lag regarding its tempo of consumption. This is not possible with an `IObservable`, because it actively pushes the messages to its observers. It doesn't just store them and wait for its subscribers to pull them at their own pace like an `IAsyncEnumerable`. I am not sure if this difference is important enough to make the "hot and cold" concept incompatible with `IAsyncEnumerable`s though. – Theodor Zoulias Oct 28 '20 at 17:50
  • @Servy @Theodor I was initially surprised when I adjusted the first example to add a ```Task.Delay( 5000 )``` after calling the IAsyncEnumerable function: the consumers still saw the items from the first iterations of the loop. I was thinking that the function was hot / already running, but apparently this is my misunderstanding of ```yield```? I thought "late joiners" would not see previous "events", since those had already been iterated. Does the code in the called method execute "hot" (without anyone calling ```await foreach```) up until the first await and/or the first yield statement? – KyleP Oct 28 '20 at 19:29
  • @KyleP the enumeration is driving the iteration. Every time the `MoveNextAsync` of the enumerator is called, the iterator proceeds to the next `yield`. If you don't call the `MoveNextAsync`, nothing happens. The iterator has no life of its own. Btw the `MoveNextAsync` is called implicitly by the `await foreach` loop. You can call it explicitly if you want, by getting an enumerator with `enumerable.GetAsyncEnumerator()` and enumerating it manually. It amounts to the same thing. – Theodor Zoulias Oct 28 '20 at 19:48
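The manual form Theodor describes can be sketched as follows, as a top-level program assuming .NET 5 or later. The `bodyStarted` flag is illustrative. Obtaining the enumerator still runs no iterator code, and each MoveNextAsync call resumes the body up to the next `yield return`. This also explains the `Task.Delay( 5000 )` observation: an iterator block does not run ahead to its first await or yield on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

bool bodyStarted = false; // illustrative flag set by the first line of the iterator body

async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
    bodyStarted = true;
    for (var i = 0; i < maxNumber; i++)
    {
        await Task.Delay(10);
        yield return i;
    }
}

var numbers = GetNumbersAsync(3);
IAsyncEnumerator<int> enumerator = numbers.GetAsyncEnumerator();

// Even a long pause here would not start the body; nothing runs until MoveNextAsync.
bool startedBeforeMoveNext = bodyStarted;

// Roughly what `await foreach (var n in numbers)` does for you.
var received = new List<int>();
try
{
    while (await enumerator.MoveNextAsync()) // resumes the body up to the next yield return
        received.Add(enumerator.Current);
}
finally
{
    await enumerator.DisposeAsync(); // await foreach also disposes the enumerator
}

Console.WriteLine($"started before MoveNextAsync: {startedBeforeMoveNext}, received: {string.Join(",", received)}");
// started before MoveNextAsync: False, received: 0,1,2
```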

Looks like channels are what you're looking for.

An Introduction to System.Threading.Channels

Working with Channels in .NET

Paulo Morgado
  • According to [this answer](https://stackoverflow.com/a/61960889/777985) and my understanding, an element passed into a `Channel` is received by only one consumer if there are multiple, which does not seem to be what the OP wants; passing an element to multiple consumers once available. – Ray Apr 27 '22 at 22:37
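That behavior is easy to check with a short sketch, assuming .NET 5 or later (top-level statements, with System.Threading.Channels in the framework). Two consumers that read from the same ChannelReader compete for items, so each item is delivered to exactly one of them rather than to both:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

async Task<List<int>> ConsumeAsync(ChannelReader<int> reader)
{
    var received = new List<int>();
    await foreach (var n in reader.ReadAllAsync()) received.Add(n);
    return received;
}

// Both consumers read from the SAME reader, so they compete for items.
var consumer1 = Task.Run(() => ConsumeAsync(channel.Reader));
var consumer2 = Task.Run(() => ConsumeAsync(channel.Reader));

for (var i = 0; i < 10; i++)
{
    await channel.Writer.WriteAsync(i);
    await Task.Delay(10);
}
channel.Writer.Complete(); // lets both ReadAllAsync loops finish once the channel drains

var results = await Task.WhenAll(consumer1, consumer2);
Console.WriteLine($"consumer 1: {string.Join(",", results[0])}");
Console.WriteLine($"consumer 2: {string.Join(",", results[1])}");
Console.WriteLine($"total delivered: {results.Sum(r => r.Count)}"); // total delivered: 10
```

The split between the two consumers is nondeterministic. To fan every message out to every consumer, each consumer needs its own channel, with the producer writing to all of them.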