0

I want to expose reading and parsing data from a file asynchronously into strings, through an IAsyncEnumerable<string>, so my calling code can process the data as it comes in.

The problem is that I have to pass a disposable (here modeled by a StreamReader) to the parser, and while it must be disposed after use the parser can't do that.

var parser = new Parser();
var wrapper = new Wrapper(parser);

// My calling code, fake reading a file and then process it:
using var stream = new MemoryStream("Multi\nLine\nString"u8.ToArray());

await foreach (var s in wrapper.ReadStringsAsync(stream))
{
    Console.WriteLine(s);
}

// The "wrapper" class, to hide that I'm using a StreamReader, and dispose it after we're done:
public class Wrapper
{
    private readonly Parser _parser;

    public Wrapper(Parser parser) => _parser = parser;

    public IAsyncEnumerable<string> ReadStringsAsync(Stream stream)
    {
        using var reader = new StreamReader(stream);

        if (reader.EndOfStream)
        {
            throw new InvalidOperationException("Can't read an empty file");
        }

        return _parser.ReadStringsAsync(reader);
    }
}

// The "parser" calling the StreamReader's ReadLineAsync()
public class Parser
{
    public async IAsyncEnumerable<string> ReadStringsAsync(StreamReader reader)
    {
        while (true)
        {
            var line = await reader.ReadLineAsync();
            if (line == null)
            {
                yield break;
            }

            yield return line;
        }
    }
}

The whole point of the Wrapper is to hide from the caller that a StreamReader is being used, and the whole point of the Parser class is that it can only call ReadLineAsync() and cannot dispose the reader.

For all intents and purposes, it's not really a StreamReader, it's not really a direct dependency (more like reader.GetAnotherDisposable()); this is a contrived example, the real-life scenario does require double disposing.

The wrapper code using var reader disposes the StreamReader even before the first line can be read:

System.ObjectDisposedException: Cannot read from a closed TextReader.

How can I refactor the non-async IAsyncEnumerable<string> Wrapper.ReadStringsAsync() method, so that:

  1. It can be called using await foreach().
  2. It hides the use of the StreamReader from the caller.
  3. It still owns the StreamReader, because it needs to call methods on it before returning the IAsyncEnumerable, and it must be disposed afterwards.
  4. It won't be made async, because it can't yield return.
  5. It only disposes the StreamReader after all its contents have been enumerated.

While writing this question I found two solutions:

Option 1, regarding point 4 above: Return IAsyncEnumerable from an async method

Make it async indeed, have it enumerate the results asynchronously and yield return them again:

public async IAsyncEnumerable<string> ReadStringsAsync(Stream stream)
{
    using var reader = new StreamReader(stream);

    if (reader.EndOfStream)
    {
        throw new InvalidOperationException("Can't read an empty file");
    }

    await foreach (var s in _parser.ReadStringsAsync(reader))
    {
        yield return s;
    }
}

But that feels like asyncception. Doesn't this compile into an extraneous state machine and enumerator that do nothing but proxy the actual work (i.e. copy the string values straight into another async state machine) and add overhead?


Option 2, regarding point 5 above: Correct disposal in IAsyncEnumerable methods?

Return the disposable from the non-async by making it an out variable:

public IAsyncEnumerable<string> ReadStringsDisposable(Stream stream, out IDisposable disposeMe)
{
    var reader = new StreamReader(stream);

    disposeMe = reader;

    if (reader.EndOfStream)
    {
        throw new InvalidOperationException("Can't read an empty file");
    }

    return _parser.ReadStringsAsync(reader);
}

Then dispose it after use:

IDisposable? disposeMe = null;

try
{
    await foreach (var s in wrapper.ReadStringsAsync(stream, out disposeMe))
    {
        Console.WriteLine(s);
    }
}
finally
{
    disposeMe?.Dispose();
}

But that changes the signature, which I'd rather not do, because it prevents refactoring the implementation later.

An alternative to the latter would be to create a custom IAsyncEnumerable/tor implementation that does the disposing on disposal of the enumerator.

So how can I dispose the class without creating a "useless" state machine (making it async, calling await foreach() yield return) and without modifying the signature into an unrefactorable one?

CodeCaster
  • 147,647
  • 23
  • 218
  • 272
  • What is the problem of **Option 1**? Yes it will create a state machine for this async method - but is this performance impact really that bad? You can use `ConfigureAwait(false)` to lower the amount. But the code is readable and it's clear what it does. – Sebastian Schumann Mar 28 '23 at 11:22
  • Just don't dispose the StreamReader? All it does is closing the stream in your example. Actually as you dispose the MemoryStream also you do it twice here. – Ralf Mar 28 '23 at 11:25
  • @Ralf I address that twice: _" This is a contrived example, the real-life scenario does require double disposing"_ and _"for all intents and purposes, it's not really a StreamReader and it must be disposed after use"_. – CodeCaster Mar 28 '23 at 11:31
  • Clarification: the `UsingDependency.ReadStringsAsync` method is a third-party API that is outside of your control, that you can't modify, correct? – Theodor Zoulias Mar 28 '23 at 11:45
  • @Sebastian I suppose the `async ... await foreach() { yield return }` seems ... counter-intuitive, just like `async Task Foo() => await Bar()`. See Stephen Cleary's link about [eliding](https://blog.stephencleary.com/2016/12/eliding-async-await.html). – CodeCaster Mar 28 '23 at 11:46
  • @Theodor I can alter both the "wrapper" and the code "using the dependency", it's just that the method `Wrapper.ReadStringsAsync()` **must** be the one creating the `StreamReader` (representing the actual third-party dependency), and `UsingDepedency.ReadStringsAsync()` **must** call `StreamReader.ReadLineAsync()`. – CodeCaster Mar 28 '23 at 11:46
  • I am confused. What part of the example represents the third-party library? – Theodor Zoulias Mar 28 '23 at 11:49
  • 1
    @Theodor updated my comment. The `StreamReader` (implementing `IDisposable`) and `ReadLineAsync()` represent the code I must call. – CodeCaster Mar 28 '23 at 11:50
  • Why is that counter-intuitive? If you have a recursive IEnumerable-returning method you'll return some stuff, foreach over the recursive call and yield each result. That's the exact same thing - imo. – Sebastian Schumann Mar 28 '23 at 11:59
  • @SebastianSchumann it seems counter-intuitive that my `Wrapper.ReadStringsAsync()` receives an `IAsyncEnumerable` from its dependency and can't _return_ that, but has to _enumerate_ the results from it and wrap those in a new one with `async ... yield return`. – CodeCaster Mar 28 '23 at 12:07

3 Answers3

2

Option 1 is the best, IMO.

Using async/await is the natural approach, and the early disposal issue is a common problem if the code elides async/await too eagerly. The problem is the same; it's just applied to async sequences instead of tasklikes.

Doesn't this compile into an extraneous state machine and enumerator that do nothing but proxy the actual work and add overhead?

No; it compiles into a state machine that keeps track of the enumerator state and disposes your resource when the enumerator is disposed. ;) In other words, you're having the compiler write the "alternative" option for you.

If this was just an await, it wouldn't seem weird. The weirdness is because of the foreach+yield. But it's a parallel case; the same reasoning applies, and the foreach+yield is appropriate.

Other languages (e.g., Python) do have a yield* operator that is essentially foreach+yield (and their asynchronous counterparts), but C# does not (for neither asynchronous nor synchronous code).

Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
  • 1
    _"In other words, you're having the compiler write the "alternative" option for you."_ - that does sound optimal. – CodeCaster Mar 28 '23 at 11:43
  • I've looked at the IL (which used to be more straightforward before async/await, lol), but there's a massive difference between _returning the `IAsyncEnumerable`_ and _enumerating it and returning it in another `async ... await foreach () ... yield return`_. Isn't there a whole lot of copying involved in the latter? A BenchmarkDotNet of both approaches showed 3.1 vs 2.7 microseconds per invocation, so it seems negligible, but still... – CodeCaster Mar 28 '23 at 14:29
  • @CodeCaster: it is more expensive to iterate a sequence than to return it, yes. All copying is just shallow, though. So, as you found, the difference is minimal. – Stephen Cleary Mar 28 '23 at 15:01
  • See [my answer](https://stackoverflow.com/a/75868094), I _had_ to try and build it. I'll just go with the more idiomatic approach outlined by you, though, thanks. – CodeCaster Mar 28 '23 at 15:19
0

I would suggest supporting async cancellation, and disposing the reader in a finally:

using System.Runtime.CompilerServices;

public class UsingDependency
{
    static async Task Main()
    {
        var source = new StringReader(@"some
data
here
we
won't
read
it
all");

        var obj = new UsingDependency();
        var cancel = new CancellationTokenSource();
        int count = 0;
        try
        {
            await foreach (var item in obj.ReadStringsAsync(source).WithCancellation(cancel.Token))
            {
                Console.WriteLine("got: " + item);
                if (++count == 3)
                {
                    cancel.Cancel();
                    // break; // alternative: *only* works cleanly if we exit
                              // *between* async read operations
                }
            }
        }
        catch (OperationCanceledException oce) when (oce.CancellationToken == cancel.Token)
        {
            Console.WriteLine("detected self-cancellation");
        }
    }
    public IAsyncEnumerable<string> ReadStringsAsync(TextReader reader)
    {
        return Impl(reader, default);
        static async IAsyncEnumerable<string> Impl(TextReader reader, [EnumeratorCancellation] CancellationToken cancellation)
        {
            try {
                while (true) // or possibly while (!cancellation.IsCancellationRequested), which exits more cleanly
                // *if* cancellation happens *between* async reads
                {
                    var line = await reader.ReadLineAsync(cancellation);
                    if (line == null)
                    {
                        yield break;
                    }

                    yield return line;
                }
            }
            // uncomment if you want to exit without throwing on cancellation during ReadLineAsync
            //catch (OperationCanceledException oce) when (oce.CancellationToken == cancellation)
            //{ }  
            finally
            {
                Console.WriteLine("We disposed the reader, yay!");
                reader.Dispose();
            }
        }
    }
}

The main point being:

  • by disposing in the finally of ReadStringsAsync, we ensure it is located appropriately
  • by supporting cancellation we allow elective short-circuiting of the enumerator including when an async operation is in progress
  • note that we don't need cancellation if you are just exiting the await foreach between reads; the finally will still get invoked (via the Dispose() on the enumerator); you can see this by replacing cancel.Cancel(); with break;
Marc Gravell
  • 1,026,079
  • 266
  • 2,566
  • 2,900
  • Thanks, but the premise is that the calling code cannot know about nor instantiate the reader, that is an implementation detail. I only have a `Stream` and must pass that to a method `IAsyncEnumerable ReadStringsAsync(Stream)` (dictated by an interface), while that method instantiates something that requires disposing. While cancellation is something I do want to support, I don't see yet how I can solve the original problem without moving the reader creation using this approach. – CodeCaster Mar 28 '23 at 11:39
0

For completeness's sake, I have implemented an IAsyncEnumerable<T> adapter whose constructor accepts a source enumerable and an action to perform during disposal:

public record DisposingAsyncEnumerable<T>(IAsyncEnumerable<T> Source, Action OnDisposing) 
    : IAsyncEnumerable<T>
{
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) 
        => new DisposingAsyncEnumerator(
            Source.GetAsyncEnumerator(cancellationToken), 
            OnDisposing
        );

    private record DisposingAsyncEnumerator(IAsyncEnumerator<T> Source, Action OnDisposing)
        : IAsyncEnumerator<T>
    {
        public T Current => Source.Current;
     
        public ValueTask<bool> MoveNextAsync() => Source.MoveNextAsync();

        public async ValueTask DisposeAsync()
        {
            await Source.DisposeAsync();

            OnDisposing();

            GC.SuppressFinalize(this);
        }
    }
}

This allows me to remain non-async, keep the signature the same, and ultimately, dispose after use with a less clunky syntax:

public IAsyncEnumerable<string> ReadStringsAsyncAdapter(Stream stream)
{
    var reader = new StreamReader(stream);

    if (reader.EndOfStream)
    {
        throw new InvalidOperationException("Can't read an empty file");
    }

    var enumerable = _parser.ReadStringsAsync(reader);

    // Actually just `reader.Dispose`, but () => to show the Action
    return new DisposingAsyncEnumerable<string>(enumerable, () => reader.Dispose());
}

Benchmarks:

BenchmarkDotNet=v0.13.5, OS=Windows 11 (10.0.22621.1413/22H2/2022Update/SunValley2) 12th Gen Intel Core i7-12700T, 1 CPU, 20 logical and 12 physical cores

.NET SDK=7.0.202

[Host] : .NET 7.0.4 (7.0.423.11508), X64 RyuJIT AVX2

DefaultJob : .NET 7.0.4 (7.0.423.11508), X64 RyuJIT AVX2

Method Mean Error StdDev
EnumerateAsyncAwait ("Option 1") 3.073 us 0.0605 us 0.0808 us
EnumerateAsyncDispose ("Option 2") 2.781 us 0.0554 us 0.0985 us
EnumerateAsyncAdapter (this answer) 2.781 us 0.0538 us 0.0718 us

We're in microsecond territory here, so I don't know what this proves, and I think I'll still go with the async ... await foreach() yield return, however weird that looks and despite the ~10% penalty. The actual reading of the stream will take longer than copying a few string pointers around.

CodeCaster
  • 147,647
  • 23
  • 218
  • 272