
I'm trying to wrap an asynchronous subscription API based on events with an API based on IAsyncEnumerable. Basically along the lines of:

async IAsyncEnumerable<string> ReadAll() 
{
    var reader = new EventBasedReader();
    reader.OnRead += (_, args) => yield return args.Message;
    reader.Start();
    await reader.WaitUntilAllRead();
}

However, this doesn't work because the yield return sits inside the event handler, and C# doesn't allow yield inside a lambda. Is there another way I can write this to make it work as an IAsyncEnumerable?

Theodor Zoulias
Barguast
  • You may find this interesting: [Factory for IAsyncEnumerable or IAsyncEnumerator](https://stackoverflow.com/questions/61540896/factory-for-iasyncenumerable-or-iasyncenumerator). Also you could consider adding the [iasyncenumerable](https://stackoverflow.com/questions/tagged/iasyncenumerable) tag to the question. – Theodor Zoulias Jun 05 '20 at 12:09
  • I faced a similar problem and have just [blogged about it](https://dev.to/noseratio/c-events-as-asynchronous-streams-with-reactivex-or-channels-82k). – noseratio Jul 17 '20 at 12:12

1 Answer


> Wrap an asynchronous subscription API based on events with an API based on IAsyncEnumerable.

Those two are not directly compatible. Events are push-based, and enumerables (including async enumerables) are pull-based.

To cross that divide, you need a buffer: some place to hold the event data after it has been pushed to you but before the downstream code has pulled it.

I recommend using Channels (from System.Threading.Channels) as the buffer. If your use case allows it, you could use an unbounded channel:

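using System.Threading.Channels;

IAsyncEnumerable<string> ReadAll() 
{
  var reader = new EventBasedReader();
  var buffer = Channel.CreateUnbounded<string>();
  // Push side: every event writes its message into the buffer.
  reader.OnRead += async (_, args) => await buffer.Writer.WriteAsync(args.Message);
  reader.Start();
  CompleteBufferWhenEventsAreDone();
  // Pull side: the caller enumerates the buffered messages as they arrive.
  return buffer.Reader.ReadAllAsync();

  // Fire-and-forget: completing the writer is what ends the enumeration.
  async void CompleteBufferWhenEventsAreDone()
  {
    await reader.WaitUntilAllRead();
    buffer.Writer.TryComplete();
  }
}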
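
To try the idea end to end, here is a minimal self-contained sketch. The question never shows EventBasedReader, so the stub below is purely hypothetical: its ReadEventArgs type, the three hard-coded messages, and the Task-returning WaitUntilAllRead are all assumptions made for illustration. Only the Channel calls come from System.Threading.Channels. This variant also swaps the async void helper for a discarded Task and passes any failure to TryComplete, which is one possible choice rather than a requirement.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical event args carrying one message (not from the question).
public sealed class ReadEventArgs : EventArgs
{
    public ReadEventArgs(string message) => Message = message;
    public string Message { get; }
}

// Hypothetical stand-in for the event-based API being wrapped.
public sealed class EventBasedReader
{
    private readonly TaskCompletionSource<bool> _done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public event EventHandler<ReadEventArgs> OnRead;

    // Raises three events on a thread-pool thread, then signals completion.
    public void Start() => Task.Run(() =>
    {
        foreach (var message in new[] { "one", "two", "three" })
            OnRead?.Invoke(this, new ReadEventArgs(message));
        _done.SetResult(true);
    });

    public Task WaitUntilAllRead() => _done.Task;
}

public static class EventStream
{
    public static IAsyncEnumerable<string> ReadAll()
    {
        var reader = new EventBasedReader();
        var buffer = Channel.CreateUnbounded<string>();

        // Push side: TryWrite succeeds on an unbounded channel until it is completed.
        reader.OnRead += (_, args) => buffer.Writer.TryWrite(args.Message);
        reader.Start();

        // Complete the channel when the reader finishes so the consumer's loop ends.
        _ = CompleteWhenDoneAsync();

        // Pull side: the caller enumerates the buffered messages.
        return buffer.Reader.ReadAllAsync();

        async Task CompleteWhenDoneAsync()
        {
            try
            {
                await reader.WaitUntilAllRead();
                buffer.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                // Surface the failure to the consumer instead of losing it.
                buffer.Writer.TryComplete(ex);
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Prints "one", "two", "three", one per line.
        await foreach (var message in EventStream.ReadAll())
            Console.WriteLine(message);
    }
}
```

If the events can arrive faster than the consumer reads them, Channel.CreateBounded<string>(capacity) caps memory use. With its default full mode, TryWrite returns false once the channel is full, so the handler then has to decide whether to wait or drop the message.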
David Pine
Stephen Cleary
  • Hi, which is better? Thanks. public interface IPriceService { public Task StartAsync(TConfig configuration, CancellationToken cancellationToken); public Task StopAsync(CancellationToken cancellationToken); public event EventHandler PriceUpdated; } public interface IPriceService { public Task StartAsync(TConfig configuration, CancellationToken cancellationToken); public Task StopAsync(CancellationToken cancellationToken); public IAsyncEnumerable GetPricesAsync(CancellationToken cancellationToken); } – Timeless Mar 28 '23 at 12:14
  • It's hard to read code in comments; could you ask your own question? – Stephen Cleary Mar 28 '23 at 12:34