17

I have an async iterator method that produces an IAsyncEnumerable<int> (a stream of numbers), one number every 200 msec. The caller of this method consumes the stream, but wants to stop the enumeration after 1000 msec. So a CancellationTokenSource is used, and the token is passed as an argument to the WithCancellation extension method. But the token is not respected. The enumeration continues until all the numbers are consumed:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

Output:

12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10

The expected output is a TaskCanceledException to occur after number 5. It seems that I have misunderstood what the WithCancellation is actually doing. The method just passes the supplied token to the iterator method, if that method accepts one. Otherwise, like with the method GetSequence() in my example, the token is ignored. I suppose that the solution in my case is to interrogate manually the token inside the body of the enumeration:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

This is simple and works well. But in any case I wonder if it would be possible to create an extension method that does what I expected the WithCancellation to do, to bake the token inside the ensuing enumeration. This is the signature of the needed method:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • If the code is not written in a manner that allows it to abort early, you cannot force it to abort early. Well, you *can*, but you **really shouldn't**. – Lasse V. Karlsen Oct 04 '19 at 13:24
  • @LasseVågsætherKarlsen this is like saying that you shouldn't break from a loop early. This is a very strong claim to make! – Theodor Zoulias Oct 04 '19 at 14:35
  • 1
    The situations are not analogous -- breaking from a synchronous loop is always safe, but "cancelling" asynchronous enumeration only between iterations means that we may be adding considerable overhead and delays (not an issue for `Task.Delay`, but definitely an issue for real work). The situation is not as dire as for general async work (where we may have to accept work has not been cancelled at all and is still going on in the background, albeit ignored), since async enumeration implicitly includes disposing resources, but still not optimal. Compare running this with `Task.Delay(10000)`. – Jeroen Mostert Oct 04 '19 at 14:49
  • 1
    @JeroenMostert breaking from synchronous loops is safe because the compiler-generated iterators [are disposing properly all disposable resources](https://stackoverflow.com/questions/1405877/a-concern-about-yield-return-and-breaking-from-a-foreach), and the same is true for compiler-generated async iterators too. When you break inside an `await foreach` means that you are breaking after the completion of the previous [MoveNextAsync](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerator-1.movenextasync), when nothing special is going on. – Theodor Zoulias Oct 06 '19 at 10:00
  • @JeroenMostert regarding the case of ignored background work, I have made a relevant question [here](https://stackoverflow.com/questions/58216380/how-to-stop-propagating-an-asynchronous-stream-iasyncenumerable). The feedback I got is that I should transfer the responsibility to the caller to provide an extra cancellation notification, in addition to breaking the loop. – Theodor Zoulias Oct 06 '19 at 10:00

3 Answers3

31

IAsyncEnumerable explicitly provides for this mechanism with the EnumeratorCancellation attribute:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}

In fact, the compiler is helpful enough to issue a warning if you give the method a CancellationToken parameter, but do not add the attribute.

Note that the token passed to .WithCancellation will override any local token passed to the method. The specs have the details on this.

Of course, this will still only work if the enumeration actually accepts a CancellationToken -- but the fact that cancellation only really works if done cooperatively is true of any async work. Yeldar's answer is good for "forcing" some measure of cancellation into an enumerable that doesn't support it, but the preferred solution should be to modify the enumeration to support cancellation by itself -- the compiler does everything to help you out.

Jeroen Mostert
  • 27,176
  • 2
  • 52
  • 85
  • Thanks Jeroen for the answer! The information you provide is important, although it was already known to me. My question is about the case of non-cooperative cancellation, basically breaking from a loop that consumes the async enumerable, but it's important that you pointed to the distinction between the two modes. – Theodor Zoulias Oct 04 '19 at 14:31
  • Btw according to my tests the information provided in the [proposal specs](https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/proposals/csharp-8.0/async-streams) in inaccurate. If two tokens are passed, one directly and one through `WithCancellation`, both will be respected. Maybe they changed their mind about this in the meanwhile. – Theodor Zoulias Oct 04 '19 at 14:32
  • As async streams are new in C# 8, I find it hard to come up with a scenario where you'd have an enumerable that you couldn't write to support cooperative cancellation -- there's no legacy code yet that we have to fix! You could conceivably have an async enumeration that's wrapping an existing sync enumeration (as bad of an idea as that is) but even then you could (and arguably should) insert the cancellation logic there. – Jeroen Mostert Oct 04 '19 at 14:33
  • From my understanding the cancellation is useful to allow cancelling the awaiting between two loops. So in you example you should probably uncomment the `Task.Delay(200, ct)` code, because this is where the awaiting happens. In general I agree that every `IEnumerable` that deserves to become `IAsyncEnumerable`, it deserves to support cancellation too. :-) – Theodor Zoulias Oct 04 '19 at 14:46
  • 1
    Yes, if `Task.Delay` represented real (cancellable) work, we would simply pass the token and we wouldn't need to call `ThrowIfCancellationRequested` at all (assuming the rest of the loop did nothing interesting either). – Jeroen Mostert Oct 04 '19 at 14:49
  • Yeap. But we may be unlucky and the real work may not be cancelable. We may have to call a legacy API for example. Then it would be debatable if there is any value at accepting a cancellation token that doesn't actually cancel the real job, but only cancels the awaiting while the real job continues running. – Theodor Zoulias Oct 04 '19 at 14:58
  • It is debatable whether to even have an `AsyncEnumerable` at all in that case, but if you must have one like that, I would use *neither* `WithCancellation` nor the proposed `WithForcedCancellation`, and I *would* use your original approach of pulling the check into the outer code. That's the only way that makes it clear that we're pretending to have asynchronous work, when we really don't (which is really good to know). Note that asynchronous code can consume plain old synchronous `Enumerable`s just fine, with cancellation in the iterations -- you could make an extension method for *that*. – Jeroen Mostert Oct 04 '19 at 15:04
  • When I talked about legacy API I was thinking about async methods that don't support cancellation. This is the new legacy now. The sync methods are the legacy of the legacy. :-) – Theodor Zoulias Oct 04 '19 at 15:16
  • Fair enough -- but note that the approach in this answer works perfectly fine for that (i.e. it's using a `Task.Delay` that can't be cancelled to represent that work). This is a lot cleaner than having an extension method on every `IAsyncEnumerable`, as that already presumes we have `IAsyncEnumerable`s that are written "wrong"... which is unnecessarily pessimistic, and also depressing. :-P – Jeroen Mostert Oct 04 '19 at 15:23
  • Ha ha! First world problems. Feeling depressed after handled an enumerable that lacks cancellation. :-P Seriously though, the extension method `WithCancellation` is pretty easy to be misunderstood initially. I expect that lots of developers are going to misuse it, having made incorrect assumptions about what this method is actually intended to do. Noticing immediately bellow in the IntelliSense the `WithEnforcedCancellation` method could prevent them, maybe, from making these incorrect assumptions. – Theodor Zoulias Oct 04 '19 at 16:18
  • Beware: This statement, taken from the draft specs, [turns out to be incorrect](https://dotnetfiddle.net/vPDC9H): "Note that the token passed to `.WithCancellation` will override any local token passed to the method." In actuality, _the cancellation tokens are combined_, with cancellation of either being honored, as the above fiddle demonstrates. I have been unable to find documentation of this fact. – Timo Feb 28 '23 at 16:06
4

You can just extract your logic into an extension method like this:

public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));

    cancellationToken.ThrowIfCancellationRequested();

    await foreach (var item in source)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return item;
    }
}
Yeldar Kurmangaliyev
  • 33,467
  • 12
  • 59
  • 101
2

I think it's important to reiterate that you're not supposed to do this. It's always better to make the async method support cancellation tokens, then cancellation is immediate as you would expect. If that's impossible, I still recommend trying one of the other answers before trying this one.

With that said, if you can't add cancellation support to the async method, and you absolutely do need immediate termination of the foreach, then you can hack your way around it.

One trick is to use Task.WhenAny with two arguments:

  1. the task you get from IAsyncEnumerator.MoveNextAsync()
  2. another task that does support cancellation

Here's the short version

// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);

// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
    yield return enumerator.Current;
}

Long version with ConfigureAwait(false) and DisposeAsync() for completeness, should work if you run it locally.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax
        // because we need access to the ValueTask returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = null;

        // Combine MoveNextAsync() with another Task that can be awaited indefinitely,
        // until it throws OperationCanceledException
        var untilCanceled = UntilCanceled(cancellationToken);
        try
        {
            while (
                await (
                    await Task.WhenAny(
                        (
                            moveNext = enumerator.MoveNextAsync().AsTask()
                        ),
                        untilCanceled
                    ).ConfigureAwait(false)
                )
            )
            {
                yield return enumerator.Current;
            }
        }
        finally
        {
            if (moveNext != null && !moveNext.IsCompleted)
            {
                // Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!

                moveNext.ContinueWith(async _ =>
                {
                    await enumerator.DisposeAsync();
                }, TaskScheduler.Default);
#pragma warning restore 4014
            }
            else if (enumerator != null)
            {
                await enumerator.DisposeAsync();
            }
        }
    }

    private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        // This is just one possible implementation... feel free to swap out for something else
        return new Task<bool>(() => true, cancellationToken);
    }
}

public class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }

    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }
}

Caveats

The enumerator returns a ValueTask for improved performance (uses fewer allocations than regular Tasks), but a ValueTask cannot be used with Task.WhenAny(), so AsTask() is used which degrades performance by introducing allocation overhead.

The enumerator can only be disposed if the most recent MoveNextAsync() is completed. It's more likely that the Task is still running when cancellation is requested. That's why I added another call to DisposeAsync in a continuation task.

In this scenario, the enumerator is not yet disposed when the WithEnforcedCancellation() method exits. It will be disposed some indeterminate amount of time after the enumeration is abandoned. If DisposeAsync() throws an exception , the exception will be lost. It cannot bubble up the call stack, because there is no call stack.

Steven Liekens
  • 13,266
  • 8
  • 59
  • 85
  • Thanks Steven for the answer. AFAICS your `WithEnforcedCancellation` implementation works perfectly. Two minor improvements that I could suggest is to dispose the enumerator before exiting the method, and configure the `ContinueWith` with the `TaskScheduler.Default`, to avoid [the problems](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html) associated with relying on the ambient `TaskScheduler.Current`. – Theodor Zoulias Sep 02 '21 at 01:22
  • Also an alternative way to create the `Task cancellationRequested` is this: `new Task(() => true, cancellationToken)`. This will never return `true`, because the `Task` is not started. It may only complete as `Canceled`. This makes the `cancellationToken.ThrowIfCancellationRequested()` check inside the loop redundant. – Theodor Zoulias Sep 02 '21 at 01:29
  • Also configuring the `Task.WhenAny` with `ConfigureAwait(false)` might be desirable (although [this is debatable](https://dev.to/noseratio/why-i-no-longer-use-configureawait-false-3pne)). – Theodor Zoulias Sep 02 '21 at 01:39
  • 1
    I updated my answer with your improvements, except for disposing the enumerator which throws `NotSupportedException` for some reason. I don't really understand why. – Steven Liekens Sep 02 '21 at 10:05
  • Ah, yes. Probably it cannot be disposed at this moment because there is a pending `MoveNextAsync` operation in-flight. That's the consequence for doing things that we shouldn't really be doing. – Theodor Zoulias Sep 02 '21 at 10:16
  • 1
    ^yes, this, see: https://github.com/dotnet/runtime/issues/51176#issuecomment-818866190 – Steven Liekens Sep 02 '21 at 10:16
  • 1
    I added the call to `DisposeAsync` in a continuation task for the most recent pending `MoveNextAsync` task, I also added a new note to the caveats. – Steven Liekens Sep 02 '21 at 10:44
  • 1
    Now the implementation seems to be perfect in all aspects. Thanks Steven! – Theodor Zoulias Sep 02 '21 at 10:49
  • 1
    I found one more improvement, the enumerator can be disposed synchronously when the most recent task is already completed. I also added a note about exception handling (or lack thereof). – Steven Liekens Sep 13 '21 at 12:12
  • 1
    That's a nice final touch! Btw not only a possible exception thrown by the `DisposeAsync` will be lost, but also by the last incomplete `moveNext` task. No one will ever know what happened to them! – Theodor Zoulias Sep 13 '21 at 14:36