I am examining the effects of replacing some instances of regular C# event pattern with IAsyncEnumerable. This would be accomplished by lazy instantiation/activation of an IAsyncEnumerable and caching that reference for use by all callers/listeners. Some quick tests (see below) show that this works, but I haven't seen other examples online using IAsyncEnumerable in this fashion.
I realize this isn't exactly what IAsyncEnumerable was created for, and that most would advocate for ReactiveX (https://github.com/dotnet/reactive) in this case. However, I'd appreciate an analysis of why one would or would not want to do this as described (instead of just how to do this with Rx). I've provided a couple of examples below. My candidate event pattern replacement is one where it is more of an event stream (like deserialized messages being produced off a serial connection or UDP socket etc.)
Example 1:
class Program
{
public static async Task Main( string[] args )
{
// Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
var asyncEnumerable = GetNumbersAsync( 10 );
// Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ), Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );
Console.WriteLine( "DONE!");
}
private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
{
await foreach ( var n in numbers )
Console.WriteLine( $"{id}: Processing {n}" );
}
private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
{
// This would really be async read operations from a remote source
for ( var i = 0; i < maxNumber; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
This produces the output I would want as a user of this pattern:
1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!
The previous example is putting each consumer on a different thread but based on the context (perhaps WPF app) there could be multiple consumers on the same thread (not possible with IEnumerable but the door is opened with IAsyncEnumerable). The following is in a console app but one can imagine the producers and consumers being created on the UI thread of a WPF app.
Example 2:
class Program
{
public static async Task Main( string[] args )
{
var producer = new Producer();
var consumer1 = new Consumer( 1, producer );
var consumer2 = new Consumer( 2, producer );
var consumer3 = new Consumer( 3, producer );
await Task.WhenAll( consumer1.ConsumeMessagesAsync(), consumer2.ConsumeMessagesAsync(), Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );
Console.WriteLine( "DONE!");
}
// Singleton producer
private interface IProducer
{
IAsyncEnumerable<int> GetMessagesAsync();
}
// Transient consumer
private interface IConsumer
{
Task ConsumeMessagesAsync();
}
private class Producer : IProducer
{
private const int _maxFakeMessages = 10;
private readonly object _mutex = new Object();
private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
public IAsyncEnumerable<int> GetMessagesAsync()
{
// TODO: use AsyncEx AsyncLock
lock ( _mutex )
{
if ( _actualIncomingMessagesEnumerable == null)
_actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
}
return _actualIncomingMessagesEnumerable;
}
private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
{
for ( var i = 0; i < _maxFakeMessages; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
private class Consumer
{
private readonly int _id;
private readonly IProducer _producer;
public Consumer( int id, IProducer producer )
{
_id = id;
_producer = producer;
}
public async Task ConsumeMessagesAsync()
{
await foreach( var n in _producer.GetMessagesAsync() )
Console.WriteLine( $"{_id}: Processing {n}" );
}
}
}
Again, the output from this is what I would want as a user:
1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!
One benefit inherent to a pattern like this is that the consumer/caller can have their callback/item-of-type-T-handling-code occur within their own SynchronizationContext. Often the events off a SerialPort or Timer or other source can occur on a background thread and the user -- especially if on a UI thread -- may need to perform their own synchronization. In this case a consumer on the UI thread can always have their code happen on the UI thread while a user in a console app will have it happen on the threadpool.
Am I missing something?