
I'm working with AsyncEnumerables to read a bunch of files into my C# application (.NET 6) and want to add an option to cancel reading. However, I'm confused by the behaviour of ToArrayAsync when it is given a CancellationToken.

I tried to break it down to this snippet:

var enumerable = Enumerable.Range(1, 10)
    .ToAsyncEnumerable()
    .SelectAwait(async x => { await Task.Delay(1000); return x; });

var cts = new CancellationTokenSource();
var resultTask = enumerable.ToArrayAsync(cts.Token);

cts.Cancel();
var z = await resultTask;

From my understanding, SelectAwait and ToArrayAsync should evaluate one entry after another (timing it supports this). I'd expect ToArrayAsync to check the provided token after every one of those steps. I looked up its source and think it boils down to this bit, which seems to support my assumption:

public ValueTask<bool> MoveNextAsync()
{
    _cancellationToken.ThrowIfCancellationRequested();
    return _source.MoveNextAsync(); // REVIEW: Signal cancellation through task or synchronously?
}

However, in practice ToArrayAsync fully evaluates all entries of the enumerable (taking the full 10 seconds) and returns without throwing an OperationCanceledException. Calling ToArrayAsync with an already cancelled token throws immediately, so presumably it only checks the token at the very start of its execution?

I know I can work around this by using SelectAwaitWithCancellation and checking the token inside its lambda, but I'm unsure whether I understand ToArrayAsync and AsyncEnumerables in general, so I'd appreciate clarification on how this works and on the design thinking behind it.
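The SelectAwaitWithCancellation workaround mentioned above might look like the following minimal sketch. It assumes .NET 6 and a reference to the System.Linq.Async NuGet package. The selector receives the token that ToArrayAsync passes down to GetAsyncEnumerator, and forwarding that token to Task.Delay lets the element currently being awaited be cancelled too.

```csharp
// Assumes the System.Linq.Async NuGet package is referenced.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

var enumerable = Enumerable.Range(1, 10)
    .ToAsyncEnumerable()
    // The selector gets the token that was passed to GetAsyncEnumerator,
    // i.e. the one given to ToArrayAsync below.
    .SelectAwaitWithCancellation(async (x, ct) =>
    {
        await Task.Delay(1000, ct); // cancels the pending delay itself
        return x;
    });

var resultTask = enumerable.ToArrayAsync(cts.Token);
cts.Cancel();

try
{
    int[] z = await resultTask;
    Console.WriteLine($"completed with {z.Length} items");
}
catch (OperationCanceledException)
{
    // Task.Delay throws TaskCanceledException, which derives from
    // OperationCanceledException.
    Console.WriteLine("canceled");
}
```

Written this way, awaiting the result should fail almost immediately instead of completing after 10 seconds, because the pending delay observes the token rather than relying on the source enumerator to check it.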

Theodor Zoulias
Simoris

1 Answer


This behavior stems from how the ToAsyncEnumerable operator is implemented in the System.Linq.Async package. This operator checks the CancellationToken passed to the GetAsyncEnumerator method only once. It's not rechecked in each MoveNextAsync operation. Here is a minimal demonstration of this behavior:

using CancellationTokenSource cts = new();
IAsyncEnumerable<int> sequence = Enumerable.Range(1, 10).ToAsyncEnumerable();
IAsyncEnumerator<int> enumerator = sequence.GetAsyncEnumerator(cts.Token);
cts.Cancel();
bool moved = await enumerator.MoveNextAsync();
Console.WriteLine($"moved: {moved}");

Output:

moved: True

Online demo.

If you want a ToAsyncEnumerable operator that properly observes the cancellation token, here is one:

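using System.Runtime.CompilerServices; // for [EnumeratorCancellation]

public static class AsyncEnumerableExtensions
{
#pragma warning disable 1998 // async iterator without 'await' is intentional
    public static async IAsyncEnumerable<T> ToAsyncEnumerable2<T>(
#pragma warning restore 1998
        this IEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        cancellationToken.ThrowIfCancellationRequested();
        using IEnumerator<T> enumerator = source.GetEnumerator();
        while (true)
        {
            // Re-check the token before pulling every element,
            // not only once when enumeration starts.
            cancellationToken.ThrowIfCancellationRequested();
            if (!enumerator.MoveNext()) break;
            yield return enumerator.Current;
        }
    }
}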
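As a usage sketch, here is the snippet from the question with ToAsyncEnumerable2 swapped in. It assumes the System.Linq.Async package is referenced and that the ToAsyncEnumerable2 extension method above is compiled into the same project (inside a static class). SelectAwait forwards the token it receives to its source, so the check inside the loop now runs before each element. The await should therefore throw once the element already being delayed finishes (roughly one second) instead of returning all ten items.

```csharp
// Assumes System.Linq.Async and the ToAsyncEnumerable2 extension from
// this answer are available in the project.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

var enumerable = Enumerable.Range(1, 10)
    .ToAsyncEnumerable2() // checks the token on every MoveNextAsync
    .SelectAwait(async x => { await Task.Delay(1000); return x; });

var resultTask = enumerable.ToArrayAsync(cts.Token);
cts.Cancel();

try
{
    int[] z = await resultTask;
    Console.WriteLine($"completed with {z.Length} items");
}
catch (OperationCanceledException)
{
    // Thrown by ToAsyncEnumerable2 when SelectAwait asks for the next element.
    Console.WriteLine("canceled");
}
```

The delay itself still ignores the token here, so the element in flight always completes; only the request for the next element is cancelled.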
Theodor Zoulias
  • Was this not actually fixed with [#1158](https://github.com/dotnet/reactive/pull/1158)? – Kevin Krumwiede Mar 07 '23 at 16:59
  • @KevinKrumwiede AFAICS [that](https://github.com/dotnet/reactive/pull/1158 "Cancel in-flight TaskCompletionSources in ToAsyncEnumerable(Observable).") pull request was a fix for something related with the `IObservable` to `IAsyncEnumerable` conversion. In this question the OP is dealing with a different conversion. – Theodor Zoulias Mar 07 '23 at 17:19